Skip to content

Commit

Permalink
Revert "refactor(external-prover-api): Polish the API implementation (#…
Browse files Browse the repository at this point in the history
…2774)"

This reverts commit 755fc4a.
  • Loading branch information
slowli committed Sep 3, 2024
1 parent b7c969e commit 2f5283f
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 294 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions core/node/external_proof_integration_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ categories.workspace = true

[dependencies]
axum = { workspace = true, features = ["multipart"] }
async-trait.workspace = true
tracing.workspace = true
thiserror.workspace = true
zksync_prover_interface.workspace = true
zksync_basic_types.workspace = true
zksync_config.workspace = true
zksync_object_store.workspace = true
zksync_dal.workspace = true
tokio.workspace = true
Expand Down
117 changes: 62 additions & 55 deletions core/node/external_proof_integration_api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,74 +6,81 @@ use zksync_basic_types::L1BatchNumber;
use zksync_dal::DalError;
use zksync_object_store::ObjectStoreError;

#[derive(Debug, thiserror::Error)]
pub(crate) enum ProcessorError {
#[error("Failed to deserialize proof data")]
Serialization(#[from] bincode::Error),
#[error("Invalid proof submitted")]
ObjectStore(ObjectStoreError),
Dal(DalError),
Serialization(bincode::Error),
InvalidProof,
#[error("Batch {0} is not yet ready for proving. Most likely our proof for this batch is not generated yet, try again later")]
BatchNotReady(L1BatchNumber),
#[error("Invalid file: {0}")]
InvalidFile(#[from] FileError),
#[error("Internal error")]
Internal,
#[error("Proof verification not possible anymore, batch is too old")]
ProofIsGone,
}

impl ProcessorError {
fn status_code(&self) -> StatusCode {
match self {
Self::Internal => StatusCode::INTERNAL_SERVER_ERROR,
Self::Serialization(_) => StatusCode::BAD_REQUEST,
Self::InvalidProof => StatusCode::BAD_REQUEST,
Self::InvalidFile(_) => StatusCode::BAD_REQUEST,
Self::BatchNotReady(_) => StatusCode::NOT_FOUND,
Self::ProofIsGone => StatusCode::GONE,
}
impl From<ObjectStoreError> for ProcessorError {
fn from(err: ObjectStoreError) -> Self {
Self::ObjectStore(err)
}
}

impl IntoResponse for ProcessorError {
fn into_response(self) -> Response {
(self.status_code(), self.to_string()).into_response()
impl From<DalError> for ProcessorError {
fn from(err: DalError) -> Self {
Self::Dal(err)
}
}

impl From<ObjectStoreError> for ProcessorError {
fn from(err: ObjectStoreError) -> Self {
match err {
ObjectStoreError::KeyNotFound(_) => {
tracing::debug!("Too old proof was requested: {:?}", err);
Self::ProofIsGone
}
_ => {
tracing::warn!("GCS error: {:?}", err);
Self::Internal
}
}
impl From<bincode::Error> for ProcessorError {
fn from(err: bincode::Error) -> Self {
Self::Serialization(err)
}
}

impl From<DalError> for ProcessorError {
fn from(_err: DalError) -> Self {
// We don't want to check if the error is `RowNotFound`: we check that batch exists before
// processing a request, so it's handled separately.
// Thus, any unhandled error from DAL is an internal error.
Self::Internal
impl IntoResponse for ProcessorError {
fn into_response(self) -> Response {
let (status_code, message) = match self {
ProcessorError::ObjectStore(err) => {
tracing::error!("GCS error: {:?}", err);
match err {
ObjectStoreError::KeyNotFound(_) => (
StatusCode::NOT_FOUND,
"Proof verification not possible anymore, batch is too old.".to_owned(),
),
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed fetching from GCS".to_owned(),
),
}
}
ProcessorError::Dal(err) => {
tracing::error!("Sqlx error: {:?}", err);
match err.inner() {
zksync_dal::SqlxError::RowNotFound => {
(StatusCode::NOT_FOUND, "Non existing L1 batch".to_owned())
}
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed fetching/saving from db".to_owned(),
),
}
}
ProcessorError::Serialization(err) => {
tracing::error!("Serialization error: {:?}", err);
(
StatusCode::BAD_REQUEST,
"Failed to deserialize proof data".to_owned(),
)
}
ProcessorError::BatchNotReady(l1_batch_number) => {
tracing::error!(
"Batch {l1_batch_number:?} is not yet ready for proving. Most likely our proof for this batch is not generated yet"
);
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Batch {l1_batch_number:?} is not yet ready for proving. Most likely our proof for this batch is not generated yet, try again later"),
)
}
ProcessorError::InvalidProof => {
tracing::error!("Invalid proof data");
(StatusCode::BAD_REQUEST, "Invalid proof data".to_owned())
}
};
(status_code, message).into_response()
}
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum FileError {
#[error("Multipart error: {0}")]
MultipartRejection(#[from] axum::extract::multipart::MultipartRejection),
#[error("Multipart error: {0}")]
Multipart(#[from] axum::extract::multipart::MultipartError),
#[error("File not found in request. It was expected to be in the field {field_name} with the content type {content_type}")]
FileNotFound {
field_name: &'static str,
content_type: &'static str,
},
}
155 changes: 71 additions & 84 deletions core/node/external_proof_integration_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,81 +2,43 @@ mod error;
mod metrics;
mod middleware;
mod processor;
mod types;

pub use crate::processor::Processor;

use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};

use anyhow::Context;
use axum::{
extract::{Path, Request, State},
extract::{Multipart, Path, Request},
middleware::Next,
routing::{get, post},
Router,
};
use error::ProcessorError;
use tokio::sync::watch;
use types::{ExternalProof, ProofGenerationDataResponse};
use zksync_basic_types::L1BatchNumber;
use zksync_basic_types::commitment::L1BatchCommitmentMode;
use zksync_config::configs::external_proof_integration_api::ExternalProofIntegrationApiConfig;
use zksync_dal::{ConnectionPool, Core};
use zksync_object_store::ObjectStore;

use crate::{
metrics::{CallOutcome, Method},
middleware::MetricsMiddleware,
processor::Processor,
};

/// External API implementation.
#[derive(Debug)]
pub struct Api {
router: Router,
port: u16,
}

impl Api {
pub fn new(processor: Processor, port: u16) -> Self {
let middleware_factory = |method: Method| {
axum::middleware::from_fn(move |req: Request, next: Next| async move {
let middleware = MetricsMiddleware::new(method);
let response = next.run(req).await;
let outcome = match response.status().is_success() {
true => CallOutcome::Success,
false => CallOutcome::Failure,
};
middleware.observe(outcome);
response
})
};

let router = Router::new()
.route(
"/proof_generation_data",
get(Api::latest_generation_data)
.layer(middleware_factory(Method::GetLatestProofGenerationData)),
)
.route(
"/proof_generation_data/:l1_batch_number",
get(Api::generation_data_for_existing_batch)
.layer(middleware_factory(Method::GetSpecificProofGenerationData)),
)
.route(
"/verify_proof/:l1_batch_number",
post(Api::verify_proof).layer(middleware_factory(Method::VerifyProof)),
)
.with_state(processor);
pub async fn run_server(
config: ExternalProofIntegrationApiConfig,
blob_store: Arc<dyn ObjectStore>,
connection_pool: ConnectionPool<Core>,
commitment_mode: L1BatchCommitmentMode,
mut stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let bind_address = SocketAddr::from(([0, 0, 0, 0], config.http_port));
tracing::info!("Starting external prover API server on {bind_address}");
let app = create_router(blob_store, connection_pool, commitment_mode).await;

Self { router, port }
}

pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let bind_address = SocketAddr::from(([0, 0, 0, 0], self.port));
tracing::info!("Starting external prover API server on {bind_address}");

let listener = tokio::net::TcpListener::bind(bind_address)
.await
.with_context(|| {
format!("Failed binding external prover API server to {bind_address}")
})?;
axum::serve(listener, self.router)
let listener = tokio::net::TcpListener::bind(bind_address)
.await
.with_context(|| format!("Failed binding external prover API server to {bind_address}"))?;
axum::serve(listener, app)
.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
tracing::warn!("Stop signal sender for external prover API server was dropped without sending a signal");
Expand All @@ -85,32 +47,57 @@ impl Api {
})
.await
.context("External prover API server failed")?;
tracing::info!("External prover API server shut down");
Ok(())
}
tracing::info!("External prover API server shut down");
Ok(())
}

async fn latest_generation_data(
State(processor): State<Processor>,
) -> Result<ProofGenerationDataResponse, ProcessorError> {
processor.get_proof_generation_data().await
}
async fn create_router(
blob_store: Arc<dyn ObjectStore>,
connection_pool: ConnectionPool<Core>,
commitment_mode: L1BatchCommitmentMode,
) -> Router {
let mut processor =
Processor::new(blob_store.clone(), connection_pool.clone(), commitment_mode);
let verify_proof_processor = processor.clone();
let specific_proof_processor = processor.clone();

async fn generation_data_for_existing_batch(
State(processor): State<Processor>,
Path(l1_batch_number): Path<u32>,
) -> Result<ProofGenerationDataResponse, ProcessorError> {
processor
.proof_generation_data_for_existing_batch(L1BatchNumber(l1_batch_number))
.await
}
let middleware_factory = |method: Method| {
axum::middleware::from_fn(move |req: Request, next: Next| async move {
let middleware = MetricsMiddleware::new(method);
let response = next.run(req).await;
let outcome = match response.status().is_success() {
true => CallOutcome::Success,
false => CallOutcome::Failure,
};
middleware.observe(outcome);
response
})
};

async fn verify_proof(
State(processor): State<Processor>,
Path(l1_batch_number): Path<u32>,
proof: ExternalProof,
) -> Result<(), ProcessorError> {
processor
.verify_proof(L1BatchNumber(l1_batch_number), proof)
.await
}
Router::new()
.route(
"/proof_generation_data",
get(move || async move { processor.get_proof_generation_data().await })
.layer(middleware_factory(Method::GetLatestProofGenerationData)),
)
.route(
"/proof_generation_data/:l1_batch_number",
get(move |l1_batch_number: Path<u32>| async move {
specific_proof_processor
.proof_generation_data_for_existing_batch(l1_batch_number)
.await
})
.layer(middleware_factory(Method::GetSpecificProofGenerationData)),
)
.route(
"/verify_proof/:l1_batch_number",
post(
move |l1_batch_number: Path<u32>, multipart: Multipart| async move {
verify_proof_processor
.verify_proof(l1_batch_number, multipart)
.await
},
)
.layer(middleware_factory(Method::VerifyProof)),
)
}
Loading

0 comments on commit 2f5283f

Please sign in to comment.