From 52e0dcc986445cb7782ffc71626b71a15f1a6508 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:57:44 +0300 Subject: [PATCH 01/10] add middleware for metrics --- .../external_proof_integration_api/src/lib.rs | 3 +- .../src/metrics.rs | 49 +++++++++++-------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/core/node/external_proof_integration_api/src/lib.rs b/core/node/external_proof_integration_api/src/lib.rs index b1ef33b44c10..3a266dd18d96 100644 --- a/core/node/external_proof_integration_api/src/lib.rs +++ b/core/node/external_proof_integration_api/src/lib.rs @@ -5,7 +5,7 @@ mod processor; use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; -use axum::{extract::Path, routing::post, Json, Router}; +use axum::{extract::Path, middleware, routing::post, Json, Router}; use tokio::sync::watch; use zksync_basic_types::commitment::L1BatchCommitmentMode; use zksync_config::configs::external_proof_integration_api::ExternalProofIntegrationApiConfig; @@ -71,4 +71,5 @@ async fn create_router( }, ), ) + .layer(middleware::from_fn(metrics::call_outcome_tracker)) } diff --git a/core/node/external_proof_integration_api/src/metrics.rs b/core/node/external_proof_integration_api/src/metrics.rs index 70815f542a05..d3ef4184ed6a 100644 --- a/core/node/external_proof_integration_api/src/metrics.rs +++ b/core/node/external_proof_integration_api/src/metrics.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use axum::{extract::Request, middleware::Next, response::Response}; use tokio::time::Instant; use vise::{EncodeLabelSet, EncodeLabelValue, Histogram, LabeledFamily, Metrics}; @@ -16,6 +17,7 @@ pub(crate) enum Method { GetLatestProofGenerationData, GetSpecificProofGenerationData, VerifyProof, + Unknown, } #[derive(Debug, Metrics)] @@ -25,30 +27,37 @@ pub(crate) struct ProofIntegrationApiMetrics { pub call_latency: LabeledFamily<(Method, CallOutcome), Histogram, 2>, } -pub(crate) struct MethodCallGuard { - method_type: Method, - outcome: CallOutcome, - started_at: Instant, -} +pub(crate) async fn call_outcome_tracker(request: Request, next: Next) -> Response { + let start = Instant::now(); + let path = request.uri().path(); -impl MethodCallGuard { - pub(crate) fn new(method_type: Method) -> Self { - MethodCallGuard { - method_type, - outcome: CallOutcome::Failure, - started_at: Instant::now(), + let method = if path.starts_with("/proof_generation_data") { + if let Some(char) = path.get(22..23) { + if char == "/" && path.get(23..).is_some() { + Method::GetSpecificProofGenerationData + } else { + Method::GetLatestProofGenerationData + } + } else { + Method::GetLatestProofGenerationData } - } + } else if path.starts_with("/verify_proof/") { + Method::VerifyProof + } else { + Method::Unknown + }; - pub(crate) fn mark_successful(&mut self) { - self.outcome = CallOutcome::Success; - } -} + let response = next.run(request).await; + + let outcome = if response.status().is_success() { + CallOutcome::Success + } else { + CallOutcome::Failure + }; + + METRICS.call_latency[&(method, outcome)].observe(start.elapsed()); -impl Drop for MethodCallGuard { - fn drop(&mut self) { - METRICS.call_latency[&(self.method_type, self.outcome)].observe(self.started_at.elapsed()); - } + response } #[vise::register] From 2a29a22b041ea2775cbb7ba0ce3bdcd78497a385 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Mon, 26 Aug 2024 17:01:45 +0300 Subject: [PATCH 02/10] fix build --- .../src/processor.rs | 24 ++++--------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index e9e56df4a068..032bd91eeda6 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -17,10 +17,7 @@ use zksync_prover_interface::{ outputs::L1BatchProofForL1, }; -use crate::{ - error::ProcessorError, - metrics::{Method, MethodCallGuard}, -}; +use crate::{error::ProcessorError, metrics::Method}; #[derive(Clone)] pub(crate) struct Processor { @@ -47,8 +44,6 @@ impl Processor { Path(l1_batch_number): Path, Json(payload): Json, ) -> Result<(), ProcessorError> { - let mut guard = MethodCallGuard::new(Method::VerifyProof); - let l1_batch_number = L1BatchNumber(l1_batch_number); tracing::info!( "Received request to verify proof for batch: {:?}", @@ -67,8 +62,6 @@ impl Processor { return Err(ProcessorError::InvalidProof); } - guard.mark_successful(); - Ok(()) } @@ -79,11 +72,6 @@ impl Processor { ) -> Result, ProcessorError> { tracing::info!("Received request for proof generation data: {:?}", request); - let mut guard = match request.0 .0 { - Some(_) => MethodCallGuard::new(Method::GetSpecificProofGenerationData), - None => MethodCallGuard::new(Method::GetLatestProofGenerationData), - }; - let latest_available_batch = self .pool .connection() @@ -112,13 +100,9 @@ impl Processor { .await; match proof_generation_data { - Ok(data) => { - guard.mark_successful(); - - Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new( - data, - ))))) - } + Ok(data) => Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new( + data, + ))))), Err(err) => Err(err), } } From ed1d367e1e0d019696d908b8ea1518eae6bc693a Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Mon, 26 Aug 2024 17:33:14 +0300 Subject: [PATCH 03/10] fix lint --- core/node/external_proof_integration_api/src/processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index 032bd91eeda6..d727ba57c0a4 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -17,7 +17,7 @@ use zksync_prover_interface::{ outputs::L1BatchProofForL1, }; -use crate::{error::ProcessorError, metrics::Method}; +use crate::error::ProcessorError; #[derive(Clone)] pub(crate) struct Processor { From c51822ad42fb94570b8265e2b83fec18c4afbbc2 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 27 Aug 2024 12:30:34 +0300 Subject: [PATCH 04/10] fix lint --- zk_toolbox/crates/zk_supervisor/src/commands/test/prover.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zk_toolbox/crates/zk_supervisor/src/commands/test/prover.rs b/zk_toolbox/crates/zk_supervisor/src/commands/test/prover.rs index 3d8131a180c3..4e9c4fc25283 100644 --- a/zk_toolbox/crates/zk_supervisor/src/commands/test/prover.rs +++ b/zk_toolbox/crates/zk_supervisor/src/commands/test/prover.rs @@ -6,7 +6,7 @@ use crate::messages::MSG_PROVER_TEST_SUCCESS; pub fn run(shell: &Shell) -> anyhow::Result<()> { let ecosystem = EcosystemConfig::from_file(shell)?; - let _dir_guard = shell.push_dir(&ecosystem.link_to_code.join("prover")); + let _dir_guard = shell.push_dir(ecosystem.link_to_code.join("prover")); Cmd::new(cmd!(shell, "cargo test --release --workspace --locked")) .with_force_run() From 27d8cfe6f41c138b2e6c6b92fb01f6e113461813 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 27 Aug 2024 14:14:05 +0300 Subject: [PATCH 05/10] add uploading binaries with verification --- Cargo.lock | 18 +++++++++++++ Cargo.toml | 2 +- .../external_proof_integration_api/src/lib.rs | 13 +++++++--- .../src/processor.rs | 26 ++++++++++++++++--- 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f60faf9fdf96..3b6903bb0e65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -350,6 +350,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -3777,6 +3778,23 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.1.0", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "multimap" version = "0.8.3" diff --git a/Cargo.toml b/Cargo.toml index d4855a34b9de..101d2559419a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,7 @@ categories = ["cryptography"] anyhow = "1" assert_matches = "1.5" async-trait = "0.1" -axum = "0.7.5" +axum = { version = "0.7.5", features = ["multipart"] } backon = "0.4.4" bigdecimal = "0.4.5" bincode = "1" diff --git a/core/node/external_proof_integration_api/src/lib.rs b/core/node/external_proof_integration_api/src/lib.rs index 3a266dd18d96..bf04dba6f574 100644 --- a/core/node/external_proof_integration_api/src/lib.rs +++ b/core/node/external_proof_integration_api/src/lib.rs @@ -5,13 +5,18 @@ mod processor; use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; -use axum::{extract::Path, middleware, routing::post, Json, Router}; +use axum::{ + extract::{Multipart, Path}, + middleware, + routing::post, + Json, Router, +}; use tokio::sync::watch; use zksync_basic_types::commitment::L1BatchCommitmentMode; use zksync_config::configs::external_proof_integration_api::ExternalProofIntegrationApiConfig; use zksync_dal::{ConnectionPool, Core}; use zksync_object_store::ObjectStore; -use zksync_prover_interface::api::{OptionalProofGenerationDataRequest, VerifyProofRequest}; +use zksync_prover_interface::api::OptionalProofGenerationDataRequest; use crate::processor::Processor; @@ -64,9 +69,9 @@ async fn create_router( .route( "/verify_proof/:l1_batch_number", post( - move |l1_batch_number: Path, payload: Json| async move { + move |l1_batch_number: Path, multipart: Multipart| async move { verify_proof_processor - .verify_proof(l1_batch_number, payload) + .verify_proof(l1_batch_number, multipart) .await }, ), diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index d727ba57c0a4..41c1bc96e3f1 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use axum::{extract::Path, Json}; +use axum::{ + extract::{Multipart, Path}, + Json, +}; use zksync_basic_types::{ basic_fri_types::Eip4844Blobs, commitment::L1BatchCommitmentMode, L1BatchNumber, }; @@ -42,7 +45,7 @@ impl Processor { pub(crate) async fn verify_proof( &self, Path(l1_batch_number): Path, - Json(payload): Json, + mut multipart: Multipart, ) -> Result<(), ProcessorError> { let l1_batch_number = L1BatchNumber(l1_batch_number); tracing::info!( @@ -50,7 +53,24 @@ impl Processor { l1_batch_number ); - let serialized_proof = bincode::serialize(&payload.0)?; + let mut serialized_proof = vec![]; + + while let Some(mut data) = multipart.next_field().await.map_err(|err| { + tracing::error!("Failed to read field: {:?}", err); + ProcessorError::InvalidProof + })? { + while let Some(chunk) = data.chunk().await.map_err(|err| { + tracing::error!("Failed to read chunk: {:?}", err); + ProcessorError::InvalidProof + })? { + serialized_proof.extend_from_slice(&chunk); + } + } + + let payload: VerifyProofRequest = bincode::deserialize(&serialized_proof)?; + + tracing::info!("Received proof is size: {}", serialized_proof.len()); + let expected_proof = bincode::serialize( &self .blob_store From 596839d51729071bdf9bc5e49f2963f957e1e93d Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 27 Aug 2024 17:34:27 +0300 Subject: [PATCH 06/10] make API return binaries --- core/lib/prover_interface/src/api.rs | 3 - .../external_proof_integration_api/src/lib.rs | 22 ++-- .../src/processor.rs | 104 +++++++++++++----- prover/docs/05_proving_batch.md | 16 ++- 4 files changed, 99 insertions(+), 46 deletions(-) diff --git a/core/lib/prover_interface/src/api.rs b/core/lib/prover_interface/src/api.rs index e4fe566618b8..bc95345bbbaa 100644 --- a/core/lib/prover_interface/src/api.rs +++ b/core/lib/prover_interface/src/api.rs @@ -65,9 +65,6 @@ pub enum SubmitProofRequest { SkippedProofGeneration, } -#[derive(Debug, Serialize, Deserialize)] -pub struct OptionalProofGenerationDataRequest(pub Option); - #[derive(Debug, Serialize, Deserialize)] pub struct VerifyProofRequest(pub Box); diff --git a/core/node/external_proof_integration_api/src/lib.rs b/core/node/external_proof_integration_api/src/lib.rs index bf04dba6f574..f3d77454e5a9 100644 --- a/core/node/external_proof_integration_api/src/lib.rs +++ b/core/node/external_proof_integration_api/src/lib.rs @@ -8,15 +8,14 @@ use anyhow::Context; use axum::{ extract::{Multipart, Path}, middleware, - routing::post, - Json, Router, + routing::{get, post}, + Router, }; use tokio::sync::watch; use zksync_basic_types::commitment::L1BatchCommitmentMode; use zksync_config::configs::external_proof_integration_api::ExternalProofIntegrationApiConfig; use zksync_dal::{ConnectionPool, Core}; use zksync_object_store::ObjectStore; -use zksync_prover_interface::api::OptionalProofGenerationDataRequest; use crate::processor::Processor; @@ -55,16 +54,19 @@ async fn create_router( let mut processor = Processor::new(blob_store.clone(), connection_pool.clone(), commitment_mode); let verify_proof_processor = processor.clone(); + let specific_proof_processor = processor.clone(); Router::new() .route( "/proof_generation_data", - post( - // we use post method because the returned data is not idempotent, - // i.e we return different result on each call. - move |payload: Json| async move { - processor.get_proof_generation_data(payload).await - }, - ), + get(move || async move { processor.get_proof_generation_data().await }), + ) + .route( + "/proof_generation_data/:l1_batch_number", + get(move |l1_batch_number: Path| async move { + specific_proof_processor + .proof_generation_data_for_existing_batch(l1_batch_number) + .await + }), ) .route( "/verify_proof/:l1_batch_number", diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index 41c1bc96e3f1..f392b7c9e124 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -2,7 +2,8 @@ use std::sync::Arc; use axum::{ extract::{Multipart, Path}, - Json, + http::header, + response::IntoResponse, }; use zksync_basic_types::{ basic_fri_types::Eip4844Blobs, commitment::L1BatchCommitmentMode, L1BatchNumber, @@ -10,10 +11,7 @@ use zksync_basic_types::{ use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_object_store::{bincode, ObjectStore}; use zksync_prover_interface::{ - api::{ - OptionalProofGenerationDataRequest, ProofGenerationData, ProofGenerationDataResponse, - VerifyProofRequest, - }, + api::{ProofGenerationData, VerifyProofRequest}, inputs::{ L1BatchMetadataHashes, VMRunWitnessInputData, WitnessInputData, WitnessInputMerklePaths, }, @@ -67,10 +65,10 @@ impl Processor { } } - let payload: VerifyProofRequest = bincode::deserialize(&serialized_proof)?; - tracing::info!("Received proof is size: {}", serialized_proof.len()); + let payload: VerifyProofRequest = bincode::deserialize(&serialized_proof)?; + let expected_proof = bincode::serialize( &self .blob_store @@ -86,11 +84,8 @@ impl Processor { } #[tracing::instrument(skip_all)] - pub(crate) async fn get_proof_generation_data( - &mut self, - request: Json, - ) -> Result, ProcessorError> { - tracing::info!("Received request for proof generation data: {:?}", request); + pub(crate) async fn get_proof_generation_data(&mut self) -> impl IntoResponse { + tracing::info!("Received request for proof generation data"); let latest_available_batch = self .pool @@ -101,34 +96,87 @@ impl Processor { .get_latest_proven_batch() .await?; - let l1_batch_number = if let Some(l1_batch_number) = request.0 .0 { - if l1_batch_number > latest_available_batch { - tracing::error!( - "Requested batch is not available: {:?}, latest available batch is {:?}", - l1_batch_number, - latest_available_batch - ); - return Err(ProcessorError::BatchNotReady(l1_batch_number)); + let proof_generation_data = self + .proof_generation_data_for_existing_batch_internal(latest_available_batch) + .await; + + match proof_generation_data { + Ok(data) => { + let data = bincode::serialize(&data)?; + + let headers = [ + (header::CONTENT_TYPE, "application/octet-stream"), + ( + header::CONTENT_DISPOSITION, + &format!( + "attachment; filename=\"witness_inputs_{}.bin\"", + latest_available_batch.0 + ), + ), + ]; + + Ok((headers, data).into_response()) } + Err(err) => Err(err), + } + } + + #[tracing::instrument(skip(self))] + pub(crate) async fn proof_generation_data_for_existing_batch( + &self, + Path(l1_batch_number): Path, + ) -> impl IntoResponse { + let l1_batch_number = L1BatchNumber(l1_batch_number); + tracing::info!( + "Received request for proof generation data for batch: {:?}", l1_batch_number - } else { - latest_available_batch - }; + ); + + let latest_available_batch = self + .pool + .connection() + .await + .unwrap() + .proof_generation_dal() + .get_latest_proven_batch() + .await?; + + if l1_batch_number > latest_available_batch { + tracing::error!( + "Requested batch is not available: {:?}, latest available batch is {:?}", + l1_batch_number, + latest_available_batch + ); + return Err(ProcessorError::BatchNotReady(l1_batch_number)); + } let proof_generation_data = self - .proof_generation_data_for_existing_batch(l1_batch_number) + .proof_generation_data_for_existing_batch_internal(latest_available_batch) .await; match proof_generation_data { - Ok(data) => Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new( - data, - ))))), + Ok(data) => { + let data = bincode::serialize(&data)?; + + let headers = [ + (header::CONTENT_TYPE, "text/bin; charset=utf-8"), + ( + header::CONTENT_DISPOSITION, + &format!( + "attachment; filename=\"witness_inputs_{}.bin\"", + latest_available_batch.0 + ), + ), + ]; + + Ok((headers, data).into_response()) + } Err(err) => Err(err), } } #[tracing::instrument(skip(self))] - async fn proof_generation_data_for_existing_batch( + async fn proof_generation_data_for_existing_batch_internal( &self, l1_batch_number: L1BatchNumber, ) -> Result { diff --git a/prover/docs/05_proving_batch.md b/prover/docs/05_proving_batch.md index 441a8225f866..76b454c897bb 100644 --- a/prover/docs/05_proving_batch.md +++ b/prover/docs/05_proving_batch.md @@ -72,13 +72,13 @@ input file, called `witness_inputs_.bin` generated by different core comp batch, that was already proven. Example: ```shell - curl -H "Content-Type: application/json" -X POST {address}/proof_generation_data -d 'null' + wget --content-disposition {address}/proof_generation_data ``` or ```shell - curl -H "Content-Type: application/json" -X POST {address}/proof_generation_data -d '1000' + wget --content-disposition {address}/proof_generation_data/{l1_batch_number} ``` ### Preparing database @@ -140,6 +140,12 @@ And you are good to go! The prover subsystem will prove the batch and you can ch Now, assuming the proof is already generated, you can verify using `ExternalProofIntegrationAPI`. Usually proof is stored in GCS bucket(for which you can use the same steps as for getting the witness inputs data [here](#getting-data-needed-for-proving), but locally you can find it in `/artifacts/proofs_fri` directory). Now, simply -send the data to the endpoint `{address}/verify_batch/{batch_number}`. Note, that you need to pass the generated proof -as serialized JSON data when calling the endpoint. API will respond with status 200 if the proof is valid and with the -error message otherwise. +send the data to the endpoint `{address}/verify_batch/{batch_number}`. + +Example: + +```shell +curl -v -F upload=@{path_to_proof_binary} {address_of_API}/verify_proof/{l1_batch_number} +``` + +API will respond with status 200 if the proof is valid and with the error message otherwise. From 03e1f14daa44803bfa7765c2fd616722c68d2244 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Wed, 28 Aug 2024 13:25:12 +0300 Subject: [PATCH 07/10] address comments --- Cargo.toml | 2 +- .../external_proof_integration_api/Cargo.toml | 2 +- .../external_proof_integration_api/src/lib.rs | 37 ++++++-- .../src/metrics.rs | 36 -------- .../src/middleware.rs | 21 +++++ .../src/processor.rs | 86 +++++++++---------- prover/docs/05_proving_batch.md | 2 +- 7 files changed, 95 insertions(+), 91 deletions(-) create mode 100644 core/node/external_proof_integration_api/src/middleware.rs diff --git a/Cargo.toml b/Cargo.toml index 101d2559419a..d4855a34b9de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,7 @@ categories = ["cryptography"] anyhow = "1" assert_matches = "1.5" async-trait = "0.1" -axum = { version = "0.7.5", features = ["multipart"] } +axum = "0.7.5" backon = "0.4.4" bigdecimal = "0.4.5" bincode = "1" diff --git a/core/node/external_proof_integration_api/Cargo.toml b/core/node/external_proof_integration_api/Cargo.toml index 2e8176cd8832..362c315164cb 100644 --- a/core/node/external_proof_integration_api/Cargo.toml +++ b/core/node/external_proof_integration_api/Cargo.toml @@ -11,7 +11,7 @@ keywords.workspace = true categories.workspace = true [dependencies] -axum.workspace = true +axum = { workspace = true, features = ["multipart"] } tracing.workspace = true zksync_prover_interface.workspace = true zksync_basic_types.workspace = true diff --git a/core/node/external_proof_integration_api/src/lib.rs b/core/node/external_proof_integration_api/src/lib.rs index f3d77454e5a9..c81173b4ba8f 100644 --- a/core/node/external_proof_integration_api/src/lib.rs +++ b/core/node/external_proof_integration_api/src/lib.rs @@ -1,13 +1,14 @@ mod error; mod metrics; +mod middleware; mod processor; use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; use axum::{ - extract::{Multipart, Path}, - middleware, + extract::{Multipart, Path, Request}, + middleware::Next, routing::{get, post}, Router, }; @@ -17,7 +18,11 @@ use zksync_config::configs::external_proof_integration_api::ExternalProofIntegra use zksync_dal::{ConnectionPool, Core}; use zksync_object_store::ObjectStore; -use crate::processor::Processor; +use crate::{ + metrics::{CallOutcome, Method}, + middleware::MetricsMiddleware, + processor::Processor, +}; pub async fn run_server( config: ExternalProofIntegrationApiConfig, @@ -27,7 +32,7 @@ pub async fn run_server( mut stop_receiver: watch::Receiver, ) -> anyhow::Result<()> { let bind_address = SocketAddr::from(([0, 0, 0, 0], config.http_port)); - tracing::debug!("Starting external prover API server on {bind_address}"); + tracing::info!("Starting external prover API server on {bind_address}"); let app = create_router(blob_store, connection_pool, commitment_mode).await; let listener = tokio::net::TcpListener::bind(bind_address) @@ -55,10 +60,25 @@ async fn create_router( Processor::new(blob_store.clone(), connection_pool.clone(), commitment_mode); let verify_proof_processor = processor.clone(); let specific_proof_processor = processor.clone(); + + let middleware_factory = |method: Method| { + axum::middleware::from_fn(move |req: Request, next: Next| async move { + let middleware = MetricsMiddleware::new(method); + let response = next.run(req).await; + let outcome = match response.status().is_success() { + true => CallOutcome::Success, + false => CallOutcome::Failure, + }; + middleware.observe(outcome); + response + }) + }; + Router::new() .route( "/proof_generation_data", - get(move || async move { processor.get_proof_generation_data().await }), + get(move || async move { processor.get_proof_generation_data().await }) + .layer(middleware_factory(Method::GetLatestProofGenerationData)), ) .route( "/proof_generation_data/:l1_batch_number", @@ -66,7 +86,8 @@ async fn create_router( specific_proof_processor .proof_generation_data_for_existing_batch(l1_batch_number) .await - }), + }) + .layer(middleware_factory(Method::GetSpecificProofGenerationData)), ) .route( "/verify_proof/:l1_batch_number", @@ -76,7 +97,7 @@ async fn create_router( .verify_proof(l1_batch_number, multipart) .await }, - ), + ) + .layer(middleware_factory(Method::VerifyProof)), ) - .layer(middleware::from_fn(metrics::call_outcome_tracker)) } diff --git a/core/node/external_proof_integration_api/src/metrics.rs b/core/node/external_proof_integration_api/src/metrics.rs index d3ef4184ed6a..f43b49b7b1c0 100644 --- a/core/node/external_proof_integration_api/src/metrics.rs +++ b/core/node/external_proof_integration_api/src/metrics.rs @@ -1,7 +1,5 @@ use std::time::Duration; -use axum::{extract::Request, middleware::Next, response::Response}; -use tokio::time::Instant; use vise::{EncodeLabelSet, EncodeLabelValue, Histogram, LabeledFamily, Metrics}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] @@ -17,7 +15,6 @@ pub(crate) enum Method { GetLatestProofGenerationData, GetSpecificProofGenerationData, VerifyProof, - Unknown, } #[derive(Debug, Metrics)] @@ -27,38 +24,5 @@ pub(crate) struct ProofIntegrationApiMetrics { pub call_latency: LabeledFamily<(Method, CallOutcome), Histogram, 2>, } -pub(crate) async fn call_outcome_tracker(request: Request, next: Next) -> Response { - let start = Instant::now(); - let path = request.uri().path(); - - let method = if path.starts_with("/proof_generation_data") { - if let Some(char) = path.get(22..23) { - if char == "/" && path.get(23..).is_some() { - Method::GetSpecificProofGenerationData - } else { - Method::GetLatestProofGenerationData - } - } else { - Method::GetLatestProofGenerationData - } - } else if path.starts_with("/verify_proof/") { - Method::VerifyProof - } else { - Method::Unknown - }; - - let response = next.run(request).await; - - let outcome = if response.status().is_success() { - CallOutcome::Success - } else { - CallOutcome::Failure - }; - - METRICS.call_latency[&(method, outcome)].observe(start.elapsed()); - - response -} - #[vise::register] pub(crate) static METRICS: vise::Global = vise::Global::new(); diff --git a/core/node/external_proof_integration_api/src/middleware.rs b/core/node/external_proof_integration_api/src/middleware.rs new file mode 100644 index 000000000000..9b1c8395cf41 --- /dev/null +++ b/core/node/external_proof_integration_api/src/middleware.rs @@ -0,0 +1,21 @@ +use tokio::time::Instant; + +use crate::metrics::{CallOutcome, Method, METRICS}; + +pub(crate) struct MetricsMiddleware { + method: Method, + started_at: Instant, +} + +impl MetricsMiddleware { + pub fn new(method: Method) -> MetricsMiddleware { + MetricsMiddleware { + method, + started_at: Instant::now(), + } + } + + pub fn observe(&self, outcome: CallOutcome) { + METRICS.call_latency[&(self.method, outcome)].observe(self.started_at.elapsed()); + } +} diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index f392b7c9e124..d78912d8b38f 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use axum::{ extract::{Multipart, Path}, http::header, - response::IntoResponse, + response::{IntoResponse, Response}, }; use zksync_basic_types::{ basic_fri_types::Eip4844Blobs, commitment::L1BatchCommitmentMode, L1BatchNumber, @@ -20,6 +20,32 @@ use zksync_prover_interface::{ use crate::error::ProcessorError; +pub(crate) struct ProofGenerationDataResponse(ProofGenerationData); + +impl IntoResponse for ProofGenerationDataResponse { + fn into_response(self) -> Response { + let l1_batch_number = self.0.l1_batch_number; + let data = match bincode::serialize(&self.0) { + Ok(data) => data, + Err(err) => { + return ProcessorError::Serialization(err).into_response(); + } + }; + + let headers = [ + (header::CONTENT_TYPE, "application/octet-stream"), + ( + header::CONTENT_DISPOSITION, + &format!( + "attachment; filename=\"witness_inputs_{}.bin\"", + l1_batch_number.0 + ), + ), + ]; + (headers, data).into_response() + } +} + #[derive(Clone)] pub(crate) struct Processor { blob_store: Arc, @@ -53,15 +79,15 @@ impl Processor { let mut serialized_proof = vec![]; - while let Some(mut data) = multipart.next_field().await.map_err(|err| { - tracing::error!("Failed to read field: {:?}", err); - ProcessorError::InvalidProof - })? { - while let Some(chunk) = data.chunk().await.map_err(|err| { - tracing::error!("Failed to read chunk: {:?}", err); - ProcessorError::InvalidProof - })? { - serialized_proof.extend_from_slice(&chunk); + while let Some(field) = multipart + .next_field() + .await + .map_err(|_| ProcessorError::InvalidProof)? + { + if field.name() == Some("proof") + && field.content_type() == Some("application/octet-stream") + { + serialized_proof.extend_from_slice(&field.bytes().await.unwrap()); } } @@ -84,7 +110,9 @@ impl Processor { } #[tracing::instrument(skip_all)] - pub(crate) async fn get_proof_generation_data(&mut self) -> impl IntoResponse { + pub(crate) async fn get_proof_generation_data( + &mut self, + ) -> Result { tracing::info!("Received request for proof generation data"); let latest_available_batch = self @@ -101,22 +129,7 @@ impl Processor { .await; match proof_generation_data { - Ok(data) => { - let data = bincode::serialize(&data)?; - - let headers = [ - (header::CONTENT_TYPE, "application/octet-stream"), - ( - header::CONTENT_DISPOSITION, - &format!( - "attachment; filename=\"witness_inputs_{}.bin\"", - latest_available_batch.0 - ), - ), - ]; - - Ok((headers, data).into_response()) - } + Ok(data) => Ok(ProofGenerationDataResponse(data)), Err(err) => Err(err), } } @@ -125,7 +138,7 @@ impl Processor { pub(crate) async fn proof_generation_data_for_existing_batch( &self, Path(l1_batch_number): Path, - ) -> impl IntoResponse { + ) -> Result { let l1_batch_number = L1BatchNumber(l1_batch_number); tracing::info!( "Received request for proof generation data for batch: {:?}", @@ -155,22 +168,7 @@ impl Processor { .await; match proof_generation_data { - Ok(data) => { - let data = bincode::serialize(&data)?; - - let headers = [ - (header::CONTENT_TYPE, "text/bin; charset=utf-8"), - ( - header::CONTENT_DISPOSITION, - &format!( - "attachment; filename=\"witness_inputs_{}.bin\"", - latest_available_batch.0 - ), - ), - ]; - - Ok((headers, data).into_response()) - } + Ok(data) => Ok(ProofGenerationDataResponse(data)), Err(err) => Err(err), } } diff --git a/prover/docs/05_proving_batch.md b/prover/docs/05_proving_batch.md index 76b454c897bb..e09a44cb0ff7 100644 --- a/prover/docs/05_proving_batch.md +++ b/prover/docs/05_proving_batch.md @@ -145,7 +145,7 @@ send the data to the endpoint `{address}/verify_batch/{batch_number}`. Example: ```shell -curl -v -F upload=@{path_to_proof_binary} {address_of_API}/verify_proof/{l1_batch_number} +curl -v -F proof=@{path_to_proof_binary} {address_of_API}/verify_proof/{l1_batch_number} ``` API will respond with status 200 if the proof is valid and with the error message otherwise. From ecb5a8ca6333dc2bf0f05e51484b7bfbeafb62e6 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Wed, 28 Aug 2024 15:06:18 +0300 Subject: [PATCH 08/10] address comments --- .../src/middleware.rs | 1 + .../src/processor.rs | 53 ++++++++++--------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/core/node/external_proof_integration_api/src/middleware.rs b/core/node/external_proof_integration_api/src/middleware.rs index 9b1c8395cf41..1dc6aefe9171 100644 --- a/core/node/external_proof_integration_api/src/middleware.rs +++ b/core/node/external_proof_integration_api/src/middleware.rs @@ -2,6 +2,7 @@ use tokio::time::Instant; use crate::metrics::{CallOutcome, Method, METRICS}; +#[derive(Debug)] pub(crate) struct MetricsMiddleware { method: Method, started_at: Instant, diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index d78912d8b38f..4b71fe937088 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -72,11 +72,24 @@ impl Processor { mut multipart: Multipart, ) -> Result<(), ProcessorError> { let l1_batch_number = L1BatchNumber(l1_batch_number); - tracing::info!( + tracing::debug!( "Received request to verify proof for batch: {:?}", l1_batch_number ); + let latest_available_batch = self + .pool + .connection() + .await + .unwrap() + .proof_generation_dal() + .get_latest_proven_batch() + .await?; + + if l1_batch_number > latest_available_batch { + return Err(ProcessorError::BatchNotReady(l1_batch_number)); + } + let mut serialized_proof = vec![]; while let Some(field) = multipart @@ -88,6 +101,7 @@ impl Processor { && field.content_type() == Some("application/octet-stream") { serialized_proof.extend_from_slice(&field.bytes().await.unwrap()); + break; } } @@ -99,7 +113,8 @@ impl Processor { &self .blob_store .get::((l1_batch_number, payload.0.protocol_version)) - .await?, + .await + .map_err(|_| ProcessorError::ProofWasPurged)?, )?; if serialized_proof != expected_proof { @@ -109,11 +124,10 @@ impl Processor { Ok(()) } - #[tracing::instrument(skip_all)] pub(crate) async fn get_proof_generation_data( &mut self, ) -> Result { - tracing::info!("Received request for proof generation data"); + tracing::debug!("Received request for proof generation data"); let latest_available_batch = self .pool @@ -124,23 +138,17 @@ impl Processor { .get_latest_proven_batch() .await?; - let proof_generation_data = self - .proof_generation_data_for_existing_batch_internal(latest_available_batch) - .await; - - match proof_generation_data { - Ok(data) => Ok(ProofGenerationDataResponse(data)), - Err(err) => Err(err), - } + self.proof_generation_data_for_existing_batch_internal(latest_available_batch) + .await + .map(ProofGenerationDataResponse) } - #[tracing::instrument(skip(self))] pub(crate) async fn proof_generation_data_for_existing_batch( &self, Path(l1_batch_number): Path, ) -> Result { let l1_batch_number = L1BatchNumber(l1_batch_number); - tracing::info!( + tracing::debug!( "Received request for proof generation data for batch: {:?}", l1_batch_number ); @@ -163,17 +171,11 @@ impl Processor { return Err(ProcessorError::BatchNotReady(l1_batch_number)); } - let proof_generation_data = self - .proof_generation_data_for_existing_batch_internal(latest_available_batch) - .await; - - match proof_generation_data { - Ok(data) => Ok(ProofGenerationDataResponse(data)), - Err(err) => Err(err), - } + self.proof_generation_data_for_existing_batch_internal(latest_available_batch) + .await + .map(ProofGenerationDataResponse) } - #[tracing::instrument(skip(self))] async fn proof_generation_data_for_existing_batch_internal( &self, l1_batch_number: L1BatchNumber, @@ -182,12 +184,13 @@ impl Processor { .blob_store .get(l1_batch_number) .await - .map_err(ProcessorError::ObjectStore)?; + .map_err(ProcessorError::ObjectStore) + .map_err(ProcessorError::ProofWasPurged)?; let merkle_paths: WitnessInputMerklePaths = self .blob_store .get(l1_batch_number) .await - .map_err(ProcessorError::ObjectStore)?; + .map_err(ProcessorError::ProofWasPurged)?; // Acquire connection after interacting with GCP, to avoid holding the connection for too long. let mut conn = self.pool.connection().await.map_err(ProcessorError::Dal)?; From e6ec914baed82633b34f9826e90f8d42276d9223 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Wed, 28 Aug 2024 15:12:42 +0300 Subject: [PATCH 09/10] fix --- .../external_proof_integration_api/src/processor.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index 4b71fe937088..277c2e694de8 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -113,8 +113,7 @@ impl Processor { &self .blob_store .get::((l1_batch_number, payload.0.protocol_version)) - .await - .map_err(|_| ProcessorError::ProofWasPurged)?, + .await?, )?; if serialized_proof != expected_proof { @@ -184,13 +183,8 @@ impl Processor { .blob_store .get(l1_batch_number) .await - .map_err(ProcessorError::ObjectStore) - .map_err(ProcessorError::ProofWasPurged)?; - let merkle_paths: WitnessInputMerklePaths = self - .blob_store - .get(l1_batch_number) - .await - .map_err(ProcessorError::ProofWasPurged)?; + .map_err(ProcessorError::ObjectStore)?; + let merkle_paths: WitnessInputMerklePaths = self.blob_store.get(l1_batch_number).await?; // Acquire connection after interacting with GCP, to avoid holding the connection for too long. let mut conn = self.pool.connection().await.map_err(ProcessorError::Dal)?; From b9df191e510c009b57b586b5f72061b67ed5821b Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Wed, 28 Aug 2024 15:14:56 +0300 Subject: [PATCH 10/10] fix --- .../node/external_proof_integration_api/src/processor.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index 277c2e694de8..64748f5c2278 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -113,7 +113,8 @@ impl Processor { &self .blob_store .get::((l1_batch_number, payload.0.protocol_version)) - .await?, + .await + .map_err(ProcessorError::ObjectStore)?, )?; if serialized_proof != expected_proof { @@ -184,7 +185,11 @@ impl Processor { .get(l1_batch_number) .await .map_err(ProcessorError::ObjectStore)?; - let merkle_paths: WitnessInputMerklePaths = self.blob_store.get(l1_batch_number).await?; + let merkle_paths: WitnessInputMerklePaths = self + .blob_store + .get(l1_batch_number) + .await + .map_err(ProcessorError::ObjectStore)?; // Acquire connection after interacting with GCP, to avoid holding the connection for too long. let mut conn = self.pool.connection().await.map_err(ProcessorError::Dal)?;