From f722df7c0ae429f43d047ff79e24bca39f81230c Mon Sep 17 00:00:00 2001 From: Ivan Litteri <67517699+ilitteri@users.noreply.github.com> Date: Tue, 14 May 2024 12:11:46 -0300 Subject: [PATCH] feat(Prover CLI): `requeue` cmd (#1719) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Adds a `requeue` command that requeues a given batch if it's stuck. It looks at all jobs that are stuck and requeues them. A job is stuck if it has a large number of attempts and is not successful. For now, the maximum number of attempts is set via a command-line flag (`--max-attempts`), which defaults to 10 if not set. ### Usage Example ``` cd prover/prover_cli zk f cargo run --release requeue --batch 1 ``` ## Why ❔ We want to be able to requeue a stuck batch with the CLI. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [ ] Spellcheck has been run via `zk spellcheck`. - [x] Linkcheck has been run via `zk linkcheck`. 
--------- Co-authored-by: Ivan Litteri Co-authored-by: Joaquin Carletti Co-authored-by: Joaquin Carletti <56092489+ColoCarletti@users.noreply.github.com> --- core/lib/basic_types/src/prover_dal.rs | 1 + prover/prover_cli/src/cli.rs | 4 +- prover/prover_cli/src/commands/mod.rs | 1 + prover/prover_cli/src/commands/requeue.rs | 87 ++++++++ ...2279dcfd0261dc58f20fb3454a4d5146a561a.json | 41 ++++ ...dce6412e2725cf5162ce7a733f6dceaecb11.json} | 10 +- ...9718349ac4fc08b455c7f4265d7443f2ec13.json} | 10 +- ...998b9fa0434d590eecab8448e89be853e5352.json | 29 +++ ...bae42849574731d33539bfdcca21c9b64f4e.json} | 10 +- .../src/fri_proof_compressor_dal.rs | 41 ++++ prover/prover_dal/src/fri_prover_dal.rs | 46 +++- .../src/fri_witness_generator_dal.rs | 204 +++++++++++++++++- 12 files changed, 474 insertions(+), 10 deletions(-) create mode 100644 prover/prover_cli/src/commands/requeue.rs create mode 100644 prover/prover_dal/.sqlx/query-36375be0667ab6241a3f6432e802279dcfd0261dc58f20fb3454a4d5146a561a.json rename prover/prover_dal/.sqlx/{query-b17c71983da060f08616e001b42f8dcbcb014b4f808c6232abd9a83354c995ac.json => query-8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11.json} (79%) rename prover/prover_dal/.sqlx/{query-2d31fcce581975a82d6156b52e35fb7a093b73727f75e0cb7db9cea480c95f5c.json => query-c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13.json} (86%) create mode 100644 prover/prover_dal/.sqlx/query-dccb1bb8250716e8b82714c77f7998b9fa0434d590eecab8448e89be853e5352.json rename prover/prover_dal/.sqlx/{query-d8e3ee346375e4b6a8b2c73a3827e88abd0f8164c2413dc83c91c29665ca645e.json => query-e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e.json} (79%) diff --git a/core/lib/basic_types/src/prover_dal.rs b/core/lib/basic_types/src/prover_dal.rs index 693c9138c3ee..c14d1cb91fd3 100644 --- a/core/lib/basic_types/src/prover_dal.rs +++ b/core/lib/basic_types/src/prover_dal.rs @@ -51,6 +51,7 @@ pub struct StuckJobs { pub id: u64, pub status: 
String, pub attempts: u64, + pub circuit_id: Option, } // TODO (PLA-774): Redundant structure, should be replaced with `std::net::SocketAddr`. diff --git a/prover/prover_cli/src/cli.rs b/prover/prover_cli/src/cli.rs index 7ded422012d5..6d05fe3c97fc 100644 --- a/prover/prover_cli/src/cli.rs +++ b/prover/prover_cli/src/cli.rs @@ -1,7 +1,7 @@ use clap::{command, Args, Parser, Subcommand}; use zksync_types::url::SensitiveUrl; -use crate::commands::{self, delete, get_file_info, restart}; +use crate::commands::{self, delete, get_file_info, requeue, restart}; pub const VERSION_STRING: &str = env!("CARGO_PKG_VERSION"); @@ -32,6 +32,7 @@ enum ProverCommand { Delete(delete::Args), #[command(subcommand)] Status(commands::StatusCommand), + Requeue(requeue::Args), Restart(restart::Args), } @@ -41,6 +42,7 @@ pub async fn start() -> anyhow::Result<()> { ProverCommand::FileInfo(args) => get_file_info::run(args).await?, ProverCommand::Delete(args) => delete::run(args).await?, ProverCommand::Status(cmd) => cmd.run(config).await?, + ProverCommand::Requeue(args) => requeue::run(args, config).await?, ProverCommand::Restart(args) => restart::run(args).await?, }; diff --git a/prover/prover_cli/src/commands/mod.rs b/prover/prover_cli/src/commands/mod.rs index 70822e06dd3b..cd76c6aff960 100644 --- a/prover/prover_cli/src/commands/mod.rs +++ b/prover/prover_cli/src/commands/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod delete; pub(crate) mod get_file_info; +pub(crate) mod requeue; pub(crate) mod restart; pub(crate) mod status; diff --git a/prover/prover_cli/src/commands/requeue.rs b/prover/prover_cli/src/commands/requeue.rs new file mode 100644 index 000000000000..d529aebcc162 --- /dev/null +++ b/prover/prover_cli/src/commands/requeue.rs @@ -0,0 +1,87 @@ +use anyhow::Context; +use clap::Args as ClapArgs; +use prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_types::{basic_fri_types::AggregationRound, prover_dal::StuckJobs, L1BatchNumber}; + +use crate::cli::ProverCLIConfig; + 
+#[derive(ClapArgs)] +pub struct Args { + #[clap(short, long)] + batch: L1BatchNumber, + /// Maximum number of attempts to re-queue a job. + /// Default value is 10. + /// NOTE: this argument is temporary and will be deprecated once the `config` command is implemented. + #[clap(long, default_value_t = 10)] + max_attempts: u32, +} + +pub async fn run(args: Args, config: ProverCLIConfig) -> anyhow::Result<()> { + let pool = ConnectionPool::::singleton(config.db_url) + .build() + .await + .context("failed to build a prover_connection_pool")?; + + let mut conn = pool + .connection() + .await + .context("failed to acquire a connection")?; + + let mut fri_witness_generator_dal = conn.fri_witness_generator_dal(); + + let stuck_witness_input_jobs = fri_witness_generator_dal + .requeue_stuck_witness_inputs_jobs_for_batch(args.batch, args.max_attempts) + .await; + display_requeued_stuck_jobs(stuck_witness_input_jobs, AggregationRound::BasicCircuits); + + let stuck_leaf_aggregations_stuck_jobs = fri_witness_generator_dal + .requeue_stuck_leaf_aggregation_jobs_for_batch(args.batch, args.max_attempts) + .await; + display_requeued_stuck_jobs( + stuck_leaf_aggregations_stuck_jobs, + AggregationRound::LeafAggregation, + ); + + let stuck_node_aggregations_jobs = fri_witness_generator_dal + .requeue_stuck_node_aggregation_jobs_for_batch(args.batch, args.max_attempts) + .await; + display_requeued_stuck_jobs( + stuck_node_aggregations_jobs, + AggregationRound::NodeAggregation, + ); + + let stuck_recursion_tip_job = fri_witness_generator_dal + .requeue_stuck_recursion_tip_jobs_for_batch(args.batch, args.max_attempts) + .await; + display_requeued_stuck_jobs(stuck_recursion_tip_job, AggregationRound::RecursionTip); + + let stuck_scheduler_jobs = fri_witness_generator_dal + .requeue_stuck_scheduler_jobs_for_batch(args.batch, args.max_attempts) + .await; + display_requeued_stuck_jobs(stuck_scheduler_jobs, AggregationRound::Scheduler); + + let stuck_proof_compressor_jobs = conn + 
.fri_proof_compressor_dal() + .requeue_stuck_jobs_for_batch(args.batch, args.max_attempts) + .await; + for stuck_job in stuck_proof_compressor_jobs { + println!("Re-queuing proof compressor job {stuck_job:?} 🔁",); + } + + let stuck_prover_jobs = conn + .fri_prover_jobs_dal() + .requeue_stuck_jobs_for_batch(args.batch, args.max_attempts) + .await; + + for stuck_job in stuck_prover_jobs { + println!("Re-queuing prover job {stuck_job:?} 🔁",); + } + + Ok(()) +} + +fn display_requeued_stuck_jobs(stuck_jobs: Vec, aggregation_round: AggregationRound) { + for stuck_job in stuck_jobs { + println!("Re-queuing {aggregation_round} stuck job {stuck_job:?} 🔁",); + } +} diff --git a/prover/prover_dal/.sqlx/query-36375be0667ab6241a3f6432e802279dcfd0261dc58f20fb3454a4d5146a561a.json b/prover/prover_dal/.sqlx/query-36375be0667ab6241a3f6432e802279dcfd0261dc58f20fb3454a4d5146a561a.json new file mode 100644 index 000000000000..b8bfb19ac2d5 --- /dev/null +++ b/prover/prover_dal/.sqlx/query-36375be0667ab6241a3f6432e802279dcfd0261dc58f20fb3454a4d5146a561a.json @@ -0,0 +1,41 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'queued',\n error = 'Manually requeued',\n attempts = 2,\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n l1_batch_number = $1\n AND attempts >= $2\n AND (status = 'in_progress' OR status = 'failed')\n RETURNING\n id,\n status,\n attempts,\n circuit_id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "status", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "attempts", + "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "circuit_id", + "type_info": "Int2" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int2" + ] + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "36375be0667ab6241a3f6432e802279dcfd0261dc58f20fb3454a4d5146a561a" +} diff --git 
a/prover/prover_dal/.sqlx/query-b17c71983da060f08616e001b42f8dcbcb014b4f808c6232abd9a83354c995ac.json b/prover/prover_dal/.sqlx/query-8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11.json similarity index 79% rename from prover/prover_dal/.sqlx/query-b17c71983da060f08616e001b42f8dcbcb014b4f808c6232abd9a83354c995ac.json rename to prover/prover_dal/.sqlx/query-8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11.json index 82209e00b65a..6493053b122c 100644 --- a/prover/prover_dal/.sqlx/query-b17c71983da060f08616e001b42f8dcbcb014b4f808c6232abd9a83354c995ac.json +++ b/prover/prover_dal/.sqlx/query-8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n id,\n status,\n attempts\n ", + "query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id\n ", "describe": { "columns": [ { @@ -17,6 +17,11 @@ "ordinal": 2, "name": "attempts", "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "circuit_id", + "type_info": "Int2" } ], "parameters": { @@ -26,10 +31,11 @@ ] }, "nullable": [ + false, false, false, false ] }, - "hash": "b17c71983da060f08616e001b42f8dcbcb014b4f808c6232abd9a83354c995ac" + "hash": "8719c090a9ad2488d556e495238cdce6412e2725cf5162ce7a733f6dceaecb11" } diff --git a/prover/prover_dal/.sqlx/query-2d31fcce581975a82d6156b52e35fb7a093b73727f75e0cb7db9cea480c95f5c.json 
b/prover/prover_dal/.sqlx/query-c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13.json similarity index 86% rename from prover/prover_dal/.sqlx/query-2d31fcce581975a82d6156b52e35fb7a093b73727f75e0cb7db9cea480c95f5c.json rename to prover/prover_dal/.sqlx/query-c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13.json index c4bcd6ea4915..60f8a0df709a 100644 --- a/prover/prover_dal/.sqlx/query-2d31fcce581975a82d6156b52e35fb7a093b73727f75e0cb7db9cea480c95f5c.json +++ b/prover/prover_dal/.sqlx/query-c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n id IN (\n SELECT\n id\n FROM\n prover_jobs_fri\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'in_gpu_proof'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n id,\n status,\n attempts\n ", + "query": "\n UPDATE prover_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n id IN (\n SELECT\n id\n FROM\n prover_jobs_fri\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'in_gpu_proof'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id\n ", "describe": { "columns": [ { @@ -17,6 +17,11 @@ "ordinal": 2, "name": "attempts", "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "circuit_id", + "type_info": "Int2" } ], "parameters": { @@ -26,10 +31,11 @@ ] }, "nullable": [ + false, false, false, false ] }, - "hash": 
"2d31fcce581975a82d6156b52e35fb7a093b73727f75e0cb7db9cea480c95f5c" + "hash": "c156004a0e5ad5bcc33d3b894fd69718349ac4fc08b455c7f4265d7443f2ec13" } diff --git a/prover/prover_dal/.sqlx/query-dccb1bb8250716e8b82714c77f7998b9fa0434d590eecab8448e89be853e5352.json b/prover/prover_dal/.sqlx/query-dccb1bb8250716e8b82714c77f7998b9fa0434d590eecab8448e89be853e5352.json new file mode 100644 index 000000000000..b3927e9d1198 --- /dev/null +++ b/prover/prover_dal/.sqlx/query-dccb1bb8250716e8b82714c77f7998b9fa0434d590eecab8448e89be853e5352.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE proof_compression_jobs_fri\n SET\n status = 'queued',\n error = 'Manually requeued',\n attempts = 2,\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n l1_batch_number = $1\n AND attempts >= $2\n AND (status = 'in_progress' OR status = 'failed')\n RETURNING\n status,\n attempts\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "status", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "attempts", + "type_info": "Int2" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int2" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "dccb1bb8250716e8b82714c77f7998b9fa0434d590eecab8448e89be853e5352" +} diff --git a/prover/prover_dal/.sqlx/query-d8e3ee346375e4b6a8b2c73a3827e88abd0f8164c2413dc83c91c29665ca645e.json b/prover/prover_dal/.sqlx/query-e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e.json similarity index 79% rename from prover/prover_dal/.sqlx/query-d8e3ee346375e4b6a8b2c73a3827e88abd0f8164c2413dc83c91c29665ca645e.json rename to prover/prover_dal/.sqlx/query-e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e.json index ae7bcea1882f..3a8362d2866d 100644 --- a/prover/prover_dal/.sqlx/query-d8e3ee346375e4b6a8b2c73a3827e88abd0f8164c2413dc83c91c29665ca645e.json +++ b/prover/prover_dal/.sqlx/query-e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e.json @@ -1,6 +1,6 @@ { "db_name": 
"PostgreSQL", - "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n id,\n status,\n attempts\n ", + "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET\n status = 'queued',\n updated_at = NOW(),\n processing_started_at = NOW()\n WHERE\n (\n status = 'in_progress'\n AND processing_started_at <= NOW() - $1::INTERVAL\n AND attempts < $2\n )\n OR (\n status = 'failed'\n AND attempts < $2\n )\n RETURNING\n id,\n status,\n attempts,\n circuit_id\n ", "describe": { "columns": [ { @@ -17,6 +17,11 @@ "ordinal": 2, "name": "attempts", "type_info": "Int2" + }, + { + "ordinal": 3, + "name": "circuit_id", + "type_info": "Int2" } ], "parameters": { @@ -26,10 +31,11 @@ ] }, "nullable": [ + false, false, false, false ] }, - "hash": "d8e3ee346375e4b6a8b2c73a3827e88abd0f8164c2413dc83c91c29665ca645e" + "hash": "e32c0d85cb2841efb0b7cea6b049bae42849574731d33539bfdcca21c9b64f4e" } diff --git a/prover/prover_dal/src/fri_proof_compressor_dal.rs b/prover/prover_dal/src/fri_proof_compressor_dal.rs index 0581d95c58b6..e00fe8962ee4 100644 --- a/prover/prover_dal/src/fri_proof_compressor_dal.rs +++ b/prover/prover_dal/src/fri_proof_compressor_dal.rs @@ -315,6 +315,7 @@ impl FriProofCompressorDal<'_, '_> { id: row.l1_batch_number as u64, status: row.status, attempts: row.attempts as u64, + circuit_id: None, }) .collect() } @@ -374,4 +375,44 @@ impl FriProofCompressorDal<'_, '_> { .execute(self.storage.conn()) .await } + + pub async fn requeue_stuck_jobs_for_batch( + &mut self, + block_number: L1BatchNumber, + max_attempts: u32, + ) -> Vec { + { + sqlx::query!( + r#" + UPDATE proof_compression_jobs_fri + SET + status = 'queued', + error = 'Manually requeued', + attempts = 2, + updated_at = NOW(), + processing_started_at = NOW() + 
WHERE + l1_batch_number = $1 + AND attempts >= $2 + AND (status = 'in_progress' OR status = 'failed') + RETURNING + status, + attempts + "#, + i64::from(block_number.0), + max_attempts as i32, + ) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| StuckJobs { + id: block_number.0 as u64, + status: row.status, + attempts: row.attempts as u64, + circuit_id: None, + }) + .collect() + } + } } diff --git a/prover/prover_dal/src/fri_prover_dal.rs b/prover/prover_dal/src/fri_prover_dal.rs index 5c69e6ce6d79..7b7a2e4b3f1c 100644 --- a/prover/prover_dal/src/fri_prover_dal.rs +++ b/prover/prover_dal/src/fri_prover_dal.rs @@ -324,7 +324,8 @@ impl FriProverDal<'_, '_> { RETURNING id, status, - attempts + attempts, + circuit_id "#, &processing_timeout, max_attempts as i32, @@ -337,6 +338,7 @@ impl FriProverDal<'_, '_> { id: row.id as u64, status: row.status, attempts: row.attempts as u64, + circuit_id: Some(row.circuit_id as u32), }) .collect() } @@ -772,4 +774,46 @@ impl FriProverDal<'_, '_> { self.delete_prover_jobs_fri().await?; self.delete_prover_jobs_fri_archive().await } + + pub async fn requeue_stuck_jobs_for_batch( + &mut self, + block_number: L1BatchNumber, + max_attempts: u32, + ) -> Vec { + { + sqlx::query!( + r#" + UPDATE prover_jobs_fri + SET + status = 'queued', + error = 'Manually requeued', + attempts = 2, + updated_at = NOW(), + processing_started_at = NOW() + WHERE + l1_batch_number = $1 + AND attempts >= $2 + AND (status = 'in_progress' OR status = 'failed') + RETURNING + id, + status, + attempts, + circuit_id + "#, + i64::from(block_number.0), + max_attempts as i32, + ) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| StuckJobs { + id: row.id as u64, + status: row.status, + attempts: row.attempts as u64, + circuit_id: Some(row.circuit_id as u32), + }) + .collect() + } + } } diff --git a/prover/prover_dal/src/fri_witness_generator_dal.rs b/prover/prover_dal/src/fri_witness_generator_dal.rs 
index 7274ec79595e..c46b3c6fa569 100644 --- a/prover/prover_dal/src/fri_witness_generator_dal.rs +++ b/prover/prover_dal/src/fri_witness_generator_dal.rs @@ -299,6 +299,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id: row.l1_batch_number as u64, status: row.status, attempts: row.attempts as u64, + circuit_id: None, }) .collect() } @@ -954,7 +955,8 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING id, status, - attempts + attempts, + circuit_id "#, &processing_timeout, max_attempts as i32, @@ -967,6 +969,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id: row.id as u64, status: row.status, attempts: row.attempts as u64, + circuit_id: Some(row.circuit_id as u32), }) .collect() } @@ -997,7 +1000,8 @@ impl FriWitnessGeneratorDal<'_, '_> { RETURNING id, status, - attempts + attempts, + circuit_id "#, &processing_timeout, max_attempts as i32, @@ -1010,6 +1014,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id: row.id as u64, status: row.status, attempts: row.attempts as u64, + circuit_id: Some(row.circuit_id as u32), }) .collect() } @@ -1053,6 +1058,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id: row.l1_batch_number as u64, status: row.status, attempts: row.attempts as u64, + circuit_id: None, }) .collect() } @@ -1156,6 +1162,7 @@ impl FriWitnessGeneratorDal<'_, '_> { id: row.l1_batch_number as u64, status: row.status, attempts: row.attempts as u64, + circuit_id: None, }) .collect() } @@ -1623,4 +1630,197 @@ impl FriWitnessGeneratorDal<'_, '_> { self.delete_witness_generator_data(AggregationRound::Scheduler) .await } + + pub async fn requeue_stuck_witness_inputs_jobs_for_batch( + &mut self, + block_number: L1BatchNumber, + max_attempts: u32, + ) -> Vec { + let query = format!( + r#" + UPDATE witness_inputs_fri + SET + status = 'queued', + updated_at = NOW(), + processing_started_at = NOW() + WHERE + l1_batch_number = {} + AND attempts >= {} + AND (status = 'in_progress' OR status = 'failed') + RETURNING + l1_batch_number, + status, + attempts + "#, + i64::from(block_number.0), + 
max_attempts + ); + sqlx::query(&query) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| StuckJobs { + id: row.get::("l1_batch_number") as u64, + status: row.get("status"), + attempts: row.get::("attempts") as u64, + circuit_id: None, + }) + .collect() + } + + pub async fn requeue_stuck_leaf_aggregation_jobs_for_batch( + &mut self, + block_number: L1BatchNumber, + max_attempts: u32, + ) -> Vec { + self.requeue_stuck_jobs_for_batch_in_aggregation_round( + AggregationRound::LeafAggregation, + block_number, + max_attempts, + ) + .await + } + + pub async fn requeue_stuck_node_aggregation_jobs_for_batch( + &mut self, + block_number: L1BatchNumber, + max_attempts: u32, + ) -> Vec { + self.requeue_stuck_jobs_for_batch_in_aggregation_round( + AggregationRound::NodeAggregation, + block_number, + max_attempts, + ) + .await + } + + pub async fn requeue_stuck_recursion_tip_jobs_for_batch( + &mut self, + block_number: L1BatchNumber, + max_attempts: u32, + ) -> Vec { + let query = format!( + r#" + UPDATE recursion_tip_witness_jobs_fri + SET + status = 'queued', + updated_at = NOW(), + processing_started_at = NOW() + WHERE + l1_batch_number = {} + AND attempts >= {} + AND (status = 'in_progress' OR status = 'failed') + RETURNING + l1_batch_number, + status, + attempts + "#, + i64::from(block_number.0), + max_attempts + ); + sqlx::query(&query) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| StuckJobs { + id: row.get::("l1_batch_number") as u64, + status: row.get("status"), + attempts: row.get::("attempts") as u64, + circuit_id: None, + }) + .collect() + } + + pub async fn requeue_stuck_scheduler_jobs_for_batch( + &mut self, + block_number: L1BatchNumber, + max_attempts: u32, + ) -> Vec { + let query = format!( + r#" + UPDATE scheduler_witness_jobs_fri + SET + status = 'queued', + updated_at = NOW(), + processing_started_at = NOW() + WHERE + l1_batch_number = {} + AND attempts >= {} + AND (status = 'in_progress' 
OR status = 'failed') + RETURNING + l1_batch_number, + status, + attempts + "#, + i64::from(block_number.0), + max_attempts + ); + sqlx::query(&query) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| StuckJobs { + id: row.get::("l1_batch_number") as u64, + status: row.get("status"), + attempts: row.get::("attempts") as u64, + circuit_id: None, + }) + .collect() + } + + async fn requeue_stuck_jobs_for_batch_in_aggregation_round( + &mut self, + aggregation_round: AggregationRound, + block_number: L1BatchNumber, + max_attempts: u32, + ) -> Vec { + let table_name = Self::input_table_name_for(aggregation_round); + let job_id_table_name = Self::job_id_table_name_for(aggregation_round); + let query = format!( + r#" + UPDATE {} + SET + status = 'queued', + updated_at = NOW(), + processing_started_at = NOW() + WHERE + l1_batch_number = {} + AND attempts >= {} + AND (status = 'in_progress' OR status = 'failed') + RETURNING + {}, + status, + attempts, + circuit_id + "#, + table_name, + i64::from(block_number.0), + max_attempts, + job_id_table_name + ); + sqlx::query(&query) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| StuckJobs { + id: row.get::(job_id_table_name) as u64, + status: row.get("status"), + attempts: row.get::("attempts") as u64, + circuit_id: Some(row.get::("circuit_id") as u32), + }) + .collect() + } + + fn job_id_table_name_for(aggregation_round: AggregationRound) -> &'static str { + match aggregation_round { + AggregationRound::BasicCircuits + | AggregationRound::RecursionTip + | AggregationRound::Scheduler => "l1_batch_number", + AggregationRound::LeafAggregation | AggregationRound::NodeAggregation => "id", + } + } }