Skip to content

Commit

Permalink
Merge pull request #1789 from matter-labs/prover_cli_compressor_status
Browse files Browse the repository at this point in the history
feat(Prover CLI): Batch proof compression job status
  • Loading branch information
ilitteri authored Apr 25, 2024
2 parents 3fdc055 + efd70ac commit 6ec0ac3
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 28 deletions.
33 changes: 32 additions & 1 deletion core/lib/basic_types/src/prover_dal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Types exposed by the prover DAL for general-purpose use.
use std::{net::IpAddr, ops::Add, str::FromStr};

use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Duration, NaiveDateTime, NaiveTime, Utc};
use strum::{Display, EnumString};

use crate::{basic_fri_types::AggregationRound, L1BatchNumber};

Expand Down Expand Up @@ -229,3 +230,33 @@ impl FromStr for GpuProverInstanceStatus {
}
}
}

/// Status of a proof compression job as stored in the
/// `proof_compression_jobs_fri` table.
///
/// The `strum` serializations match the raw `status` column values, so
/// `FromStr`/`Display` round-trip against the database representation.
/// Derives `Copy`/`Eq` so the status can be freely compared and passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString, Display)]
pub enum ProofCompressionJobStatus {
    /// Job is waiting to be picked up by a compressor.
    #[strum(serialize = "queued")]
    Queued,
    /// Job has been picked up and is being processed.
    #[strum(serialize = "in_progress")]
    InProgress,
    /// Compression finished successfully.
    #[strum(serialize = "successful")]
    Successful,
    /// Compression failed; see the job's `error` column for details.
    #[strum(serialize = "failed")]
    Failed,
    /// The compressed proof was submitted to the server.
    #[strum(serialize = "sent_to_server")]
    SentToServer,
    /// Job was skipped (no compression needed for this batch).
    #[strum(serialize = "skipped")]
    Skipped,
}

/// A row of the `proof_compression_jobs_fri` table, exposed for
/// general-purpose use (e.g. by the prover CLI status commands).
// `Debug` added so the info can be logged/inspected; all fields are Debug.
#[derive(Debug)]
pub struct ProofCompressionJobInfo {
    /// The L1 batch this compression job belongs to.
    pub l1_batch_number: L1BatchNumber,
    /// Number of processing attempts made so far.
    pub attempts: u32,
    /// Current job status (mirrors the DB `status` column).
    pub status: ProofCompressionJobStatus,
    /// Blob-storage URL of the FRI proof input, if recorded.
    pub fri_proof_blob_url: Option<String>,
    /// Blob-storage URL of the resulting L1 proof, if recorded.
    pub l1_proof_blob_url: Option<String>,
    /// Error message of the last failed attempt, if any.
    pub error: Option<String>,
    pub created_at: NaiveDateTime,
    pub updated_at: NaiveDateTime,
    /// When processing started; `None` if the job was never picked up.
    pub processing_started_at: Option<NaiveDateTime>,
    /// Wall-clock duration of processing, stored as a time-of-day value.
    pub time_taken: Option<NaiveTime>,
    /// Identifier of the prover instance that picked up the job, if any.
    pub picked_by: Option<String>,
}
32 changes: 26 additions & 6 deletions prover/prover_cli/src/commands/status/batch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use anyhow::{ensure, Context as _};
use clap::Args as ClapArgs;
use prover_dal::{ConnectionPool, Prover};
use prover_dal::{Connection, ConnectionPool, Prover, ProverDal};
use zksync_types::L1BatchNumber;

use super::utils::BatchData;
use super::utils::{BatchData, Task, TaskStatus};
use crate::commands::status::utils::postgres_config;

#[derive(ClapArgs)]
Expand All @@ -29,7 +29,7 @@ pub(crate) async fn run(args: Args) -> anyhow::Result<()> {
Ok(())
}

