Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(proof_data_handler): Unlock jobs on transient errors #2486

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 50 additions & 7 deletions core/lib/dal/src/proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ enum ProofGenerationJobStatus {
}

impl ProofGenerationDal<'_, '_> {
pub async fn get_next_block_to_be_proven(
/// Chooses the batch number so that it has all the necessary data to generate the proof
/// and is not already picked.
///
/// Marks the batch as picked by the prover, preventing it from being picked twice.
///
/// The batch can be unpicked either via a corresponding DAL method, or it is considered
/// not picked after `processing_timeout` passes.
pub async fn lock_batch_for_proving(
&mut self,
processing_timeout: Duration,
) -> DalResult<Option<L1BatchNumber>> {
Expand Down Expand Up @@ -72,14 +79,38 @@ impl ProofGenerationDal<'_, '_> {
"#,
&processing_timeout,
)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.instrument("lock_batch_for_proving")
.with_arg("processing_timeout", &processing_timeout)
.fetch_optional(self.storage)
.await?
.map(|row| L1BatchNumber(row.l1_batch_number as u32));

Ok(result)
}

/// Marks a previously locked batch as 'unpicked', allowing it to be picked without having
/// to wait for the processing timeout.
///
/// # Errors
///
/// Propagates database errors from executing the `UPDATE` statement.
pub async fn unlock_batch(&mut self, l1_batch_number: L1BatchNumber) -> DalResult<()> {
    // Postgres bind parameters are signed; the batch number is a `u32`, so the
    // widening conversion to `i64` is lossless.
    let batch_number = i64::from(l1_batch_number.0);
    sqlx::query!(
        r#"
        UPDATE proof_generation_details
        SET
            status = 'unpicked',
            updated_at = NOW()
        WHERE
            l1_batch_number = $1
        "#,
        batch_number,
    )
    // NOTE(review): the status is overwritten unconditionally, with no guard on the
    // current status — presumably callers only invoke this for batches they locked
    // themselves; verify at call sites that a finished batch cannot be reset here.
    .instrument("unlock_batch")
    .with_arg("l1_batch_number", &l1_batch_number)
    .execute(self.storage)
    .await?;

    Ok(())
}

pub async fn save_proof_artifacts_metadata(
&mut self,
batch_number: L1BatchNumber,
Expand Down Expand Up @@ -388,7 +419,7 @@ mod tests {

let picked_l1_batch = conn
.proof_generation_dal()
.get_next_block_to_be_proven(Duration::MAX)
.lock_batch_for_proving(Duration::MAX)
.await
.unwrap();
assert_eq!(picked_l1_batch, Some(L1BatchNumber(1)));
Expand All @@ -399,10 +430,22 @@ mod tests {
.unwrap();
assert_eq!(unpicked_l1_batch, None);

// Check that we can unlock the batch and then pick it again.
conn.proof_generation_dal()
.unlock_batch(L1BatchNumber(1))
.await
.unwrap();
let picked_l1_batch = conn
.proof_generation_dal()
.lock_batch_for_proving(Duration::MAX)
.await
.unwrap();
assert_eq!(picked_l1_batch, Some(L1BatchNumber(1)));

// Check that with small enough processing timeout, the L1 batch can be picked again
let picked_l1_batch = conn
.proof_generation_dal()
.get_next_block_to_be_proven(Duration::ZERO)
.lock_batch_for_proving(Duration::ZERO)
.await
.unwrap();
assert_eq!(picked_l1_batch, Some(L1BatchNumber(1)));
Expand All @@ -414,7 +457,7 @@ mod tests {

let picked_l1_batch = conn
.proof_generation_dal()
.get_next_block_to_be_proven(Duration::MAX)
.lock_batch_for_proving(Duration::MAX)
.await
.unwrap();
assert_eq!(picked_l1_batch, None);
Expand Down
104 changes: 67 additions & 37 deletions core/node/proof_data_handler/src/request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,64 @@ impl RequestProcessor {
) -> Result<Json<ProofGenerationDataResponse>, RequestProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let l1_batch_number_result = self
.pool
let l1_batch_number = match self.lock_batch_for_proving().await? {
Some(number) => number,
None => return Ok(Json(ProofGenerationDataResponse::Success(None))), // no batches pending to be proven
};

let proof_generation_data = self
.proof_generation_data_for_existing_batch(l1_batch_number)
.await;

// If we weren't able to fetch all the data, we should unlock the batch before returning.
match proof_generation_data {
Ok(data) => Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
data,
))))),
Err(err) => {
self.unlock_batch(l1_batch_number).await?;
Err(err)
}
}
}

