Skip to content

Commit

Permalink
fix: add cancellation handling in proving (#638)
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder authored Dec 16, 2024
1 parent 7d43ff4 commit 48893f3
Showing 1 changed file with 71 additions and 24 deletions.
95 changes: 71 additions & 24 deletions storage-provider/server/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub enum PipelineError {
SendError(#[from] SendError<PipelineMessage>),
#[error("failed to schedule windowed PoSt")]
SchedulingError,
#[error("Proving cancelled")]
ProvingCancelled,
#[error("Custom error: {0}")]
CustomError(String),
}
Expand Down Expand Up @@ -124,8 +126,18 @@ pub async fn start_pipeline(
trait PipelineOperations {
fn add_piece(&self, state: Arc<PipelineState>, msg: AddPieceMessage, token: CancellationToken);
fn precommit(&self, state: Arc<PipelineState>, msg: PreCommitMessage);
fn prove_commit(&self, state: Arc<PipelineState>, msg: ProveCommitMessage);
fn submit_windowed_post(&self, state: Arc<PipelineState>, msg: SubmitWindowedPoStMessage);
fn prove_commit(
&self,
state: Arc<PipelineState>,
msg: ProveCommitMessage,
token: CancellationToken,
);
fn submit_windowed_post(
&self,
state: Arc<PipelineState>,
msg: SubmitWindowedPoStMessage,
token: CancellationToken,
);
fn schedule_posts(&self, state: Arc<PipelineState>);
}

Expand Down Expand Up @@ -174,11 +186,15 @@ impl PipelineOperations for TaskTracker {
});
}

fn prove_commit(&self, state: Arc<PipelineState>, msg: ProveCommitMessage) {
fn prove_commit(
&self,
state: Arc<PipelineState>,
msg: ProveCommitMessage,
token: CancellationToken,
) {
let ProveCommitMessage { sector_number } = msg;
self.spawn(async move {
// ProveCommit is not cancellation safe.
match prove_commit(state, sector_number).await {
match prove_commit(state, sector_number, token).await {
Ok(_) => {
tracing::info!(
"ProveCommit for sector {} finished successfully.",
Expand All @@ -192,19 +208,31 @@ impl PipelineOperations for TaskTracker {
});
}

fn submit_windowed_post(&self, state: Arc<PipelineState>, msg: SubmitWindowedPoStMessage) {
fn submit_windowed_post(
&self,
state: Arc<PipelineState>,
msg: SubmitWindowedPoStMessage,
token: CancellationToken,
) {
let SubmitWindowedPoStMessage { deadline_index } = msg;
self.spawn(async move {
// SubmitWindowedPoSt is not cancellation safe.
match submit_windowed_post(state, deadline_index).await {
Ok(_) => {
tracing::info!(
"SubmitWindowedPoSt for deadline {} finished successfully.",
deadline_index
)
}
Err(err) => {
tracing::error!(%err, "SubmitWindowedPoSt failed for deadline: {}", deadline_index)
tokio::select! {
// SubmitWindowedPoSt is not cancellation safe.
res = submit_windowed_post(state, deadline_index) => {
match res {
Ok(_) => {
tracing::info!(
"SubmitWindowedPoSt for deadline {} finished successfully.",
deadline_index
)
}
Err(err) => {
tracing::error!(%err, "SubmitWindowedPoSt failed for deadline: {}", deadline_index)
}
}
},
() = token.cancelled() => {
tracing::warn!("submit_windowed_post for deadline {} has been cancelled.", deadline_index);
}
}
});
Expand Down Expand Up @@ -233,9 +261,11 @@ fn process(
match msg {
PipelineMessage::AddPiece(msg) => tracker.add_piece(state.clone(), msg, token.clone()),
PipelineMessage::PreCommit(msg) => tracker.precommit(state.clone(), msg),
PipelineMessage::ProveCommit(msg) => tracker.prove_commit(state.clone(), msg),
PipelineMessage::ProveCommit(msg) => {
tracker.prove_commit(state.clone(), msg, token.clone())
}
PipelineMessage::SubmitWindowedPoStMessage(msg) => {
tracker.submit_windowed_post(state.clone(), msg)
tracker.submit_windowed_post(state.clone(), msg, token.clone())
}
PipelineMessage::SchedulePoSts => tracker.schedule_posts(state.clone()),
}
Expand Down Expand Up @@ -446,10 +476,11 @@ async fn precommit(
Ok(())
}

#[tracing::instrument(skip(state))]
#[tracing::instrument(skip(state, token))]
async fn prove_commit(
state: Arc<PipelineState>,
sector_number: SectorNumber,
token: CancellationToken,
) -> Result<(), PipelineError> {
tracing::info!("Starting prove commit");

Expand Down Expand Up @@ -485,10 +516,16 @@ async fn prove_commit(
let prove_commit_block = sector.precommit_block + PRECOMMIT_CHALLENGE_DELAY;

tracing::info!("Wait for block {} to get randomness", prove_commit_block);
state
.xt_client
.wait_for_height(prove_commit_block, true)
.await?;
tokio::select! {
res = state.xt_client.wait_for_height(prove_commit_block, true) => {
res?;
},
() = token.cancelled() => {
tracing::warn!("Cancelled while waiting to get randomness at block {}", prove_commit_block);
return Err(PipelineError::ProvingCancelled);
}
};

let Some(digest) = state.xt_client.get_randomness(prove_commit_block).await? else {
tracing::error!("Randomness for the block not available.");
return Err(PipelineError::RandomnessNotAvailable);
Expand Down Expand Up @@ -527,7 +564,17 @@ async fn prove_commit(
)
})
};
let proofs = sealing_handle.await??;

let proofs = tokio::select! {
// Up to this point everything is retryable.
// Pipeline ends up being in an inconsistent state if we prove commit to the chain, and don't wait for it, so the sector's not persisted in the DB.
res = sealing_handle => {
res??
},
() = token.cancelled() => {
return Err(PipelineError::ProvingCancelled);
}
};

// We use sector size 2KiB only at this point, which guarantees to have 1 proof, because it has 1 partition in the config.
// That's why `prove_commit` will always generate a 1 proof.
Expand Down

0 comments on commit 48893f3

Please sign in to comment.