async fn get_batches_data(_batches: Vec<L1BatchNumber>) -> anyhow::Result<Vec<BatchData>> {
async fn get_batches_data(batches: Vec<L1BatchNumber>) -> anyhow::Result<Vec<BatchData>> {
let config = postgres_config()?;

let prover_connection_pool =
Expand All @@ -38,9 +38,29 @@ async fn get_batches_data(_batches: Vec<L1BatchNumber>) -> anyhow::Result<Vec<Ba
.await
.context("failed to build a prover_connection_pool")?;

let _conn = prover_connection_pool.connection().await.unwrap();
let mut conn = prover_connection_pool.connection().await.unwrap();

// Queries here...
let mut batches_data = Vec::new();
for batch in batches {
let current_batch_data = BatchData {
compressor: Task::Compressor(
get_proof_compression_job_status_for_batch(batch, &mut conn).await,
),
..Default::default()
};
batches_data.push(current_batch_data);
}

Ok(batches_data)
}

Ok(vec![BatchData::default()])
/// Fetches the proof compression job status for `batch_number`.
///
/// Returns a custom "not found" status when no compressor job exists
/// for the batch.
// Lifetime elided: the explicit `<'a>` was needless (clippy `needless_lifetimes`).
async fn get_proof_compression_job_status_for_batch(
    batch_number: L1BatchNumber,
    conn: &mut Connection<'_, Prover>,
) -> TaskStatus {
    conn.fri_proof_compressor_dal()
        .get_proof_compression_job_for_batch(batch_number)
        .await
        .map_or_else(
            // Lazily build the fallback so the `String` is only allocated
            // when the job is actually missing (clippy `or_fun_call`).
            || TaskStatus::Custom("Compressor job not found 🚫".to_owned()),
            |job| TaskStatus::from(job.status),
        )
}
24 changes: 21 additions & 3 deletions prover/prover_cli/src/commands/status/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use strum::{Display, EnumString};
use zksync_basic_types::{basic_fri_types::AggregationRound, prover_dal::JobCountStatistics};
use zksync_config::PostgresConfig;
use zksync_env_config::FromEnv;
use zksync_types::L1BatchNumber;
use zksync_types::{prover_dal::ProofCompressionJobStatus, L1BatchNumber};

pub fn postgres_config() -> anyhow::Result<PostgresConfig> {
Ok(PostgresConfig::from_env()?)
Expand Down Expand Up @@ -74,7 +74,6 @@ impl Default for BatchData {
pub enum TaskStatus {
/// A custom status that can be set manually.
/// Mostly used when a task has singular status.
#[strum(to_string = "{0}")]
Custom(String),
/// A task is considered queued when all of its jobs is queued.
#[strum(to_string = "Queued 📥")]
Expand All @@ -99,6 +98,21 @@ impl Default for TaskStatus {
}
}

/// Maps a DB-level compression job status onto the CLI-level task status.
impl From<ProofCompressionJobStatus> for TaskStatus {
    fn from(status: ProofCompressionJobStatus) -> Self {
        match status {
            ProofCompressionJobStatus::Queued => TaskStatus::Queued,
            ProofCompressionJobStatus::InProgress => TaskStatus::InProgress,
            ProofCompressionJobStatus::Successful => TaskStatus::Successful,
            // Bug fix: `Failed` previously mapped to `TaskStatus::InProgress`,
            // which hid failed compressor jobs from the status output.
            // Surface the failure explicitly instead.
            ProofCompressionJobStatus::Failed => TaskStatus::Custom("Failed ❌".to_owned()),
            ProofCompressionJobStatus::SentToServer => {
                TaskStatus::Custom("Sent to server 📤".to_owned())
            }
            ProofCompressionJobStatus::Skipped => TaskStatus::Custom("Skipped ⏩".to_owned()),
        }
    }
}

type ProverJobsData = HashMap<(L1BatchNumber, AggregationRound), JobCountStatistics>;

#[derive(EnumString, Clone, Display)]
Expand Down Expand Up @@ -148,6 +162,10 @@ impl Task {
/// Pretty-prints a task as a header line followed by its status.
///
/// `Debug` is (unconventionally) used here for user-facing CLI output,
/// matching the existing convention of this module.
impl Debug for Task {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "-- {} --", self.to_string().bold())?;
        // `Custom` statuses carry their own free-form message; everything
        // else is rendered through `TaskStatus`'s `Display` impl.
        // (Also drops a stale pre-change line left over from the diff and the
        // redundant `.to_string()` inside the format args.)
        if let TaskStatus::Custom(msg) = self.status() {
            writeln!(f, "> {msg}")
        } else {
            writeln!(f, "> {}", self.status())
        }
    }
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 36 additions & 18 deletions prover/prover_dal/src/fri_proof_compressor_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
use std::{collections::HashMap, str::FromStr, time::Duration};

use sqlx::Row;
use strum::{Display, EnumString};
use zksync_basic_types::{
prover_dal::{JobCountStatistics, StuckJobs},
prover_dal::{
JobCountStatistics, ProofCompressionJobInfo, ProofCompressionJobStatus, StuckJobs,
},
L1BatchNumber,
};
use zksync_db_connection::connection::Connection;
Expand All @@ -16,22 +17,6 @@ pub struct FriProofCompressorDal<'a, 'c> {
pub(crate) storage: &'a mut Connection<'c, Prover>,
}

#[derive(Debug, EnumString, Display)]
pub enum ProofCompressionJobStatus {
#[strum(serialize = "queued")]
Queued,
#[strum(serialize = "in_progress")]
InProgress,
#[strum(serialize = "successful")]
Successful,
#[strum(serialize = "failed")]
Failed,
#[strum(serialize = "sent_to_server")]
SentToServer,
#[strum(serialize = "skipped")]
Skipped,
}

impl FriProofCompressorDal<'_, '_> {
pub async fn insert_proof_compression_job(
&mut self,
Expand Down Expand Up @@ -328,4 +313,37 @@ impl FriProofCompressorDal<'_, '_> {
.collect()
}
}

/// Returns the proof compression job row for the given L1 batch, or `None`
/// if no compressor job exists for that batch.
///
/// NOTE(review): database errors are `unwrap`ped rather than propagated —
/// presumably matching the surrounding DAL's convention; confirm callers
/// are fine with a panic on DB failure.
pub async fn get_proof_compression_job_for_batch(
&mut self,
block_number: L1BatchNumber,
) -> Option<ProofCompressionJobInfo> {
sqlx::query!(
r#"
SELECT
*
FROM
proof_compression_jobs_fri
WHERE
l1_batch_number = $1
"#,
i64::from(block_number.0)
)
.fetch_optional(self.storage.conn())
.await
// Panics on a DB/connection error; `None` only means "row not found".
.unwrap()
.map(|row| ProofCompressionJobInfo {
l1_batch_number: block_number,
// DB column is a signed integer; attempts are non-negative in practice.
attempts: row.attempts as u32,
// Panics if the DB contains a status string not covered by the enum —
// the enum's strum serializations are the source of truth.
status: ProofCompressionJobStatus::from_str(&row.status).unwrap(),
fri_proof_blob_url: row.fri_proof_blob_url,
l1_proof_blob_url: row.l1_proof_blob_url,
error: row.error,
created_at: row.created_at,
updated_at: row.updated_at,
processing_started_at: row.processing_started_at,
time_taken: row.time_taken,
picked_by: row.picked_by,
})
}
}

0 comments on commit 6ec0ac3

Please sign in to comment.