/// Picks the next batch that has all the data required for proof generation and is not
/// already taken by another prover, marking it as locked.
///
/// Returns `Ok(None)` when no batch is currently eligible.
async fn lock_batch_for_proving(&self) -> Result<Option<L1BatchNumber>, RequestProcessorError> {
    let mut connection = self
        .pool
        .connection()
        .await
        .map_err(RequestProcessorError::Dal)?;
    let locked_batch = connection
        .proof_generation_dal()
        .lock_batch_for_proving(self.config.proof_generation_timeout())
        .await
        .map_err(RequestProcessorError::Dal)?;
    Ok(locked_batch)
}

let l1_batch_number = match l1_batch_number_result {
Some(number) => number,
None => return Ok(Json(ProofGenerationDataResponse::Success(None))), // no batches pending to be proven
};
/// Releases a previously locked batch by marking it as 'unpicked', so that any prover
/// may pick it up again without waiting for the processing timeout.
async fn unlock_batch(
    &self,
    l1_batch_number: L1BatchNumber,
) -> Result<(), RequestProcessorError> {
    let mut connection = self
        .pool
        .connection()
        .await
        .map_err(RequestProcessorError::Dal)?;
    connection
        .proof_generation_dal()
        .unlock_batch(l1_batch_number)
        .await
        .map_err(RequestProcessorError::Dal)
}

/// Will fetch all the required data for the batch and return it.
///
/// ## Panics
///
/// Expects all the data to be present in the database.
/// Will panic if any of the required data is missing.
async fn proof_generation_data_for_existing_batch(
&self,
l1_batch_number: L1BatchNumber,
) -> Result<ProofGenerationData, RequestProcessorError> {
let vm_run_data: VMRunWitnessInputData = self
.blob_store
.get(l1_batch_number)
Expand All @@ -77,52 +120,43 @@ impl RequestProcessor {
.await
.map_err(RequestProcessorError::ObjectStore)?;

let previous_batch_metadata = self
// Acquire connection after interacting with GCP, to avoid holding the connection for too long.
let mut conn = self
.pool
.connection()
.await
.unwrap()
.map_err(RequestProcessorError::Dal)?;

let previous_batch_metadata = conn
.blocks_dal()
.get_l1_batch_metadata(L1BatchNumber(l1_batch_number.checked_sub(1).unwrap()))
.await
.unwrap()
.map_err(RequestProcessorError::Dal)?
.expect("No metadata for previous batch");

let header = self
.pool
.connection()
.await
.unwrap()
let header = conn
.blocks_dal()
.get_l1_batch_header(l1_batch_number)
.await
.unwrap()
.map_err(RequestProcessorError::Dal)?
.unwrap_or_else(|| panic!("Missing header for {}", l1_batch_number));

let minor_version = header.protocol_version.unwrap();
let protocol_version = self
.pool
.connection()
.await
.unwrap()
let protocol_version = conn
.protocol_versions_dal()
.get_protocol_version_with_latest_patch(minor_version)
.await
.unwrap()
.map_err(RequestProcessorError::Dal)?
.unwrap_or_else(|| {
panic!("Missing l1 verifier info for protocol version {minor_version}")
});

let batch_header = self
.pool
.connection()
.await
.unwrap()
let batch_header = conn
.blocks_dal()
.get_l1_batch_header(l1_batch_number)
.await
.unwrap()
.unwrap();
.map_err(RequestProcessorError::Dal)?
.unwrap_or_else(|| panic!("Missing header for {}", l1_batch_number));

let eip_4844_blobs = match self.commitment_mode {
L1BatchCommitmentMode::Validium => Eip4844Blobs::empty(),
Expand All @@ -149,16 +183,12 @@ impl RequestProcessor {

METRICS.observe_blob_sizes(&blob);

let proof_gen_data = ProofGenerationData {
Ok(ProofGenerationData {
l1_batch_number,
witness_input_data: blob,
protocol_version: protocol_version.version,
l1_verifier_config: protocol_version.l1_verifier_config,
};

Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
proof_gen_data,
)))))
})
}

pub(crate) async fn submit_proof(
Expand Down
Loading