diff --git a/prover/crates/bin/proof_fri_compressor/src/compressor.rs b/prover/crates/bin/proof_fri_compressor/src/compressor.rs index 077347bce9be..e462097e38d0 100644 --- a/prover/crates/bin/proof_fri_compressor/src/compressor.rs +++ b/prover/crates/bin/proof_fri_compressor/src/compressor.rs @@ -59,7 +59,6 @@ impl ProofCompressor { #[tracing::instrument(skip(proof, _compression_mode))] pub fn compress_proof( - l1_batch: L1BatchNumber, proof: ZkSyncRecursionLayerProof, _compression_mode: u8, keystore: Keystore, @@ -171,16 +170,13 @@ impl JobProcessor for ProofCompressor { async fn process_job( &self, - job_id: &L1BatchNumber, + _job_id: &L1BatchNumber, job: ZkSyncRecursionLayerProof, _started_at: Instant, ) -> JoinHandle> { let compression_mode = self.compression_mode; - let block_number = *job_id; let keystore = self.keystore.clone(); - tokio::task::spawn_blocking(move || { - Self::compress_proof(block_number, job, compression_mode, keystore) - }) + tokio::task::spawn_blocking(move || Self::compress_proof(job, compression_mode, keystore)) } async fn save_result( diff --git a/prover/crates/bin/witness_generator/src/artifacts.rs b/prover/crates/bin/witness_generator/src/artifacts.rs new file mode 100644 index 000000000000..f509d3b2f64a --- /dev/null +++ b/prover/crates/bin/witness_generator/src/artifacts.rs @@ -0,0 +1,50 @@ +use std::time::Instant; + +use async_trait::async_trait; +use zksync_object_store::ObjectStore; +use zksync_prover_dal::{ConnectionPool, Prover}; + +#[derive(Debug)] +pub(crate) struct AggregationBlobUrls { + pub aggregations_urls: String, + pub circuit_ids_and_urls: Vec<(u8, String)>, +} + +#[derive(Debug)] +pub(crate) struct SchedulerBlobUrls { + pub circuit_ids_and_urls: Vec<(u8, String)>, + pub closed_form_inputs_and_urls: Vec<(u8, String, usize)>, + pub scheduler_witness_url: String, +} + +pub(crate) enum BlobUrls { + Url(String), + Aggregation(AggregationBlobUrls), + Scheduler(SchedulerBlobUrls), +} + +#[async_trait] +pub(crate) trait ArtifactsManager { + type InputMetadata; + type InputArtifacts; + type OutputArtifacts; + + async fn get_artifacts( + metadata: &Self::InputMetadata, + object_store: &dyn ObjectStore, + ) -> anyhow::Result; + + async fn save_artifacts( + job_id: u32, + artifacts: Self::OutputArtifacts, + object_store: &dyn ObjectStore, + ) -> BlobUrls; + + async fn update_database( + connection_pool: &ConnectionPool, + job_id: u32, + started_at: Instant, + blob_urls: BlobUrls, + artifacts: Self::OutputArtifacts, + ) -> anyhow::Result<()>; +} diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs b/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs new file mode 100644 index 000000000000..3447659f8296 --- /dev/null +++ b/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs @@ -0,0 +1,108 @@ +use std::time::Instant; + +use async_trait::async_trait; +use zksync_object_store::ObjectStore; +use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_prover_fri_types::AuxOutputWitnessWrapper; +use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer; +use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; + +use crate::{ + artifacts::{ArtifactsManager, BlobUrls}, + basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob}, + utils::SchedulerPartialInputWrapper, +}; + +#[async_trait] +impl ArtifactsManager for BasicWitnessGenerator { + type InputMetadata = L1BatchNumber; + type InputArtifacts = BasicWitnessGeneratorJob; + type OutputArtifacts = BasicCircuitArtifacts; + + async fn get_artifacts( + metadata: &Self::InputMetadata, + object_store: &dyn ObjectStore, + ) -> anyhow::Result { + let l1_batch_number = *metadata; + let data = object_store.get(l1_batch_number).await.unwrap(); + Ok(BasicWitnessGeneratorJob { + block_number: l1_batch_number, + data, + }) + } + + async fn save_artifacts( + job_id: u32, + artifacts: Self::OutputArtifacts, + object_store: &dyn ObjectStore, + ) -> BlobUrls { + let aux_output_witness_wrapper = AuxOutputWitnessWrapper(artifacts.aux_output_witness); + object_store + .put(L1BatchNumber(job_id), &aux_output_witness_wrapper) + .await + .unwrap(); + let wrapper = SchedulerPartialInputWrapper(artifacts.scheduler_witness); + let url = object_store + .put(L1BatchNumber(job_id), &wrapper) + .await + .unwrap(); + + BlobUrls::Url(url) + } + + #[tracing::instrument(skip_all, fields(l1_batch = %job_id))] + async fn update_database( + connection_pool: &ConnectionPool, + job_id: u32, + started_at: Instant, + blob_urls: BlobUrls, + _artifacts: Self::OutputArtifacts, + ) -> anyhow::Result<()> { + let blob_urls = match blob_urls { + BlobUrls::Scheduler(blobs) => blobs, + _ => unreachable!(), + }; + + let mut connection = connection_pool + .connection() + .await + .expect("failed to get database connection"); + let mut transaction = connection + .start_transaction() + .await + .expect("failed to get database transaction"); + let protocol_version_id = transaction + .fri_witness_generator_dal() + .protocol_version_for_l1_batch(L1BatchNumber(job_id)) + .await; + transaction + .fri_prover_jobs_dal() + .insert_prover_jobs( + L1BatchNumber(job_id), + blob_urls.circuit_ids_and_urls, + AggregationRound::BasicCircuits, + 0, + protocol_version_id, + ) + .await; + transaction + .fri_witness_generator_dal() + .create_aggregation_jobs( + L1BatchNumber(job_id), + &blob_urls.closed_form_inputs_and_urls, + &blob_urls.scheduler_witness_url, + get_recursive_layer_circuit_id_for_base_layer, + protocol_version_id, + ) + .await; + transaction + .fri_witness_generator_dal() + .mark_witness_job_as_successful(L1BatchNumber(job_id), started_at.elapsed()) + .await; + transaction + .commit() + .await + .expect("failed to commit database transaction"); + Ok(()) + } +} diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs new file mode 100644 index 000000000000..08732689e3a6 --- /dev/null +++ b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs @@ -0,0 +1,153 @@ +use std::{sync::Arc, time::Instant}; + +use anyhow::Context as _; +use tracing::Instrument; +use zksync_prover_dal::ProverDal; +use zksync_prover_fri_types::{get_current_pod_name, AuxOutputWitnessWrapper}; +use zksync_queued_job_processor::{async_trait, JobProcessor}; +use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; + +use crate::{ + artifacts::{ArtifactsManager, BlobUrls, SchedulerBlobUrls}, + basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob}, + metrics::WITNESS_GENERATOR_METRICS, +}; + +#[async_trait] +impl JobProcessor for BasicWitnessGenerator { + type Job = BasicWitnessGeneratorJob; + type JobId = L1BatchNumber; + // The artifact is optional to support skipping blocks when sampling is enabled. + type JobArtifacts = Option; + + const SERVICE_NAME: &'static str = "fri_basic_circuit_witness_generator"; + + async fn get_next_job(&self) -> anyhow::Result> { + let mut prover_connection = self.prover_connection_pool.connection().await?; + let last_l1_batch_to_process = self.config.last_l1_batch_to_process(); + let pod_name = get_current_pod_name(); + match prover_connection + .fri_witness_generator_dal() + .get_next_basic_circuit_witness_job( + last_l1_batch_to_process, + self.protocol_version, + &pod_name, + ) + .await + { + Some(block_number) => { + tracing::info!( + "Processing FRI basic witness-gen for block {}", + block_number + ); + let started_at = Instant::now(); + let job = Self::get_artifacts(&block_number, &*self.object_store).await?; + + WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::BasicCircuits.into()] + .observe(started_at.elapsed()); + + Ok(Some((block_number, job))) + } + None => Ok(None), + } + } + + async fn save_failure(&self, job_id: L1BatchNumber, _started_at: Instant, error: String) -> () { + self.prover_connection_pool + .connection() + .await + .unwrap() + .fri_witness_generator_dal() + .mark_witness_job_failed(&error, job_id) + .await; + } + + #[allow(clippy::async_yields_async)] + async fn process_job( + &self, + _job_id: &Self::JobId, + job: BasicWitnessGeneratorJob, + started_at: Instant, + ) -> tokio::task::JoinHandle>> { + let object_store = Arc::clone(&self.object_store); + let max_circuits_in_flight = self.config.max_circuits_in_flight; + tokio::spawn(async move { + let block_number = job.block_number; + Ok( + Self::process_job_impl(object_store, job, started_at, max_circuits_in_flight) + .instrument(tracing::info_span!("basic_circuit", %block_number)) + .await, + ) + }) + } + + #[tracing::instrument(skip_all, fields(l1_batch = %job_id))] + async fn save_result( + &self, + job_id: L1BatchNumber, + started_at: Instant, + optional_artifacts: Option, + ) -> anyhow::Result<()> { + match optional_artifacts { + None => Ok(()), + Some(artifacts) => { + let blob_started_at = Instant::now(); + let circuit_urls = artifacts.circuit_urls.clone(); + let queue_urls = artifacts.queue_urls.clone(); + + let aux_output_witness_wrapper = + AuxOutputWitnessWrapper(artifacts.aux_output_witness.clone()); + if self.config.shall_save_to_public_bucket { + self.public_blob_store.as_deref() + .expect("public_object_store shall not be empty while running with shall_save_to_public_bucket config") + .put(job_id, &aux_output_witness_wrapper) + .await + .unwrap(); + } + + let scheduler_witness_url = + match Self::save_artifacts(job_id.0, artifacts.clone(), &*self.object_store) + .await + { + BlobUrls::Url(url) => url, + _ => unreachable!(), + }; + + WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::BasicCircuits.into()] + .observe(blob_started_at.elapsed()); + + Self::update_database( + &self.prover_connection_pool, + job_id.0, + started_at, + BlobUrls::Scheduler(SchedulerBlobUrls { + circuit_ids_and_urls: circuit_urls, + closed_form_inputs_and_urls: queue_urls, + scheduler_witness_url, + }), + artifacts, + ) + .await?; + Ok(()) + } + } + } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result { + let mut prover_storage = self + .prover_connection_pool + .connection() + .await + .context("failed to acquire DB connection for BasicWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_basic_circuit_witness_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for BasicWitnessGenerator") + } +} diff --git a/prover/crates/bin/witness_generator/src/basic_circuits.rs b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs similarity index 63% rename from prover/crates/bin/witness_generator/src/basic_circuits.rs rename to prover/crates/bin/witness_generator/src/basic_circuits/mod.rs index 00a4d99ba9a9..c9755c333dad 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs @@ -1,49 +1,43 @@ use std::{ - collections::{hash_map::DefaultHasher, HashSet}, - hash::{Hash, Hasher}, + collections::HashSet, + hash::{DefaultHasher, Hash, Hasher}, sync::Arc, time::Instant, }; -use anyhow::Context as _; -use async_trait::async_trait; use circuit_definitions::{ circuit_definitions::base_layer::{ZkSyncBaseLayerCircuit, ZkSyncBaseLayerStorage}, encodings::recursion_request::RecursionQueueSimulator, - zkevm_circuits::fsm_input_output::ClosedFormInputCompactFormWitness, + zkevm_circuits::{ + fsm_input_output::ClosedFormInputCompactFormWitness, + scheduler::{ + block_header::BlockAuxilaryOutputWitness, input::SchedulerCircuitInstanceWitness, + }, + }, }; use tokio::sync::Semaphore; use tracing::Instrument; -use zkevm_test_harness::{ - geometry_config::get_geometry_config, witness::oracle::WitnessGenerationArtifact, -}; +use zkevm_test_harness::witness::oracle::WitnessGenerationArtifact; use zksync_config::configs::FriWitnessGeneratorConfig; use zksync_multivm::{ - interface::storage::StorageView, - vm_latest::{constants::MAX_CYCLES_FOR_TX, HistoryDisabled, StorageOracle as VmStorageOracle}, -}; -use zksync_object_store::ObjectStore; -use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; -use zksync_prover_fri_types::{ - circuit_definitions::{ + circuit_sequencer_api_latest::{ boojum::{ field::goldilocks::{GoldilocksExt2, GoldilocksField}, gadgets::recursion::recursive_tree_hasher::CircuitGoldilocksPoseidon2Sponge, }, - zkevm_circuits::scheduler::{ - block_header::BlockAuxilaryOutputWitness, input::SchedulerCircuitInstanceWitness, - }, + geometry_config::get_geometry_config, }, - get_current_pod_name, - keys::ClosedFormInputKey, - AuxOutputWitnessWrapper, CircuitAuxData, + interface::storage::StorageView, + vm_latest::{constants::MAX_CYCLES_FOR_TX, HistoryDisabled, StorageOracle as VmStorageOracle}, + zk_evm_latest::ethereum_types::Address, }; -use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer; +use zksync_object_store::ObjectStore; +use zksync_prover_dal::{ConnectionPool, Prover}; +use zksync_prover_fri_types::{keys::ClosedFormInputKey, CircuitAuxData}; use zksync_prover_interface::inputs::WitnessInputData; -use zksync_queued_job_processor::JobProcessor; +use zksync_system_constants::BOOTLOADER_ADDRESS; use zksync_types::{ - basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, Address, - L1BatchNumber, BOOTLOADER_ADDRESS, + basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; use crate::{ @@ -52,33 +46,30 @@ use crate::{ storage_oracle::StorageOracle, utils::{ expand_bootloader_contents, save_circuit, save_ram_premutation_queue_witness, - ClosedFormInputWrapper, SchedulerPartialInputWrapper, KZG_TRUSTED_SETUP_FILE, + ClosedFormInputWrapper, KZG_TRUSTED_SETUP_FILE, }, witness::WitnessStorage, }; +mod artifacts; +pub mod job_processor; + +#[derive(Clone)] pub struct BasicCircuitArtifacts { - circuit_urls: Vec<(u8, String)>, - queue_urls: Vec<(u8, String, usize)>, - scheduler_witness: SchedulerCircuitInstanceWitness< + pub(super) circuit_urls: Vec<(u8, String)>, + pub(super) queue_urls: Vec<(u8, String, usize)>, + pub(super) scheduler_witness: SchedulerCircuitInstanceWitness< GoldilocksField, CircuitGoldilocksPoseidon2Sponge, GoldilocksExt2, >, - aux_output_witness: BlockAuxilaryOutputWitness, -} - -#[derive(Debug)] -struct BlobUrls { - circuit_ids_and_urls: Vec<(u8, String)>, - closed_form_inputs_and_urls: Vec<(u8, String, usize)>, - scheduler_witness_url: String, + pub(super) aux_output_witness: BlockAuxilaryOutputWitness, } #[derive(Clone)] pub struct BasicWitnessGeneratorJob { - block_number: L1BatchNumber, - job: WitnessInputData, + pub(super) block_number: L1BatchNumber, + pub(super) data: WitnessInputData, } #[derive(Debug)] @@ -90,6 +81,17 @@ pub struct BasicWitnessGenerator { protocol_version: ProtocolSemanticVersion, } +type Witness = ( + Vec<(u8, String)>, + Vec<(u8, String, usize)>, + SchedulerCircuitInstanceWitness< + GoldilocksField, + CircuitGoldilocksPoseidon2Sponge, + GoldilocksExt2, + >, + BlockAuxilaryOutputWitness, +); + impl BasicWitnessGenerator { pub fn new( config: FriWitnessGeneratorConfig, @@ -113,7 +115,10 @@ impl BasicWitnessGenerator { started_at: Instant, max_circuits_in_flight: usize, ) -> Option { - let BasicWitnessGeneratorJob { block_number, job } = basic_job; + let BasicWitnessGeneratorJob { + block_number, + data: job, + } = basic_job; tracing::info!( "Starting witness generation of type {:?} for block {}", @@ -134,135 +139,8 @@ impl BasicWitnessGenerator { } } -#[async_trait] -impl JobProcessor for BasicWitnessGenerator { - type Job = BasicWitnessGeneratorJob; - type JobId = L1BatchNumber; - // The artifact is optional to support skipping blocks when sampling is enabled. - type JobArtifacts = Option; - - const SERVICE_NAME: &'static str = "fri_basic_circuit_witness_generator"; - - async fn get_next_job(&self) -> anyhow::Result> { - let mut prover_connection = self.prover_connection_pool.connection().await?; - let last_l1_batch_to_process = self.config.last_l1_batch_to_process(); - let pod_name = get_current_pod_name(); - match prover_connection - .fri_witness_generator_dal() - .get_next_basic_circuit_witness_job( - last_l1_batch_to_process, - self.protocol_version, - &pod_name, - ) - .await - { - Some(block_number) => { - tracing::info!( - "Processing FRI basic witness-gen for block {}", - block_number - ); - let started_at = Instant::now(); - let job = get_artifacts(block_number, &*self.object_store).await; - - WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::BasicCircuits.into()] - .observe(started_at.elapsed()); - - Ok(Some((block_number, job))) - } - None => Ok(None), - } - } - - async fn save_failure(&self, job_id: L1BatchNumber, _started_at: Instant, error: String) -> () { - self.prover_connection_pool - .connection() - .await - .unwrap() - .fri_witness_generator_dal() - .mark_witness_job_failed(&error, job_id) - .await; - } - - #[allow(clippy::async_yields_async)] - async fn process_job( - &self, - _job_id: &Self::JobId, - job: BasicWitnessGeneratorJob, - started_at: Instant, - ) -> tokio::task::JoinHandle>> { - let object_store = Arc::clone(&self.object_store); - let max_circuits_in_flight = self.config.max_circuits_in_flight; - tokio::spawn(async move { - let block_number = job.block_number; - Ok( - Self::process_job_impl(object_store, job, started_at, max_circuits_in_flight) - .instrument(tracing::info_span!("basic_circuit", %block_number)) - .await, - ) - }) - } - - #[tracing::instrument(skip_all, fields(l1_batch = %job_id))] - async fn save_result( - &self, - job_id: L1BatchNumber, - started_at: Instant, - optional_artifacts: Option, - ) -> anyhow::Result<()> { - match optional_artifacts { - None => Ok(()), - Some(artifacts) => { - let blob_started_at = Instant::now(); - let scheduler_witness_url = save_scheduler_artifacts( - job_id, - artifacts.scheduler_witness, - artifacts.aux_output_witness, - &*self.object_store, - self.public_blob_store.as_deref(), - self.config.shall_save_to_public_bucket, - ) - .await; - - WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::BasicCircuits.into()] - .observe(blob_started_at.elapsed()); - - update_database( - &self.prover_connection_pool, - started_at, - job_id, - BlobUrls { - circuit_ids_and_urls: artifacts.circuit_urls, - closed_form_inputs_and_urls: artifacts.queue_urls, - scheduler_witness_url, - }, - ) - .await; - Ok(()) - } - } - } - - fn max_attempts(&self) -> u32 { - self.config.max_attempts - } - - async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result { - let mut prover_storage = self - .prover_connection_pool - .connection() - .await - .context("failed to acquire DB connection for BasicWitnessGenerator")?; - prover_storage - .fri_witness_generator_dal() - .get_basic_circuit_witness_job_attempts(*job_id) - .await - .map(|attempts| attempts.unwrap_or(0)) - .context("failed to get job attempts for BasicWitnessGenerator") - } -} - #[tracing::instrument(skip_all, fields(l1_batch = %block_number))] -async fn process_basic_circuits_job( +pub(super) async fn process_basic_circuits_job( object_store: Arc, started_at: Instant, block_number: L1BatchNumber, @@ -287,93 +165,6 @@ async fn process_basic_circuits_job( } } -#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] -async fn update_database( - prover_connection_pool: &ConnectionPool, - started_at: Instant, - block_number: L1BatchNumber, - blob_urls: BlobUrls, -) { - let mut connection = prover_connection_pool - .connection() - .await - .expect("failed to get database connection"); - let mut transaction = connection - .start_transaction() - .await - .expect("failed to get database transaction"); - let protocol_version_id = transaction - .fri_witness_generator_dal() - .protocol_version_for_l1_batch(block_number) - .await; - transaction - .fri_prover_jobs_dal() - .insert_prover_jobs( - block_number, - blob_urls.circuit_ids_and_urls, - AggregationRound::BasicCircuits, - 0, - protocol_version_id, - ) - .await; - transaction - .fri_witness_generator_dal() - .create_aggregation_jobs( - block_number, - &blob_urls.closed_form_inputs_and_urls, - &blob_urls.scheduler_witness_url, - get_recursive_layer_circuit_id_for_base_layer, - protocol_version_id, - ) - .await; - transaction - .fri_witness_generator_dal() - .mark_witness_job_as_successful(block_number, started_at.elapsed()) - .await; - transaction - .commit() - .await - .expect("failed to commit database transaction"); -} - -#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] -async fn get_artifacts( - block_number: L1BatchNumber, - object_store: &dyn ObjectStore, -) -> BasicWitnessGeneratorJob { - let job = object_store.get(block_number).await.unwrap(); - BasicWitnessGeneratorJob { block_number, job } -} - -#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] -async fn save_scheduler_artifacts( - block_number: L1BatchNumber, - scheduler_partial_input: SchedulerCircuitInstanceWitness< - GoldilocksField, - CircuitGoldilocksPoseidon2Sponge, - GoldilocksExt2, - >, - aux_output_witness: BlockAuxilaryOutputWitness, - object_store: &dyn ObjectStore, - public_object_store: Option<&dyn ObjectStore>, - shall_save_to_public_bucket: bool, -) -> String { - let aux_output_witness_wrapper = AuxOutputWitnessWrapper(aux_output_witness); - if shall_save_to_public_bucket { - public_object_store - .expect("public_object_store shall not be empty while running with shall_save_to_public_bucket config") - .put(block_number, &aux_output_witness_wrapper) - .await - .unwrap(); - } - object_store - .put(block_number, &aux_output_witness_wrapper) - .await - .unwrap(); - let wrapper = SchedulerPartialInputWrapper(scheduler_partial_input); - object_store.put(block_number, &wrapper).await.unwrap() -} - #[tracing::instrument(skip_all, fields(l1_batch = %block_number, circuit_id = %circuit_id))] async fn save_recursion_queue( block_number: L1BatchNumber, @@ -396,17 +187,6 @@ async fn save_recursion_queue( (circuit_id, blob_url, basic_circuit_count) } -type Witness = ( - Vec<(u8, String)>, - Vec<(u8, String, usize)>, - SchedulerCircuitInstanceWitness< - GoldilocksField, - CircuitGoldilocksPoseidon2Sponge, - GoldilocksExt2, - >, - BlockAuxilaryOutputWitness, -); - #[tracing::instrument(skip_all, fields(l1_batch = %block_number))] async fn generate_witness( block_number: L1BatchNumber, diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs new file mode 100644 index 000000000000..a94587d00ec6 --- /dev/null +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs @@ -0,0 +1,150 @@ +use std::time::Instant; + +use async_trait::async_trait; +use zksync_object_store::ObjectStore; +use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_prover_fri_types::keys::ClosedFormInputKey; +use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer; +use zksync_types::{basic_fri_types::AggregationRound, prover_dal::LeafAggregationJobMetadata}; + +use crate::{ + artifacts::{AggregationBlobUrls, ArtifactsManager, BlobUrls}, + leaf_aggregation::{LeafAggregationArtifacts, LeafAggregationWitnessGenerator}, + metrics::WITNESS_GENERATOR_METRICS, + utils::{save_node_aggregations_artifacts, ClosedFormInputWrapper}, +}; + +#[async_trait] +impl ArtifactsManager for LeafAggregationWitnessGenerator { + type InputMetadata = LeafAggregationJobMetadata; + type InputArtifacts = ClosedFormInputWrapper; + type OutputArtifacts = LeafAggregationArtifacts; + + async fn get_artifacts( + metadata: &Self::InputMetadata, + object_store: &dyn ObjectStore, + ) -> anyhow::Result { + let key = ClosedFormInputKey { + block_number: metadata.block_number, + circuit_id: metadata.circuit_id, + }; + + let artifacts = object_store + .get(key) + .await + .unwrap_or_else(|_| panic!("leaf aggregation job artifacts missing: {:?}", key)); + + Ok(artifacts) + } + + #[tracing::instrument( + skip_all, + fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) + )] + async fn save_artifacts( + _job_id: u32, + artifacts: Self::OutputArtifacts, + object_store: &dyn ObjectStore, + ) -> BlobUrls { + let started_at = Instant::now(); + let aggregations_urls = save_node_aggregations_artifacts( + artifacts.block_number, + get_recursive_layer_circuit_id_for_base_layer(artifacts.circuit_id), + 0, + artifacts.aggregations, + object_store, + ) + .await; + WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::LeafAggregation.into()] + .observe(started_at.elapsed()); + + BlobUrls::Aggregation(AggregationBlobUrls { + aggregations_urls, + circuit_ids_and_urls: artifacts.circuit_ids_and_urls, + }) + } + + #[tracing::instrument( + skip_all, + fields(l1_batch = %job_id) + )] + async fn update_database( + connection_pool: &ConnectionPool, + job_id: u32, + started_at: Instant, + blob_urls: BlobUrls, + artifacts: Self::OutputArtifacts, + ) -> anyhow::Result<()> { + tracing::info!( + "Updating database for job_id {}, block {} with circuit id {}", + job_id, + artifacts.block_number.0, + artifacts.circuit_id, + ); + + let blob_urls = match blob_urls { + BlobUrls::Aggregation(blob_urls) => blob_urls, + _ => panic!("Unexpected blob urls type"), + }; + + let mut prover_connection = connection_pool.connection().await.unwrap(); + let mut transaction = prover_connection.start_transaction().await.unwrap(); + let number_of_dependent_jobs = blob_urls.circuit_ids_and_urls.len(); + let protocol_version_id = transaction + .fri_witness_generator_dal() + .protocol_version_for_l1_batch(artifacts.block_number) + .await; + tracing::info!( + "Inserting {} prover jobs for job_id {}, block {} with circuit id {}", + blob_urls.circuit_ids_and_urls.len(), + job_id, + artifacts.block_number.0, + artifacts.circuit_id, + ); + transaction + .fri_prover_jobs_dal() + .insert_prover_jobs( + artifacts.block_number, + blob_urls.circuit_ids_and_urls, + AggregationRound::LeafAggregation, + 0, + protocol_version_id, + ) + .await; + tracing::info!( + "Updating node aggregation jobs url for job_id {}, block {} with circuit id {}", + job_id, + artifacts.block_number.0, + artifacts.circuit_id, + ); + transaction + .fri_witness_generator_dal() + .update_node_aggregation_jobs_url( + artifacts.block_number, + get_recursive_layer_circuit_id_for_base_layer(artifacts.circuit_id), + number_of_dependent_jobs, + 0, + blob_urls.aggregations_urls, + ) + .await; + tracing::info!( + "Marking leaf aggregation job as successful for job id {}, block {} with circuit id {}", + job_id, + artifacts.block_number.0, + artifacts.circuit_id, + ); + transaction + .fri_witness_generator_dal() + .mark_leaf_aggregation_as_successful(job_id, started_at.elapsed()) + .await; + + tracing::info!( + "Committing transaction for job_id {}, block {} with circuit id {}", + job_id, + artifacts.block_number.0, + artifacts.circuit_id, + ); + transaction.commit().await?; + Ok(()) + } +} diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs new file mode 100644 index 000000000000..e032084151eb --- /dev/null +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs @@ -0,0 +1,124 @@ +use std::time::Instant; + +use anyhow::Context as _; +use async_trait::async_trait; +use zksync_prover_dal::ProverDal; +use zksync_prover_fri_types::get_current_pod_name; +use zksync_queued_job_processor::JobProcessor; +use zksync_types::basic_fri_types::AggregationRound; + +use crate::{ + artifacts::ArtifactsManager, + leaf_aggregation::{ + prepare_leaf_aggregation_job, LeafAggregationArtifacts, LeafAggregationWitnessGenerator, + LeafAggregationWitnessGeneratorJob, + }, + metrics::WITNESS_GENERATOR_METRICS, +}; + +#[async_trait] +impl JobProcessor for LeafAggregationWitnessGenerator { + type Job = LeafAggregationWitnessGeneratorJob; + type JobId = u32; + type JobArtifacts = LeafAggregationArtifacts; + + const SERVICE_NAME: &'static str = "fri_leaf_aggregation_witness_generator"; + + async fn get_next_job(&self) -> anyhow::Result> { + let mut prover_connection = self.prover_connection_pool.connection().await?; + let pod_name = get_current_pod_name(); + let Some(metadata) = prover_connection + .fri_witness_generator_dal() + .get_next_leaf_aggregation_job(self.protocol_version, &pod_name) + .await + else { + return Ok(None); + }; + tracing::info!("Processing leaf aggregation job {:?}", metadata.id); + Ok(Some(( + metadata.id, + prepare_leaf_aggregation_job(metadata, &*self.object_store, self.keystore.clone()) + .await + .context("prepare_leaf_aggregation_job()")?, + ))) + } + + async fn save_failure(&self, job_id: u32, _started_at: Instant, error: String) -> () { + self.prover_connection_pool + .connection() + .await + .unwrap() + .fri_witness_generator_dal() + .mark_leaf_aggregation_job_failed(&error, job_id) + .await; + } + + #[allow(clippy::async_yields_async)] + async fn process_job( + &self, + _job_id: &Self::JobId, + job: LeafAggregationWitnessGeneratorJob, + started_at: Instant, + ) -> tokio::task::JoinHandle> { + let object_store = self.object_store.clone(); + let max_circuits_in_flight = self.config.max_circuits_in_flight; + tokio::spawn(async move { + Ok(Self::process_job_impl(job, started_at, object_store, max_circuits_in_flight).await) + }) + } + + async fn save_result( + &self, + job_id: u32, + started_at: Instant, + artifacts: LeafAggregationArtifacts, + ) -> anyhow::Result<()> { + let block_number = artifacts.block_number; + let circuit_id = artifacts.circuit_id; + tracing::info!( + "Saving leaf aggregation artifacts for block {} with circuit {}", + block_number.0, + circuit_id, + ); + + let blob_save_started_at = Instant::now(); + + let blob_urls = Self::save_artifacts(job_id, artifacts.clone(), &*self.object_store).await; + + WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::LeafAggregation.into()] + .observe(blob_save_started_at.elapsed()); + + tracing::info!( + "Saved leaf aggregation artifacts for block {} with circuit {}", + block_number.0, + circuit_id, + ); + Self::update_database( + &self.prover_connection_pool, + job_id, + started_at, + blob_urls, + artifacts, + ) + .await?; + Ok(()) + } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result { + let mut prover_storage = self + .prover_connection_pool + .connection() + .await + .context("failed to acquire DB connection for LeafAggregationWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_leaf_aggregation_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for LeafAggregationWitnessGenerator") + } +} diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs similarity index 52% rename from prover/crates/bin/witness_generator/src/leaf_aggregation.rs rename to prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs index 503c46e41bbd..d669a4cc97e3 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs @@ -1,7 +1,6 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context as _; -use async_trait::async_trait; use circuit_definitions::circuit_definitions::recursion_layer::base_circuit_type_into_recursive_leaf_circuit_type; use tokio::sync::Semaphore; use zkevm_test_harness::{ @@ -12,7 +11,7 @@ use zkevm_test_harness::{ }; use zksync_config::configs::FriWitnessGeneratorConfig; use zksync_object_store::ObjectStore; -use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_prover_dal::{ConnectionPool, Prover}; use zksync_prover_fri_types::{ circuit_definitions::{ boojum::field::goldilocks::GoldilocksField, @@ -22,40 +21,25 @@ use zksync_prover_fri_types::{ encodings::recursion_request::RecursionQueueSimulator, zkevm_circuits::recursion::leaf_layer::input::RecursionLeafParametersWitness, }, - get_current_pod_name, - keys::ClosedFormInputKey, FriProofWrapper, }; -use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer; use zksync_prover_keystore::keystore::Keystore; -use zksync_queued_job_processor::JobProcessor; use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, prover_dal::LeafAggregationJobMetadata, L1BatchNumber, }; use crate::{ + artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, utils::{ - load_proofs_for_job_ids, save_node_aggregations_artifacts, - save_recursive_layer_prover_input_artifacts, ClosedFormInputWrapper, + load_proofs_for_job_ids, save_recursive_layer_prover_input_artifacts, + ClosedFormInputWrapper, }, }; -pub struct LeafAggregationArtifacts { - circuit_id: u8, - block_number: L1BatchNumber, - pub aggregations: Vec<(u64, RecursionQueueSimulator)>, - pub circuit_ids_and_urls: Vec<(u8, String)>, - #[allow(dead_code)] - closed_form_inputs: Vec>, -} - -#[derive(Debug)] -struct BlobUrls { - circuit_ids_and_urls: Vec<(u8, String)>, - aggregations_urls: String, -} +mod artifacts; +mod job_processor; pub struct LeafAggregationWitnessGeneratorJob { pub(crate) circuit_id: u8, @@ -75,6 +59,16 @@ pub struct LeafAggregationWitnessGenerator { keystore: Keystore, } +#[derive(Clone)] +pub struct LeafAggregationArtifacts { + circuit_id: u8, + block_number: L1BatchNumber, + pub aggregations: Vec<(u64, RecursionQueueSimulator)>, + pub circuit_ids_and_urls: Vec<(u8, String)>, + #[allow(dead_code)] + closed_form_inputs: Vec>, +} + impl LeafAggregationWitnessGenerator { pub fn new( config: FriWitnessGeneratorConfig, @@ -113,108 +107,6 @@ impl LeafAggregationWitnessGenerator { } } -#[async_trait] -impl JobProcessor for LeafAggregationWitnessGenerator { - type Job = LeafAggregationWitnessGeneratorJob; - type JobId = u32; - type JobArtifacts = LeafAggregationArtifacts; - - const SERVICE_NAME: &'static str = "fri_leaf_aggregation_witness_generator"; - - async fn get_next_job(&self) -> anyhow::Result> { - let mut prover_connection = self.prover_connection_pool.connection().await?; - let pod_name = get_current_pod_name(); - let Some(metadata) = prover_connection - .fri_witness_generator_dal() - .get_next_leaf_aggregation_job(self.protocol_version, &pod_name) - .await - else { - return Ok(None); - }; - tracing::info!("Processing leaf aggregation job {:?}", metadata.id); - Ok(Some(( - metadata.id, - prepare_leaf_aggregation_job(metadata, &*self.object_store, self.keystore.clone()) - .await - .context("prepare_leaf_aggregation_job()")?, - ))) - } - - async fn save_failure(&self, job_id: u32, _started_at: Instant, error: String) -> () { - self.prover_connection_pool - .connection() - .await - .unwrap() - .fri_witness_generator_dal() - .mark_leaf_aggregation_job_failed(&error, job_id) - .await; - } - - #[allow(clippy::async_yields_async)] - async fn process_job( - &self, - _job_id: &Self::JobId, - job: LeafAggregationWitnessGeneratorJob, - started_at: Instant, - ) -> tokio::task::JoinHandle> { - let object_store = self.object_store.clone(); - let max_circuits_in_flight = self.config.max_circuits_in_flight; - tokio::spawn(async move { - Ok(Self::process_job_impl(job, started_at, object_store, max_circuits_in_flight).await) - }) - } - - async fn save_result( - &self, - job_id: u32, - started_at: Instant, - artifacts: LeafAggregationArtifacts, - ) -> anyhow::Result<()> { - let block_number = artifacts.block_number; - let circuit_id = artifacts.circuit_id; - tracing::info!( - "Saving leaf aggregation artifacts for block {} with circuit {}", - block_number.0, - circuit_id, - ); - let blob_urls = save_artifacts(artifacts, &*self.object_store).await; - tracing::info!( - "Saved leaf aggregation artifacts for block {} with circuit {} (count: {})", - block_number.0, - circuit_id, - blob_urls.circuit_ids_and_urls.len(), - ); - update_database( - &self.prover_connection_pool, - started_at, - block_number, - job_id, - blob_urls, - circuit_id, - ) - .await; - Ok(()) - } - - fn max_attempts(&self) -> u32 { - self.config.max_attempts - } - - async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result { - let mut prover_storage = self - .prover_connection_pool - .connection() - .await - .context("failed to acquire DB connection for LeafAggregationWitnessGenerator")?; - prover_storage - .fri_witness_generator_dal() - .get_leaf_aggregation_job_attempts(*job_id) - .await - .map(|attempts| attempts.unwrap_or(0)) - .context("failed to get job attempts for LeafAggregationWitnessGenerator") - } -} - #[tracing::instrument( skip_all, fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) @@ -225,7 +117,8 @@ pub async fn prepare_leaf_aggregation_job( keystore: Keystore, ) -> anyhow::Result { let started_at = Instant::now(); - let closed_form_input = get_artifacts(&metadata, object_store).await; + let closed_form_input = + LeafAggregationWitnessGenerator::get_artifacts(&metadata, object_store).await?; WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::LeafAggregation.into()] .observe(started_at.elapsed()); @@ -368,125 +261,3 @@ pub async fn process_leaf_aggregation_job( closed_form_inputs: job.closed_form_inputs.0, } } - -#[tracing::instrument( - skip_all, - fields(l1_batch = %block_number, circuit_id = %circuit_id) -)] -async fn update_database( - prover_connection_pool: &ConnectionPool, - started_at: Instant, - block_number: L1BatchNumber, - job_id: u32, - blob_urls: BlobUrls, - circuit_id: u8, -) { - tracing::info!( - "Updating database for job_id {}, block {} with circuit id {}", - job_id, - block_number.0, - circuit_id, - ); - let mut prover_connection = prover_connection_pool.connection().await.unwrap(); - let mut transaction = prover_connection.start_transaction().await.unwrap(); - let number_of_dependent_jobs = blob_urls.circuit_ids_and_urls.len(); - let protocol_version_id = transaction - .fri_witness_generator_dal() - .protocol_version_for_l1_batch(block_number) - .await; - tracing::info!( - "Inserting {} prover jobs for job_id {}, block {} with circuit id {}", - blob_urls.circuit_ids_and_urls.len(), - job_id, - block_number.0, - circuit_id, - ); - transaction - .fri_prover_jobs_dal() - .insert_prover_jobs( - block_number, - blob_urls.circuit_ids_and_urls, - AggregationRound::LeafAggregation, - 0, - protocol_version_id, - ) - .await; - tracing::info!( - "Updating node aggregation jobs url for job_id {}, block {} with circuit id {}", - job_id, - block_number.0, - circuit_id, - ); - transaction - .fri_witness_generator_dal() - .update_node_aggregation_jobs_url( - block_number, - get_recursive_layer_circuit_id_for_base_layer(circuit_id), - number_of_dependent_jobs, - 0, - blob_urls.aggregations_urls, - ) - .await; - tracing::info!( - "Marking leaf aggregation job as successful for job id {}, block {} with circuit id {}", - job_id, - block_number.0, - circuit_id, - ); - transaction - .fri_witness_generator_dal() - .mark_leaf_aggregation_as_successful(job_id, started_at.elapsed()) - .await; - - tracing::info!( - "Committing transaction for job_id {}, block {} with circuit id {}", - job_id, - block_number.0, - circuit_id, - ); - transaction.commit().await.unwrap(); -} - -#[tracing::instrument( - skip_all, - fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) -)] -async fn get_artifacts( - metadata: &LeafAggregationJobMetadata, - object_store: &dyn ObjectStore, -) -> ClosedFormInputWrapper { - let key = ClosedFormInputKey { - block_number: metadata.block_number, - circuit_id: metadata.circuit_id, - }; - object_store - .get(key) - .await - .unwrap_or_else(|_| panic!("leaf aggregation job artifacts missing: {:?}", key)) -} - -#[tracing::instrument( - skip_all, - fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) -)] -async fn save_artifacts( - artifacts: LeafAggregationArtifacts, - object_store: &dyn ObjectStore, -) -> BlobUrls { - let started_at = Instant::now(); - let aggregations_urls = save_node_aggregations_artifacts( - artifacts.block_number, - get_recursive_layer_circuit_id_for_base_layer(artifacts.circuit_id), - 0, - artifacts.aggregations, - object_store, - ) - .await; - WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::LeafAggregation.into()] - .observe(started_at.elapsed()); - - BlobUrls { - circuit_ids_and_urls: artifacts.circuit_ids_and_urls, - aggregations_urls, - } -} diff --git a/prover/crates/bin/witness_generator/src/lib.rs b/prover/crates/bin/witness_generator/src/lib.rs index 00d2ebf2bb3d..c0ac9718c6ee 100644 --- a/prover/crates/bin/witness_generator/src/lib.rs +++ b/prover/crates/bin/witness_generator/src/lib.rs @@ -1,6 +1,7 @@ #![allow(incomplete_features)] // We have to use generic const exprs. #![feature(generic_const_exprs)] +pub mod artifacts; pub mod basic_circuits; pub mod leaf_aggregation; pub mod metrics; diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs new file mode 100644 index 000000000000..245027f0d677 --- /dev/null +++ b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs @@ -0,0 +1,146 @@ +use std::time::Instant; + +use async_trait::async_trait; +use zksync_object_store::ObjectStore; +use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_prover_fri_types::keys::AggregationsKey; +use zksync_types::{basic_fri_types::AggregationRound, prover_dal::NodeAggregationJobMetadata}; + +use crate::{ + artifacts::{AggregationBlobUrls, ArtifactsManager, BlobUrls}, + metrics::WITNESS_GENERATOR_METRICS, + node_aggregation::{NodeAggregationArtifacts, NodeAggregationWitnessGenerator}, + utils::{save_node_aggregations_artifacts, AggregationWrapper}, +}; + +#[async_trait] +impl ArtifactsManager for NodeAggregationWitnessGenerator { + type InputMetadata = NodeAggregationJobMetadata; + type InputArtifacts = AggregationWrapper; + type OutputArtifacts = NodeAggregationArtifacts; + + #[tracing::instrument( + skip_all, + fields(l1_batch = % metadata.block_number, circuit_id = % metadata.circuit_id) + )] + async fn get_artifacts( + metadata: &Self::InputMetadata, + object_store: &dyn ObjectStore, + ) -> anyhow::Result { + let key = AggregationsKey { + block_number: metadata.block_number, + circuit_id: metadata.circuit_id, + depth: metadata.depth, + }; + let artifacts = object_store.get(key).await.unwrap_or_else(|error| { + panic!( + "node aggregation job artifacts getting error. Key: {:?}, error: {:?}", + key, error + ) + }); + + Ok(artifacts) + } + + #[tracing::instrument( + skip_all, + fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) + )] + async fn save_artifacts( + _job_id: u32, + artifacts: Self::OutputArtifacts, + object_store: &dyn ObjectStore, + ) -> BlobUrls { + let started_at = Instant::now(); + let aggregations_urls = save_node_aggregations_artifacts( + artifacts.block_number, + artifacts.circuit_id, + artifacts.depth, + artifacts.next_aggregations, + object_store, + ) + .await; + + WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::NodeAggregation.into()] + .observe(started_at.elapsed()); + + BlobUrls::Aggregation(AggregationBlobUrls { + aggregations_urls, + circuit_ids_and_urls: artifacts.recursive_circuit_ids_and_urls, + }) + } + + #[tracing::instrument( + skip_all, + fields(l1_batch = % job_id) + )] + async fn update_database( + connection_pool: &ConnectionPool, + job_id: u32, + started_at: Instant, + blob_urls: BlobUrls, + artifacts: Self::OutputArtifacts, + ) -> anyhow::Result<()> { + let mut prover_connection = connection_pool.connection().await.unwrap(); + let blob_urls = match blob_urls { + BlobUrls::Aggregation(blobs) => blobs, + _ => unreachable!(), + }; + let mut transaction = prover_connection.start_transaction().await.unwrap(); + let dependent_jobs = blob_urls.circuit_ids_and_urls.len(); + let protocol_version_id = transaction + .fri_witness_generator_dal() + .protocol_version_for_l1_batch(artifacts.block_number) + .await; + match artifacts.next_aggregations.len() > 1 { + true => { + transaction + .fri_prover_jobs_dal() + .insert_prover_jobs( + artifacts.block_number, + blob_urls.circuit_ids_and_urls, + AggregationRound::NodeAggregation, + artifacts.depth, + protocol_version_id, + ) + .await; + transaction + .fri_witness_generator_dal() + .insert_node_aggregation_jobs( + artifacts.block_number, + artifacts.circuit_id, + Some(dependent_jobs as i32), + artifacts.depth, + &blob_urls.aggregations_urls, + protocol_version_id, + ) + .await; + } + false => { + let (_, blob_url) = blob_urls.circuit_ids_and_urls[0].clone(); + transaction + .fri_prover_jobs_dal() + .insert_prover_job( + artifacts.block_number, + artifacts.circuit_id, + artifacts.depth, + 0, + AggregationRound::NodeAggregation, + &blob_url, + true, + protocol_version_id, + ) + .await + } + } + + transaction + .fri_witness_generator_dal() + .mark_node_aggregation_as_successful(job_id, started_at.elapsed()) + .await; + + transaction.commit().await?; + + Ok(()) + } +} diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs new file mode 100644 index 000000000000..a015462cd6fe --- /dev/null +++ b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs @@ -0,0 +1,115 @@ +use std::time::Instant; + +use anyhow::Context as _; +use async_trait::async_trait; +use zksync_prover_dal::ProverDal; +use zksync_prover_fri_types::get_current_pod_name; +use zksync_queued_job_processor::JobProcessor; +use zksync_types::basic_fri_types::AggregationRound; + +use crate::{ + artifacts::ArtifactsManager, + metrics::WITNESS_GENERATOR_METRICS, + node_aggregation::{ + prepare_job, NodeAggregationArtifacts, NodeAggregationWitnessGenerator, + NodeAggregationWitnessGeneratorJob, + }, +}; + +#[async_trait] +impl JobProcessor for NodeAggregationWitnessGenerator { + type Job = NodeAggregationWitnessGeneratorJob; + type JobId = u32; + type JobArtifacts = NodeAggregationArtifacts; + + const SERVICE_NAME: &'static str = "fri_node_aggregation_witness_generator"; + + async fn get_next_job(&self) -> anyhow::Result> { + let mut prover_connection = self.prover_connection_pool.connection().await?; + let pod_name = get_current_pod_name(); + let Some(metadata) = prover_connection + .fri_witness_generator_dal() + .get_next_node_aggregation_job(self.protocol_version, &pod_name) + .await + else { + return Ok(None); + }; + tracing::info!("Processing node aggregation job {:?}", metadata.id); + Ok(Some(( + metadata.id, + prepare_job(metadata, &*self.object_store, self.keystore.clone()) + .await + .context("prepare_job()")?, + ))) + } + + async fn save_failure(&self, job_id: u32, _started_at: Instant, error: String) -> () { + self.prover_connection_pool + .connection() + .await + .unwrap() + .fri_witness_generator_dal() + .mark_node_aggregation_job_failed(&error, job_id) + .await; + } + + #[allow(clippy::async_yields_async)] + async fn process_job( + &self, + _job_id: &Self::JobId, + job: NodeAggregationWitnessGeneratorJob, + started_at: Instant, + ) -> tokio::task::JoinHandle> { + let object_store = self.object_store.clone(); + let max_circuits_in_flight = self.config.max_circuits_in_flight; + tokio::spawn(async move { + Ok(Self::process_job_impl(job, started_at, object_store, max_circuits_in_flight).await) + }) + } + + #[tracing::instrument( + skip_all, + fields(l1_batch = % artifacts.block_number, circuit_id = % artifacts.circuit_id) + )] + async fn save_result( + &self, + job_id: u32, + started_at: Instant, + artifacts: NodeAggregationArtifacts, + ) -> anyhow::Result<()> { + let blob_save_started_at = Instant::now(); + + let blob_urls = Self::save_artifacts(job_id, artifacts.clone(), &*self.object_store).await; + + WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::NodeAggregation.into()] + .observe(blob_save_started_at.elapsed()); + + Self::update_database( + &self.prover_connection_pool, + job_id, + started_at, + blob_urls, + artifacts, + ) + .await?; + Ok(()) + } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result { + let mut prover_storage = self + .prover_connection_pool + .connection() + .await + .context("failed to acquire DB connection for NodeAggregationWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_node_aggregation_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for NodeAggregationWitnessGenerator") + } +} diff --git a/prover/crates/bin/witness_generator/src/node_aggregation.rs b/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs similarity index 52% rename from prover/crates/bin/witness_generator/src/node_aggregation.rs rename to prover/crates/bin/witness_generator/src/node_aggregation/mod.rs index 72bdebde572a..047caa363a89 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs @@ -1,7 +1,6 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context as _; -use async_trait::async_trait; use circuit_definitions::circuit_definitions::recursion_layer::RECURSION_ARITY; use tokio::sync::Semaphore; use zkevm_test_harness::witness::recursive_aggregation::{ @@ -9,7 +8,7 @@ use zkevm_test_harness::witness::recursive_aggregation::{ }; use zksync_config::configs::FriWitnessGeneratorConfig; use zksync_object_store::ObjectStore; -use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_prover_dal::{ConnectionPool, Prover}; use zksync_prover_fri_types::{ circuit_definitions::{ boojum::field::goldilocks::GoldilocksField, @@ -19,25 +18,24 @@ use zksync_prover_fri_types::{ encodings::recursion_request::RecursionQueueSimulator, zkevm_circuits::recursion::leaf_layer::input::RecursionLeafParametersWitness, }, - get_current_pod_name, - keys::AggregationsKey, FriProofWrapper, }; use zksync_prover_keystore::{keystore::Keystore, utils::get_leaf_vk_params}; -use zksync_queued_job_processor::JobProcessor; use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, prover_dal::NodeAggregationJobMetadata, L1BatchNumber, }; use crate::{ + artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, - utils::{ - load_proofs_for_job_ids, save_node_aggregations_artifacts, - save_recursive_layer_prover_input_artifacts, AggregationWrapper, - }, + utils::{load_proofs_for_job_ids, save_recursive_layer_prover_input_artifacts}, }; +mod artifacts; +mod job_processor; + +#[derive(Clone)] pub struct NodeAggregationArtifacts { circuit_id: u8, block_number: L1BatchNumber, @@ -46,12 +44,6 @@ pub struct NodeAggregationArtifacts { pub recursive_circuit_ids_and_urls: Vec<(u8, String)>, } -#[derive(Debug)] -struct BlobUrls { - node_aggregations_url: String, - circuit_ids_and_urls: Vec<(u8, String)>, -} - #[derive(Clone)] pub struct NodeAggregationWitnessGeneratorJob { circuit_id: u8, @@ -92,7 +84,7 @@ impl NodeAggregationWitnessGenerator { #[tracing::instrument( skip_all, - fields(l1_batch = %job.block_number, circuit_id = %job.circuit_id) + fields(l1_batch = % job.block_number, circuit_id = % job.circuit_id) )] pub async fn process_job_impl( job: NodeAggregationWitnessGeneratorJob, @@ -223,108 +215,9 @@ impl NodeAggregationWitnessGenerator { } } -#[async_trait] -impl JobProcessor for NodeAggregationWitnessGenerator { - type Job = NodeAggregationWitnessGeneratorJob; - type JobId = u32; - type JobArtifacts = NodeAggregationArtifacts; - - const SERVICE_NAME: &'static str = "fri_node_aggregation_witness_generator"; - - async fn get_next_job(&self) -> anyhow::Result> { - let mut prover_connection = self.prover_connection_pool.connection().await?; - let pod_name = get_current_pod_name(); - let Some(metadata) = prover_connection - .fri_witness_generator_dal() - .get_next_node_aggregation_job(self.protocol_version, &pod_name) - .await - else { - return Ok(None); - }; - tracing::info!("Processing node aggregation job {:?}", metadata.id); - Ok(Some(( - metadata.id, - prepare_job(metadata, &*self.object_store, self.keystore.clone()) - .await - .context("prepare_job()")?, - ))) - } - - async fn save_failure(&self, job_id: u32, _started_at: Instant, error: String) -> () { - self.prover_connection_pool - .connection() - .await - .unwrap() - .fri_witness_generator_dal() - .mark_node_aggregation_job_failed(&error, job_id) - .await; - } - - #[allow(clippy::async_yields_async)] - async fn process_job( - &self, - _job_id: &Self::JobId, - job: NodeAggregationWitnessGeneratorJob, - started_at: Instant, - ) -> tokio::task::JoinHandle> { - let object_store = self.object_store.clone(); - let max_circuits_in_flight = self.config.max_circuits_in_flight; - tokio::spawn(async move { - Ok(Self::process_job_impl(job, started_at, object_store, max_circuits_in_flight).await) - }) - } - - #[tracing::instrument( - skip_all, - fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) - )] - async fn save_result( - &self, - job_id: u32, - started_at: Instant, - artifacts: NodeAggregationArtifacts, - ) -> anyhow::Result<()> { - let block_number = artifacts.block_number; - let circuit_id = artifacts.circuit_id; - let depth = artifacts.depth; - let shall_continue_node_aggregations = artifacts.next_aggregations.len() > 1; - let blob_urls = save_artifacts(artifacts, &*self.object_store).await; - update_database( - &self.prover_connection_pool, - started_at, - job_id, - block_number, - depth, - circuit_id, - blob_urls, - shall_continue_node_aggregations, - ) - .await; - Ok(()) - } - - fn max_attempts(&self) -> u32 { - self.config.max_attempts - } - - async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result { - let mut prover_storage = self - .prover_connection_pool - .connection() - .await - .context("failed to acquire DB connection for NodeAggregationWitnessGenerator")?; - prover_storage - .fri_witness_generator_dal() - .get_node_aggregation_job_attempts(*job_id) - .await - .map(|attempts| attempts.unwrap_or(0)) - .context("failed to get job attempts for NodeAggregationWitnessGenerator") - } -} - #[tracing::instrument( skip_all, - fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) + fields(l1_batch = % metadata.block_number, circuit_id = % metadata.circuit_id) )] pub async fn prepare_job( metadata: NodeAggregationJobMetadata, @@ -332,7 +225,7 @@ pub async fn prepare_job( keystore: Keystore, ) -> anyhow::Result { let started_at = Instant::now(); - let artifacts = get_artifacts(&metadata, object_store).await; + let artifacts = NodeAggregationWitnessGenerator::get_artifacts(&metadata, object_store).await?; WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::NodeAggregation.into()] .observe(started_at.elapsed()); @@ -361,123 +254,3 @@ pub async fn prepare_job( all_leafs_layer_params: get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?, }) } - -#[allow(clippy::too_many_arguments)] -#[tracing::instrument( - skip_all, - fields(l1_batch = %block_number, circuit_id = %circuit_id) -)] -async fn update_database( - prover_connection_pool: &ConnectionPool, - started_at: Instant, - id: u32, - block_number: L1BatchNumber, - depth: u16, - circuit_id: u8, - blob_urls: BlobUrls, - shall_continue_node_aggregations: bool, -) { - let mut prover_connection = prover_connection_pool.connection().await.unwrap(); - let mut transaction = prover_connection.start_transaction().await.unwrap(); - let dependent_jobs = blob_urls.circuit_ids_and_urls.len(); - let protocol_version_id = transaction - .fri_witness_generator_dal() - .protocol_version_for_l1_batch(block_number) - .await; - match shall_continue_node_aggregations { - true => { - transaction - .fri_prover_jobs_dal() - .insert_prover_jobs( - block_number, - blob_urls.circuit_ids_and_urls, - AggregationRound::NodeAggregation, - depth, - protocol_version_id, - ) - .await; - transaction - .fri_witness_generator_dal() - .insert_node_aggregation_jobs( - block_number, - circuit_id, - Some(dependent_jobs as i32), - depth, - &blob_urls.node_aggregations_url, - protocol_version_id, - ) - .await; - } - false => { - let (_, blob_url) = blob_urls.circuit_ids_and_urls[0].clone(); - transaction - .fri_prover_jobs_dal() - .insert_prover_job( - block_number, - circuit_id, - depth, - 0, - AggregationRound::NodeAggregation, - &blob_url, - true, - protocol_version_id, - ) - .await - } - } - - transaction - .fri_witness_generator_dal() - .mark_node_aggregation_as_successful(id, started_at.elapsed()) - .await; - - transaction.commit().await.unwrap(); -} - -#[tracing::instrument( - skip_all, - fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) -)] -async fn get_artifacts( - metadata: &NodeAggregationJobMetadata, - object_store: &dyn ObjectStore, -) -> AggregationWrapper { - let key = AggregationsKey { - block_number: metadata.block_number, - circuit_id: metadata.circuit_id, - depth: metadata.depth, - }; - object_store.get(key).await.unwrap_or_else(|error| { - panic!( - "node aggregation job artifacts getting error. Key: {:?}, error: {:?}", - key, error - ) - }) -} - -#[tracing::instrument( - skip_all, - fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) -)] -async fn save_artifacts( - artifacts: NodeAggregationArtifacts, - object_store: &dyn ObjectStore, -) -> BlobUrls { - let started_at = Instant::now(); - let aggregations_urls = save_node_aggregations_artifacts( - artifacts.block_number, - artifacts.circuit_id, - artifacts.depth, - artifacts.next_aggregations, - object_store, - ) - .await; - - WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::NodeAggregation.into()] - .observe(started_at.elapsed()); - - BlobUrls { - node_aggregations_url: aggregations_urls, - circuit_ids_and_urls: artifacts.recursive_circuit_ids_and_urls, - } -} diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs b/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs new file mode 100644 index 000000000000..8379fcf9f933 --- /dev/null +++ b/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs @@ -0,0 +1,141 @@ +use std::{collections::HashMap, time::Instant}; + +use async_trait::async_trait; +use circuit_definitions::{ + circuit_definitions::recursion_layer::{ZkSyncRecursionLayerStorageType, ZkSyncRecursionProof}, + zkevm_circuits::scheduler::aux::BaseLayerCircuitType, +}; +use zkevm_test_harness::empty_node_proof; +use zksync_object_store::ObjectStore; +use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_prover_fri_types::{keys::FriCircuitKey, CircuitWrapper, FriProofWrapper}; +use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; + +use crate::{ + artifacts::{ArtifactsManager, BlobUrls}, + recursion_tip::{RecursionTipArtifacts, RecursionTipWitnessGenerator}, +}; + +#[async_trait] +impl ArtifactsManager for RecursionTipWitnessGenerator { + type InputMetadata = Vec<(u8, u32)>; + type InputArtifacts = Vec; + type OutputArtifacts = RecursionTipArtifacts; + + /// Loads all proofs for a given recursion tip's job ids. + /// Note that recursion tip may not have proofs for some specific circuits (because the batch didn't contain them). + /// In this scenario, we still need to pass a proof, but it won't be taken into account during proving. + /// For this scenario, we use an empty_proof, but any proof would suffice. + async fn get_artifacts( + metadata: &Vec<(u8, u32)>, + object_store: &dyn ObjectStore, + ) -> anyhow::Result> { + let job_mapping: HashMap = metadata + .clone() + .into_iter() + .map(|(leaf_circuit_id, job_id)| { + ( + ZkSyncRecursionLayerStorageType::from_leaf_u8_to_basic_u8(leaf_circuit_id), + job_id, + ) + }) + .collect(); + + let empty_proof = empty_node_proof().into_inner(); + + let mut proofs = Vec::new(); + for circuit_id in BaseLayerCircuitType::as_iter_u8() { + if job_mapping.contains_key(&circuit_id) { + let fri_proof_wrapper = object_store + .get(*job_mapping.get(&circuit_id).unwrap()) + .await + .unwrap_or_else(|_| { + panic!( + "Failed to load proof with circuit_id {} for recursion tip", + circuit_id + ) + }); + match fri_proof_wrapper { + FriProofWrapper::Base(_) => { + return Err(anyhow::anyhow!( + "Expected only recursive proofs for recursion tip, got Base for circuit {}", + circuit_id + )); + } + FriProofWrapper::Recursive(recursive_proof) => { + proofs.push(recursive_proof.into_inner()); + } + } + } else { + proofs.push(empty_proof.clone()); + } + } + Ok(proofs) + } + + async fn save_artifacts( + job_id: u32, + artifacts: Self::OutputArtifacts, + object_store: &dyn ObjectStore, + ) -> BlobUrls { + let key = FriCircuitKey { + block_number: L1BatchNumber(job_id), + circuit_id: 255, + sequence_number: 0, + depth: 0, + aggregation_round: AggregationRound::RecursionTip, + }; + + let blob_url = object_store + .put( + key, + &CircuitWrapper::Recursive(artifacts.recursion_tip_circuit.clone()), + ) + .await + .unwrap(); + + BlobUrls::Url(blob_url) + } + + async fn update_database( + connection_pool: &ConnectionPool, + job_id: u32, + started_at: Instant, + blob_urls: BlobUrls, + _artifacts: Self::OutputArtifacts, + ) -> anyhow::Result<()> { + let blob_url = match blob_urls { + BlobUrls::Url(url) => url, + _ => panic!("Unexpected blob urls type"), + }; + + let mut prover_connection = connection_pool.connection().await?; + let mut transaction = prover_connection.start_transaction().await?; + let protocol_version_id = transaction + .fri_witness_generator_dal() + .protocol_version_for_l1_batch(L1BatchNumber(job_id)) + .await; + transaction + .fri_prover_jobs_dal() + .insert_prover_job( + L1BatchNumber(job_id), + ZkSyncRecursionLayerStorageType::RecursionTipCircuit as u8, + 0, + 0, + AggregationRound::RecursionTip, + &blob_url, + false, + protocol_version_id, + ) + .await; + + transaction + .fri_witness_generator_dal() + .mark_recursion_tip_job_as_successful(L1BatchNumber(job_id), started_at.elapsed()) + .await; + + transaction.commit().await?; + + Ok(()) + } +} diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs new file mode 100644 index 000000000000..f114724cfec4 --- /dev/null +++ b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs @@ -0,0 +1,130 @@ +use std::time::Instant; + +use anyhow::Context as _; +use async_trait::async_trait; +use zksync_prover_dal::ProverDal; +use zksync_prover_fri_types::get_current_pod_name; +use zksync_queued_job_processor::JobProcessor; +use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; + +use crate::{ + artifacts::ArtifactsManager, + metrics::WITNESS_GENERATOR_METRICS, + recursion_tip::{ + prepare_job, RecursionTipArtifacts, RecursionTipWitnessGenerator, + RecursionTipWitnessGeneratorJob, + }, +}; + +#[async_trait] +impl JobProcessor for RecursionTipWitnessGenerator { + type Job = RecursionTipWitnessGeneratorJob; + type JobId = L1BatchNumber; + type JobArtifacts = RecursionTipArtifacts; + + const SERVICE_NAME: &'static str = "recursion_tip_witness_generator"; + + async fn get_next_job(&self) -> anyhow::Result> { + let mut prover_connection = self.prover_connection_pool.connection().await?; + let pod_name = get_current_pod_name(); + let Some((l1_batch_number, number_of_final_node_jobs)) = prover_connection + .fri_witness_generator_dal() + .get_next_recursion_tip_witness_job(self.protocol_version, &pod_name) + .await + else { + return Ok(None); + }; + + let final_node_proof_job_ids = prover_connection + .fri_prover_jobs_dal() + .get_final_node_proof_job_ids_for(l1_batch_number) + .await; + + assert_eq!( + final_node_proof_job_ids.len(), + number_of_final_node_jobs as usize, + "recursion tip witness job was scheduled without all final node jobs being completed; expected {}, got {}", + number_of_final_node_jobs, final_node_proof_job_ids.len() + ); + + Ok(Some(( + l1_batch_number, + prepare_job( + l1_batch_number, + final_node_proof_job_ids, + &*self.object_store, + self.keystore.clone(), + ) + .await + .context("prepare_job()")?, + ))) + } + + async fn save_failure(&self, job_id: L1BatchNumber, _started_at: Instant, error: String) -> () { + self.prover_connection_pool + .connection() + .await + .unwrap() + .fri_witness_generator_dal() + .mark_recursion_tip_job_failed(&error, job_id) + .await; + } + + #[allow(clippy::async_yields_async)] + async fn process_job( + &self, + _job_id: &Self::JobId, + job: RecursionTipWitnessGeneratorJob, + started_at: Instant, + ) -> tokio::task::JoinHandle> { + tokio::task::spawn_blocking(move || Ok(Self::process_job_sync(job, started_at))) + } + + #[tracing::instrument( + skip_all, + fields(l1_batch = %job_id) + )] + async fn save_result( + &self, + job_id: L1BatchNumber, + started_at: Instant, + artifacts: RecursionTipArtifacts, + ) -> anyhow::Result<()> { + let blob_save_started_at = Instant::now(); + + let blob_urls = + Self::save_artifacts(job_id.0, artifacts.clone(), &*self.object_store).await; + + WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::RecursionTip.into()] + .observe(blob_save_started_at.elapsed()); + + Self::update_database( + &self.prover_connection_pool, + job_id.0, + started_at, + blob_urls, + artifacts, + ) + .await?; + + Ok(()) + } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result { + let mut prover_storage = self + .prover_connection_pool + .connection() + .await + .context("failed to acquire DB connection for RecursionTipWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_recursion_tip_witness_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for RecursionTipWitnessGenerator") + } +} diff --git a/prover/crates/bin/witness_generator/src/recursion_tip.rs b/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs similarity index 58% rename from prover/crates/bin/witness_generator/src/recursion_tip.rs rename to prover/crates/bin/witness_generator/src/recursion_tip/mod.rs index 5e97631babb9..4abb56a7d788 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs @@ -1,7 +1,6 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context; -use async_trait::async_trait; use circuit_definitions::{ circuit_definitions::recursion_layer::{ recursion_tip::RecursionTipCircuit, ZkSyncRecursionLayerStorageType, @@ -37,23 +36,20 @@ use zkevm_test_harness::{ }; use zksync_config::configs::FriWitnessGeneratorConfig; use zksync_object_store::ObjectStore; -use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; -use zksync_prover_fri_types::{ - get_current_pod_name, - keys::{ClosedFormInputKey, FriCircuitKey}, - CircuitWrapper, -}; +use zksync_prover_dal::{ConnectionPool, Prover}; +use zksync_prover_fri_types::keys::ClosedFormInputKey; use zksync_prover_keystore::{keystore::Keystore, utils::get_leaf_vk_params}; -use zksync_queued_job_processor::JobProcessor; use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; use crate::{ - metrics::WITNESS_GENERATOR_METRICS, - utils::{load_proofs_for_recursion_tip, ClosedFormInputWrapper}, + artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, utils::ClosedFormInputWrapper, }; +mod artifacts; +mod job_processor; + #[derive(Clone)] pub struct RecursionTipWitnessGeneratorJob { block_number: L1BatchNumber, @@ -65,6 +61,7 @@ pub struct RecursionTipWitnessGeneratorJob { node_vk: ZkSyncRecursionLayerVerificationKey, } +#[derive(Clone)] pub struct RecursionTipArtifacts { pub recursion_tip_circuit: ZkSyncRecursiveLayerCircuit, } @@ -138,148 +135,6 @@ impl RecursionTipWitnessGenerator { } } -#[async_trait] -impl JobProcessor for RecursionTipWitnessGenerator { - type Job = RecursionTipWitnessGeneratorJob; - type JobId = L1BatchNumber; - type JobArtifacts = RecursionTipArtifacts; - - const SERVICE_NAME: &'static str = "recursion_tip_witness_generator"; - - async fn get_next_job(&self) -> anyhow::Result> { - let mut prover_connection = self.prover_connection_pool.connection().await?; - let pod_name = get_current_pod_name(); - let Some((l1_batch_number, number_of_final_node_jobs)) = prover_connection - .fri_witness_generator_dal() - .get_next_recursion_tip_witness_job(self.protocol_version, &pod_name) - .await - else { - return Ok(None); - }; - - let final_node_proof_job_ids = prover_connection - .fri_prover_jobs_dal() - .get_final_node_proof_job_ids_for(l1_batch_number) - .await; - - assert_eq!( - final_node_proof_job_ids.len(), - number_of_final_node_jobs as usize, - "recursion tip witness job was scheduled without all final node jobs being completed; expected {}, got {}", - number_of_final_node_jobs, final_node_proof_job_ids.len() - ); - - Ok(Some(( - l1_batch_number, - prepare_job( - l1_batch_number, - final_node_proof_job_ids, - &*self.object_store, - self.keystore.clone(), - ) - .await - .context("prepare_job()")?, - ))) - } - - async fn save_failure(&self, job_id: L1BatchNumber, _started_at: Instant, error: String) -> () { - self.prover_connection_pool - .connection() - .await - .unwrap() - .fri_witness_generator_dal() - .mark_recursion_tip_job_failed(&error, job_id) - .await; - } - - #[allow(clippy::async_yields_async)] - async fn process_job( - &self, - _job_id: &Self::JobId, - job: RecursionTipWitnessGeneratorJob, - started_at: Instant, - ) -> tokio::task::JoinHandle> { - tokio::task::spawn_blocking(move || Ok(Self::process_job_sync(job, started_at))) - } - - #[tracing::instrument( - skip_all, - fields(l1_batch = %job_id) - )] - async fn save_result( - &self, - job_id: L1BatchNumber, - started_at: Instant, - artifacts: RecursionTipArtifacts, - ) -> anyhow::Result<()> { - let key = FriCircuitKey { - block_number: job_id, - circuit_id: 255, - sequence_number: 0, - depth: 0, - aggregation_round: AggregationRound::RecursionTip, - }; - let blob_save_started_at = Instant::now(); - - let recursion_tip_circuit_blob_url = self - .object_store - .put( - key, - &CircuitWrapper::Recursive(artifacts.recursion_tip_circuit), - ) - .await?; - - WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::RecursionTip.into()] - .observe(blob_save_started_at.elapsed()); - - let mut prover_connection = self.prover_connection_pool.connection().await?; - let mut transaction = prover_connection.start_transaction().await?; - let protocol_version_id = transaction - .fri_witness_generator_dal() - .protocol_version_for_l1_batch(job_id) - .await; - transaction - .fri_prover_jobs_dal() - .insert_prover_job( - job_id, - ZkSyncRecursionLayerStorageType::RecursionTipCircuit as u8, - 0, - 0, - AggregationRound::RecursionTip, - &recursion_tip_circuit_blob_url, - false, - protocol_version_id, - ) - .await; - - transaction - .fri_witness_generator_dal() - .mark_recursion_tip_job_as_successful(job_id, started_at.elapsed()) - .await; - - transaction.commit().await?; - Ok(()) - } - - fn max_attempts(&self) -> u32 { - self.config.max_attempts - } - - async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result { - let mut prover_storage = self - .prover_connection_pool - .connection() - .await - .context("failed to acquire DB connection for RecursionTipWitnessGenerator")?; - prover_storage - .fri_witness_generator_dal() - .get_recursion_tip_witness_job_attempts(*job_id) - .await - .map(|attempts| attempts.unwrap_or(0)) - .context("failed to get job attempts for RecursionTipWitnessGenerator") - } -} - #[tracing::instrument( skip_all, fields(l1_batch = %l1_batch_number) @@ -292,7 +147,8 @@ pub async fn prepare_job( ) -> anyhow::Result { let started_at = Instant::now(); let recursion_tip_proofs = - load_proofs_for_recursion_tip(final_node_proof_job_ids, object_store).await?; + RecursionTipWitnessGenerator::get_artifacts(&final_node_proof_job_ids, object_store) + .await?; WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::RecursionTip.into()] .observe(started_at.elapsed()); diff --git a/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs b/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs new file mode 100644 index 000000000000..b20a97641887 --- /dev/null +++ b/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs @@ -0,0 +1,94 @@ +use std::time::Instant; + +use async_trait::async_trait; +use circuit_definitions::circuit_definitions::recursion_layer::ZkSyncRecursionLayerStorageType; +use zksync_object_store::ObjectStore; +use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_prover_fri_types::{keys::FriCircuitKey, CircuitWrapper, FriProofWrapper}; +use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; + +use crate::{ + artifacts::{ArtifactsManager, BlobUrls}, + scheduler::{SchedulerArtifacts, SchedulerWitnessGenerator}, +}; + +#[async_trait] +impl ArtifactsManager for SchedulerWitnessGenerator { + type InputMetadata = u32; + type InputArtifacts = FriProofWrapper; + type OutputArtifacts = SchedulerArtifacts; + + async fn get_artifacts( + metadata: &Self::InputMetadata, + object_store: &dyn ObjectStore, + ) -> anyhow::Result { + let artifacts = object_store.get(*metadata).await?; + + Ok(artifacts) + } + + async fn save_artifacts( + job_id: u32, + artifacts: Self::OutputArtifacts, + object_store: &dyn ObjectStore, + ) -> BlobUrls { + let key = FriCircuitKey { + block_number: L1BatchNumber(job_id), + circuit_id: 1, + sequence_number: 0, + depth: 0, + aggregation_round: AggregationRound::Scheduler, + }; + + let blob_url = object_store + .put( + key, + &CircuitWrapper::Recursive(artifacts.scheduler_circuit.clone()), + ) + .await + .unwrap(); + + BlobUrls::Url(blob_url) + } + + async fn update_database( + connection_pool: &ConnectionPool, + job_id: u32, + started_at: Instant, + blob_urls: BlobUrls, + _artifacts: Self::OutputArtifacts, + ) -> anyhow::Result<()> { + let blob_url = match blob_urls { + BlobUrls::Url(url) => url, + _ => panic!("Unexpected blob urls type"), + }; + + let mut prover_connection = connection_pool.connection().await?; + let mut transaction = prover_connection.start_transaction().await?; + let protocol_version_id = transaction + .fri_witness_generator_dal() + .protocol_version_for_l1_batch(L1BatchNumber(job_id)) + .await; + transaction + .fri_prover_jobs_dal() + .insert_prover_job( + L1BatchNumber(job_id), + ZkSyncRecursionLayerStorageType::SchedulerCircuit as u8, + 0, + 0, + AggregationRound::Scheduler, + &blob_url, + false, + protocol_version_id, + ) + .await; + + transaction + .fri_witness_generator_dal() + .mark_scheduler_job_as_successful(L1BatchNumber(job_id), started_at.elapsed()) + .await; + + transaction.commit().await?; + Ok(()) + } +} diff --git a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs new file mode 100644 index 000000000000..fe4f2db4090a --- /dev/null +++ b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs @@ -0,0 +1,129 @@ +use std::time::Instant; + +use anyhow::Context as _; +use async_trait::async_trait; +use zksync_prover_dal::ProverDal; +use zksync_prover_fri_types::get_current_pod_name; +use zksync_queued_job_processor::JobProcessor; +use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; + +use crate::{ + artifacts::ArtifactsManager, + metrics::WITNESS_GENERATOR_METRICS, + scheduler::{ + prepare_job, SchedulerArtifacts, SchedulerWitnessGenerator, SchedulerWitnessGeneratorJob, + }, +}; + +#[async_trait] +impl JobProcessor for SchedulerWitnessGenerator { + type Job = SchedulerWitnessGeneratorJob; + type JobId = L1BatchNumber; + type JobArtifacts = SchedulerArtifacts; + + const SERVICE_NAME: &'static str = "fri_scheduler_witness_generator"; + + async fn get_next_job(&self) -> anyhow::Result> { + let mut prover_connection = self.prover_connection_pool.connection().await?; + let pod_name = get_current_pod_name(); + let Some(l1_batch_number) = prover_connection + .fri_witness_generator_dal() + .get_next_scheduler_witness_job(self.protocol_version, &pod_name) + .await + else { + return Ok(None); + }; + let recursion_tip_job_id = prover_connection + .fri_prover_jobs_dal() + .get_recursion_tip_proof_job_id(l1_batch_number) + .await + .context(format!( + "could not find recursion tip proof for l1 batch {}", + l1_batch_number + ))?; + + Ok(Some(( + l1_batch_number, + prepare_job( + l1_batch_number, + recursion_tip_job_id, + &*self.object_store, + self.keystore.clone(), + ) + .await + .context("prepare_job()")?, + ))) + } + + async fn save_failure(&self, job_id: L1BatchNumber, _started_at: Instant, error: String) -> () { + self.prover_connection_pool + .connection() + .await + .unwrap() + .fri_witness_generator_dal() + .mark_scheduler_job_failed(&error, job_id) + .await; + } + + #[allow(clippy::async_yields_async)] + async fn process_job( + &self, + _job_id: &Self::JobId, + job: SchedulerWitnessGeneratorJob, + started_at: Instant, + ) -> tokio::task::JoinHandle> { + tokio::task::spawn_blocking(move || { + let block_number = job.block_number; + let _span = tracing::info_span!("scheduler", %block_number).entered(); + Ok(Self::process_job_sync(job, started_at)) + }) + } + + #[tracing::instrument( + skip_all, + fields(l1_batch = %job_id) + )] + async fn save_result( + &self, + job_id: L1BatchNumber, + started_at: Instant, + artifacts: SchedulerArtifacts, + ) -> anyhow::Result<()> { + let blob_save_started_at = Instant::now(); + + let blob_urls = + Self::save_artifacts(job_id.0, artifacts.clone(), &*self.object_store).await; + + WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::Scheduler.into()] + .observe(blob_save_started_at.elapsed()); + + Self::update_database( + &self.prover_connection_pool, + job_id.0, + started_at, + blob_urls, + artifacts, + ) + .await?; + + Ok(()) + } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result { + let mut prover_storage = self + .prover_connection_pool + .connection() + .await + .context("failed to acquire DB connection for SchedulerWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_scheduler_witness_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for SchedulerWitnessGenerator") + } +} diff --git a/prover/crates/bin/witness_generator/src/scheduler.rs b/prover/crates/bin/witness_generator/src/scheduler/mod.rs similarity index 54% rename from prover/crates/bin/witness_generator/src/scheduler.rs rename to prover/crates/bin/witness_generator/src/scheduler/mod.rs index c6e43582bbdb..10230b35c4f6 100644 --- a/prover/crates/bin/witness_generator/src/scheduler.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/mod.rs @@ -1,13 +1,12 @@ use std::{convert::TryInto, sync::Arc, time::Instant}; use anyhow::Context as _; -use async_trait::async_trait; use zkevm_test_harness::zkevm_circuits::recursion::{ leaf_layer::input::RecursionLeafParametersWitness, NUM_BASE_LAYER_CIRCUITS, }; use zksync_config::configs::FriWitnessGeneratorConfig; use zksync_object_store::ObjectStore; -use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_prover_dal::{ConnectionPool, Prover}; use zksync_prover_fri_types::{ circuit_definitions::{ boojum::{ @@ -21,18 +20,22 @@ use zksync_prover_fri_types::{ recursion_layer_proof_config, zkevm_circuits::scheduler::{input::SchedulerCircuitInstanceWitness, SchedulerConfig}, }, - get_current_pod_name, - keys::FriCircuitKey, - CircuitWrapper, FriProofWrapper, + FriProofWrapper, }; use zksync_prover_keystore::{keystore::Keystore, utils::get_leaf_vk_params}; -use zksync_queued_job_processor::JobProcessor; use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; -use crate::{metrics::WITNESS_GENERATOR_METRICS, utils::SchedulerPartialInputWrapper}; +use crate::{ + artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, + utils::SchedulerPartialInputWrapper, +}; + +mod artifacts; +mod job_processor; +#[derive(Clone)] pub struct SchedulerArtifacts { pub scheduler_circuit: ZkSyncRecursiveLayerCircuit, } @@ -121,143 +124,6 @@ impl SchedulerWitnessGenerator { } } -#[async_trait] -impl JobProcessor for SchedulerWitnessGenerator { - type Job = SchedulerWitnessGeneratorJob; - type JobId = L1BatchNumber; - type JobArtifacts = SchedulerArtifacts; - - const SERVICE_NAME: &'static str = "fri_scheduler_witness_generator"; - - async fn get_next_job(&self) -> anyhow::Result> { - let mut prover_connection = self.prover_connection_pool.connection().await?; - let pod_name = get_current_pod_name(); - let Some(l1_batch_number) = prover_connection - .fri_witness_generator_dal() - .get_next_scheduler_witness_job(self.protocol_version, &pod_name) - .await - else { - return Ok(None); - }; - let recursion_tip_job_id = prover_connection - .fri_prover_jobs_dal() - .get_recursion_tip_proof_job_id(l1_batch_number) - .await - .context(format!( - "could not find recursion tip proof for l1 batch {}", - l1_batch_number - ))?; - - Ok(Some(( - l1_batch_number, - prepare_job( - l1_batch_number, - recursion_tip_job_id, - &*self.object_store, - self.keystore.clone(), - ) - .await - .context("prepare_job()")?, - ))) - } - - async fn save_failure(&self, job_id: L1BatchNumber, _started_at: Instant, error: String) -> () { - self.prover_connection_pool - .connection() - .await - .unwrap() - .fri_witness_generator_dal() - .mark_scheduler_job_failed(&error, job_id) - .await; - } - - #[allow(clippy::async_yields_async)] - async fn process_job( - &self, - _job_id: &Self::JobId, - job: SchedulerWitnessGeneratorJob, - started_at: Instant, - ) -> tokio::task::JoinHandle> { - tokio::task::spawn_blocking(move || { - let block_number = job.block_number; - let _span = tracing::info_span!("scheduler", %block_number).entered(); - Ok(Self::process_job_sync(job, started_at)) - }) - } - - #[tracing::instrument( - skip_all, - fields(l1_batch = %job_id) - )] - async fn save_result( - &self, - job_id: L1BatchNumber, - started_at: Instant, - artifacts: SchedulerArtifacts, - ) -> anyhow::Result<()> { - let key = FriCircuitKey { - block_number: job_id, - circuit_id: 1, - sequence_number: 0, - depth: 0, - aggregation_round: AggregationRound::Scheduler, - }; - let blob_save_started_at = Instant::now(); - let scheduler_circuit_blob_url = self - .object_store - .put(key, &CircuitWrapper::Recursive(artifacts.scheduler_circuit)) - .await?; - WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::Scheduler.into()] - .observe(blob_save_started_at.elapsed()); - - let mut prover_connection = self.prover_connection_pool.connection().await?; - let mut transaction = prover_connection.start_transaction().await?; - let protocol_version_id = transaction - .fri_witness_generator_dal() - .protocol_version_for_l1_batch(job_id) - .await; - transaction - .fri_prover_jobs_dal() - .insert_prover_job( - job_id, - ZkSyncRecursionLayerStorageType::SchedulerCircuit as u8, - 0, - 0, - AggregationRound::Scheduler, - &scheduler_circuit_blob_url, - false, - protocol_version_id, - ) - .await; - - transaction - .fri_witness_generator_dal() - .mark_scheduler_job_as_successful(job_id, started_at.elapsed()) - .await; - - transaction.commit().await?; - Ok(()) - } - - fn max_attempts(&self) -> u32 { - self.config.max_attempts - } - - async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result { - let mut prover_storage = self - .prover_connection_pool - .connection() - .await - .context("failed to acquire DB connection for SchedulerWitnessGenerator")?; - prover_storage - .fri_witness_generator_dal() - .get_scheduler_witness_job_attempts(*job_id) - .await - .map(|attempts| attempts.unwrap_or(0)) - .context("failed to get job attempts for SchedulerWitnessGenerator") - } -} - #[tracing::instrument( skip_all, fields(l1_batch = %l1_batch_number) @@ -269,7 +135,8 @@ pub async fn prepare_job( keystore: Keystore, ) -> anyhow::Result { let started_at = Instant::now(); - let wrapper = object_store.get(recursion_tip_job_id).await?; + let wrapper = + SchedulerWitnessGenerator::get_artifacts(&recursion_tip_job_id, object_store).await?; let recursion_tip_proof = match wrapper { FriProofWrapper::Base(_) => Err(anyhow::anyhow!( "Expected only recursive proofs for scheduler l1 batch {l1_batch_number}, got Base" diff --git a/prover/crates/bin/witness_generator/src/utils.rs b/prover/crates/bin/witness_generator/src/utils.rs index f8656ac90f44..3ea2b539773f 100644 --- a/prover/crates/bin/witness_generator/src/utils.rs +++ b/prover/crates/bin/witness_generator/src/utils.rs @@ -1,21 +1,14 @@ use std::{ - collections::HashMap, io::{BufWriter, Write as _}, sync::Arc, }; use circuit_definitions::{ - circuit_definitions::{ - base_layer::ZkSyncBaseLayerCircuit, - recursion_layer::{ZkSyncRecursionLayerStorageType, ZkSyncRecursionProof}, - }, + circuit_definitions::base_layer::ZkSyncBaseLayerCircuit, encodings::memory_query::MemoryQueueStateWitnesses, }; use once_cell::sync::Lazy; -use zkevm_test_harness::{ - boojum::field::goldilocks::GoldilocksField, empty_node_proof, - zkevm_circuits::scheduler::aux::BaseLayerCircuitType, -}; +use zkevm_test_harness::boojum::field::goldilocks::GoldilocksField; use zksync_multivm::utils::get_used_bootloader_memory_bytes; use zksync_object_store::{serialize_using_bincode, Bucket, ObjectStore, StoredObject}; use zksync_prover_fri_types::{ @@ -248,54 +241,3 @@ pub async fn load_proofs_for_job_ids( .map(|x| x.unwrap()) .collect() } - -/// Loads all proofs for a given recursion tip's job ids. -/// Note that recursion tip may not have proofs for some specific circuits (because the batch didn't contain them). -/// In this scenario, we still need to pass a proof, but it won't be taken into account during proving. -/// For this scenario, we use an empty_proof, but any proof would suffice. -#[tracing::instrument(skip_all)] -pub async fn load_proofs_for_recursion_tip( - job_ids: Vec<(u8, u32)>, - object_store: &dyn ObjectStore, -) -> anyhow::Result> { - let job_mapping: HashMap = job_ids - .into_iter() - .map(|(leaf_circuit_id, job_id)| { - ( - ZkSyncRecursionLayerStorageType::from_leaf_u8_to_basic_u8(leaf_circuit_id), - job_id, - ) - }) - .collect(); - - let empty_proof = empty_node_proof().into_inner(); - - let mut proofs = Vec::new(); - for circuit_id in BaseLayerCircuitType::as_iter_u8() { - if job_mapping.contains_key(&circuit_id) { - let fri_proof_wrapper = object_store - .get(*job_mapping.get(&circuit_id).unwrap()) - .await - .unwrap_or_else(|_| { - panic!( - "Failed to load proof with circuit_id {} for recursion tip", - circuit_id - ) - }); - match fri_proof_wrapper { - FriProofWrapper::Base(_) => { - return Err(anyhow::anyhow!( - "Expected only recursive proofs for recursion tip, got Base for circuit {}", - circuit_id - )); - } - FriProofWrapper::Recursive(recursive_proof) => { - proofs.push(recursive_proof.into_inner()); - } - } - } else { - proofs.push(empty_proof.clone()); - } - } - Ok(proofs) -}