Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(proof_data_handler): TEE blob fetching error handling #2674

Merged
merged 11 commits into from
Aug 26, 2024
2 changes: 1 addition & 1 deletion core/bin/zksync_tee_prover/src/tee_prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ impl Task for TeeProver {
if !err.is_retriable() || retries > self.config.max_retries {
return Err(err.into());
}
retries += 1;
tracing::warn!(%err, "Failed TEE prover step function {retries}/{}, retrying in {} milliseconds.", self.config.max_retries, backoff.as_millis());
retries += 1;
backoff = std::cmp::min(
backoff.mul_f32(self.config.retry_backoff_multiplier),
self.config.max_backoff,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion core/lib/dal/doc/TeeProofGenerationDal.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ title: Status Diagram
---
stateDiagram-v2
[*] --> ready_to_be_proven : insert_tee_proof_generation_job
ready_to_be_proven --> picked_by_prover : get_next_batch_to_be_proven
ready_to_be_proven --> picked_by_prover : lock_batch_for_proving
picked_by_prover --> generated : save_proof_artifacts_metadata
generated --> [*]

picked_by_prover --> unpicked : unlock_batch
unpicked --> [*]
```
46 changes: 41 additions & 5 deletions core/lib/dal/src/tee_proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
use std::time::Duration;

use zksync_db_connection::{
connection::Connection, error::DalResult, instrument::Instrumented,
connection::Connection,
error::DalResult,
instrument::{InstrumentExt, Instrumented},
utils::pg_interval_from_duration,
};
use zksync_types::{tee_types::TeeType, L1BatchNumber};
Expand All @@ -18,12 +20,14 @@ pub struct TeeProofGenerationDal<'a, 'c> {
}

impl TeeProofGenerationDal<'_, '_> {
pub async fn get_next_batch_to_be_proven(
pub async fn lock_batch_for_proving(
&mut self,
tee_type: TeeType,
processing_timeout: Duration,
min_batch_number: Option<L1BatchNumber>,
) -> DalResult<Option<L1BatchNumber>> {
let processing_timeout = pg_interval_from_duration(processing_timeout);
let min_batch_number = min_batch_number.map_or(0, |num| i64::from(num.0));
let query = sqlx::query!(
r#"
UPDATE tee_proof_generation_details
Expand All @@ -48,6 +52,7 @@ impl TeeProofGenerationDal<'_, '_> {
AND proofs.prover_taken_at < NOW() - $3::INTERVAL
)
)
AND proofs.l1_batch_number >= $4
ORDER BY
l1_batch_number ASC
LIMIT
Expand All @@ -58,13 +63,16 @@ impl TeeProofGenerationDal<'_, '_> {
RETURNING
tee_proof_generation_details.l1_batch_number
"#,
&tee_type.to_string(),
tee_type.to_string(),
TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus,
&processing_timeout,
processing_timeout,
min_batch_number
);
let batch_number = Instrumented::new("get_next_batch_to_be_proven")

let batch_number = Instrumented::new("lock_batch_for_proving")
.with_arg("tee_type", &tee_type)
.with_arg("processing_timeout", &processing_timeout)
.with_arg("l1_batch_number", &min_batch_number)
.with(query)
.fetch_optional(self.storage)
.await?
Expand All @@ -73,6 +81,34 @@ impl TeeProofGenerationDal<'_, '_> {
Ok(batch_number)
}

/// Marks the TEE proof generation entry for the given batch and TEE type as `unpicked`.
///
/// Executes an `UPDATE` on `tee_proof_generation_details` that sets `status = 'unpicked'`
/// and refreshes `updated_at` to `NOW()` for the row matching both `l1_batch_number`
/// and `tee_type`.
///
/// NOTE(review): the `WHERE` clause does not filter on the row's current status, so a
/// matching row is updated whatever state it is in — confirm this is intended.
///
/// # Errors
///
/// Propagates the error returned by executing the instrumented query.
pub async fn unlock_batch(
&mut self,
l1_batch_number: L1BatchNumber,
tee_type: TeeType,
) -> DalResult<()> {
// The batch number is bound to the query (and logged) as an `i64`.
let batch_number = i64::from(l1_batch_number.0);
sqlx::query!(
r#"
UPDATE tee_proof_generation_details
SET
status = 'unpicked',
updated_at = NOW()
WHERE
l1_batch_number = $1
AND tee_type = $2
"#,
batch_number,
tee_type.to_string()
)
// Attach the query name and arguments for instrumentation before executing.
.instrument("unlock_batch")
.with_arg("l1_batch_number", &batch_number)
.with_arg("tee_type", &tee_type)
.execute(self.storage)
.await?;

Ok(())
}

pub async fn save_proof_artifacts_metadata(
&mut self,
batch_number: L1BatchNumber,
Expand Down
6 changes: 6 additions & 0 deletions core/node/proof_data_handler/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ pub(crate) enum RequestProcessorError {
Dal(DalError),
}

impl From<DalError> for RequestProcessorError {
fn from(err: DalError) -> Self {
RequestProcessorError::Dal(err)
}
}

impl IntoResponse for RequestProcessorError {
fn into_response(self) -> Response {
let (status_code, message) = match self {
Expand Down
120 changes: 76 additions & 44 deletions core/node/proof_data_handler/src/tee_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ use std::sync::Arc;
use axum::{extract::Path, Json};
use zksync_config::configs::ProofDataHandlerConfig;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_object_store::ObjectStore;
use zksync_prover_interface::{
api::{
RegisterTeeAttestationRequest, RegisterTeeAttestationResponse, SubmitProofResponse,
SubmitTeeProofRequest, TeeProofGenerationDataRequest, TeeProofGenerationDataResponse,
},
inputs::TeeVerifierInput,
use zksync_object_store::{ObjectStore, ObjectStoreError};
use zksync_prover_interface::api::{
RegisterTeeAttestationRequest, RegisterTeeAttestationResponse, SubmitProofResponse,
SubmitTeeProofRequest, TeeProofGenerationDataRequest, TeeProofGenerationDataResponse,
};
use zksync_types::L1BatchNumber;
use zksync_types::{tee_types::TeeType, L1BatchNumber};

use crate::errors::RequestProcessorError;

Expand Down Expand Up @@ -41,32 +38,77 @@ impl TeeRequestProcessor {
) -> Result<Json<TeeProofGenerationDataResponse>, RequestProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let mut connection = self
.pool
.connection()
.await
.map_err(RequestProcessorError::Dal)?;

let l1_batch_number_result = connection
.tee_proof_generation_dal()
.get_next_batch_to_be_proven(request.tee_type, self.config.proof_generation_timeout())
.await
.map_err(RequestProcessorError::Dal)?;

let l1_batch_number = match l1_batch_number_result {
Some(number) => number,
None => return Ok(Json(TeeProofGenerationDataResponse(None))),
let mut min_batch_number: Option<L1BatchNumber> = None;
let mut missing_range: Option<(L1BatchNumber, L1BatchNumber)> = None;
pbeza marked this conversation as resolved.
Show resolved Hide resolved

let result = loop {
let l1_batch_number = match self
.lock_batch_for_proving(request.tee_type, min_batch_number)
.await?
{
Some(number) => number,
None => break Ok(Json(TeeProofGenerationDataResponse(None))),
};

match self.blob_store.get(l1_batch_number).await {
Ok(input) => break Ok(Json(TeeProofGenerationDataResponse(Some(Box::new(input))))),
Err(ObjectStoreError::KeyNotFound(_)) => {
missing_range = match missing_range {
Some((start, _)) => Some((start, l1_batch_number)),
None => Some((l1_batch_number, l1_batch_number)),
};
self.unlock_batch(l1_batch_number, request.tee_type).await?;
min_batch_number = Some(min_batch_number.unwrap_or(l1_batch_number) + 1);
}
Err(err) => {
self.unlock_batch(l1_batch_number, request.tee_type).await?;
break Err(RequestProcessorError::ObjectStore(err));
}
}
};

let tee_verifier_input: TeeVerifierInput = self
.blob_store
.get(l1_batch_number)
.await
.map_err(RequestProcessorError::ObjectStore)?;
if let Some((start, end)) = missing_range {
tracing::warn!(
"Blobs for batch numbers {} to {} not found in the object store. Marked as unpicked.",
start,
end
);
}

result
}

let response = TeeProofGenerationDataResponse(Some(Box::new(tee_verifier_input)));
/// Acquires a pooled connection and asks the TEE proof generation DAL to lock a batch
/// for proving.
///
/// `tee_type` and `min_batch_number` are forwarded unchanged; the processing timeout
/// comes from `self.config.proof_generation_timeout()`.
///
/// Returns whatever the DAL call yields (`Some(batch)` or `None`). Connection and DAL
/// errors are converted into `RequestProcessorError` by `?`.
async fn lock_batch_for_proving(
    &self,
    tee_type: TeeType,
    min_batch_number: Option<L1BatchNumber>,
) -> Result<Option<L1BatchNumber>, RequestProcessorError> {
    let mut connection = self.pool.connection().await?;
    let mut dal = connection.tee_proof_generation_dal();
    let processing_timeout = self.config.proof_generation_timeout();
    let locked_batch = dal
        .lock_batch_for_proving(tee_type, processing_timeout, min_batch_number)
        .await?;
    Ok(locked_batch)
}

Ok(Json(response))
/// Acquires a pooled connection and asks the TEE proof generation DAL to unlock the
/// given batch for the given TEE type.
///
/// Connection and DAL errors are converted into `RequestProcessorError` by `?`.
async fn unlock_batch(
    &self,
    l1_batch_number: L1BatchNumber,
    tee_type: TeeType,
) -> Result<(), RequestProcessorError> {
    let mut connection = self.pool.connection().await?;
    let mut dal = connection.tee_proof_generation_dal();
    dal.unlock_batch(l1_batch_number, tee_type).await?;
    Ok(())
}

pub(crate) async fn submit_proof(
Expand All @@ -75,11 +117,7 @@ impl TeeRequestProcessor {
Json(proof): Json<SubmitTeeProofRequest>,
) -> Result<Json<SubmitProofResponse>, RequestProcessorError> {
let l1_batch_number = L1BatchNumber(l1_batch_number);
let mut connection = self
.pool
.connection()
.await
.map_err(RequestProcessorError::Dal)?;
let mut connection = self.pool.connection().await?;
let mut dal = connection.tee_proof_generation_dal();

tracing::info!(
Expand All @@ -94,8 +132,7 @@ impl TeeRequestProcessor {
&proof.0.signature,
&proof.0.proof,
)
.await
.map_err(RequestProcessorError::Dal)?;
.await?;

Ok(Json(SubmitProofResponse::Success))
}
Expand All @@ -106,16 +143,11 @@ impl TeeRequestProcessor {
) -> Result<Json<RegisterTeeAttestationResponse>, RequestProcessorError> {
tracing::info!("Received attestation: {:?}", payload);

let mut connection = self
.pool
.connection()
.await
.map_err(RequestProcessorError::Dal)?;
let mut connection = self.pool.connection().await?;
let mut dal = connection.tee_proof_generation_dal();

dal.save_attestation(&payload.pubkey, &payload.attestation)
.await
.map_err(RequestProcessorError::Dal)?;
.await?;

Ok(Json(RegisterTeeAttestationResponse::Success))
}
Expand Down
Loading