From ab72aa2bf086cc65edd172c89c8bd3061feb2ebc Mon Sep 17 00:00:00 2001
From: Daniyar Itegulov
Date: Tue, 2 Jul 2024 19:23:27 +1000
Subject: [PATCH 1/4] make vm runner report time taken

---
 ...e8c023bd418e7f3472e67cd379732ac2db36cfb0.json | 15 +++++++++++++++
 ...56e1687e91d8367347b3830830a4c76407d60bc5.json | 14 --------------
 core/lib/dal/src/vm_runner_dal.rs                | 13 ++++++++++---
 core/node/metadata_calculator/src/tests.rs       | 14 ++++++++++----
 .../node/vm_runner/src/impls/protective_reads.rs |  5 +++--
 core/node/vm_runner/src/io.rs                    |  3 ++-
 core/node/vm_runner/src/output_handler.rs        | 16 ++++++++--------
 core/node/vm_runner/src/tests/mod.rs             |  8 +++++++-
 8 files changed, 55 insertions(+), 33 deletions(-)
 create mode 100644 core/lib/dal/.sqlx/query-22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0.json
 delete mode 100644 core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json

diff --git a/core/lib/dal/.sqlx/query-22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0.json b/core/lib/dal/.sqlx/query-22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0.json
new file mode 100644
index 000000000000..55aad98f1a7f
--- /dev/null
+++ b/core/lib/dal/.sqlx/query-22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0.json
@@ -0,0 +1,15 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO\n vm_runner_protective_reads (l1_batch_number, created_at, updated_at, time_taken)\n VALUES\n ($1, NOW(), NOW(), $2)\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int8",
+        "Time"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0"
+}
diff --git a/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json b/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json
deleted file mode 100644
index e49cc211cdcd..000000000000
--- a/core/lib/dal/.sqlx/query-f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO\n vm_runner_protective_reads (l1_batch_number, created_at, updated_at)\n VALUES\n ($1, NOW(), NOW())\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "Int8"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "f2f1b6c4f4686b423a4c449c56e1687e91d8367347b3830830a4c76407d60bc5"
-}
diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs
index 4c07901c32bc..d92552cd9cfe 100644
--- a/core/lib/dal/src/vm_runner_dal.rs
+++ b/core/lib/dal/src/vm_runner_dal.rs
@@ -1,4 +1,9 @@
-use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
+use std::time::Instant;
+
+use zksync_db_connection::{
+    connection::Connection, error::DalResult, instrument::InstrumentExt,
+    utils::duration_to_naive_time,
+};
 use zksync_types::L1BatchNumber;
 
 use crate::Core;
@@ -68,15 +73,17 @@ impl VmRunnerDal<'_, '_> {
     pub async fn mark_protective_reads_batch_as_completed(
         &mut self,
         l1_batch_number: L1BatchNumber,
+        started_at: Instant,
     ) -> DalResult<()> {
         sqlx::query!(
             r#"
             INSERT INTO
-                vm_runner_protective_reads (l1_batch_number, created_at, updated_at)
+                vm_runner_protective_reads (l1_batch_number, created_at, updated_at, time_taken)
             VALUES
-                ($1, NOW(), NOW())
+                ($1, NOW(), NOW(), $2)
             "#,
             i64::from(l1_batch_number.0),
+            duration_to_naive_time(started_at.elapsed()),
         )
         .instrument("mark_protective_reads_batch_as_completed")
         .report_latency()
diff --git a/core/node/metadata_calculator/src/tests.rs b/core/node/metadata_calculator/src/tests.rs
index c5a00ecd7563..52095313890b 100644
--- a/core/node/metadata_calculator/src/tests.rs
+++ b/core/node/metadata_calculator/src/tests.rs
@@ -1,6 +1,12 @@
 //! Tests for the metadata calculator component life cycle.
 
-use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration};
+use std::{
+    future::Future,
+    ops, panic,
+    path::Path,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use assert_matches::assert_matches;
 use itertools::Itertools;
@@ -545,7 +551,7 @@ async fn test_postgres_backup_recovery(
         .unwrap();
     storage
         .vm_runner_dal()
-        .mark_protective_reads_batch_as_completed(batch_without_metadata.number)
+        .mark_protective_reads_batch_as_completed(batch_without_metadata.number, Instant::now())
         .await
         .unwrap();
     insert_initial_writes_for_batch(&mut storage, batch_without_metadata.number).await;
@@ -575,7 +581,7 @@
         .await
         .unwrap();
     txn.vm_runner_dal()
-        .mark_protective_reads_batch_as_completed(batch_header.number)
+        .mark_protective_reads_batch_as_completed(batch_header.number, Instant::now())
         .await
         .unwrap();
     insert_initial_writes_for_batch(&mut txn, batch_header.number).await;
@@ -812,7 +818,7 @@ pub(super) async fn extend_db_state_from_l1_batch(
         .unwrap();
     storage
         .vm_runner_dal()
-        .mark_protective_reads_batch_as_completed(batch_number)
+        .mark_protective_reads_batch_as_completed(batch_number, Instant::now())
         .await
         .unwrap();
     insert_initial_writes_for_batch(storage, batch_number).await;
diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs
index b09e48e2cb0b..700417746f3a 100644
--- a/core/node/vm_runner/src/impls/protective_reads.rs
+++ b/core/node/vm_runner/src/impls/protective_reads.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{sync::Arc, time::Instant};
 
 use anyhow::Context;
 use async_trait::async_trait;
@@ -111,10 +111,11 @@ impl VmRunnerIo for ProtectiveReadsIo {
         &self,
         conn: &mut Connection<'_, Core>,
         l1_batch_number: L1BatchNumber,
+        started_at: Instant,
     ) -> anyhow::Result<()> {
         Ok(conn
             .vm_runner_dal()
-            .mark_protective_reads_batch_as_completed(l1_batch_number)
+            .mark_protective_reads_batch_as_completed(l1_batch_number, started_at)
             .await?)
     }
 }
diff --git a/core/node/vm_runner/src/io.rs b/core/node/vm_runner/src/io.rs
index e67da0e8235c..e45627cb0ee6 100644
--- a/core/node/vm_runner/src/io.rs
+++ b/core/node/vm_runner/src/io.rs
@@ -1,4 +1,4 @@
-use std::fmt::Debug;
+use std::{fmt::Debug, time::Instant};
 
 use async_trait::async_trait;
 use zksync_dal::{Connection, Core};
@@ -41,5 +41,6 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static {
         &self,
         conn: &mut Connection<'_, Core>,
         l1_batch_number: L1BatchNumber,
+        started_at: Instant,
     ) -> anyhow::Result<()>;
 }
diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs
index 4052c245a44f..40c65b5aea46 100644
--- a/core/node/vm_runner/src/output_handler.rs
+++ b/core/node/vm_runner/src/output_handler.rs
@@ -2,7 +2,7 @@ use std::{
     fmt::{Debug, Formatter},
     mem,
     sync::Arc,
-    time::Duration,
+    time::{Duration, Instant},
 };
 
 use anyhow::Context;
@@ -18,7 +18,7 @@ use zksync_types::L1BatchNumber;
 
 use crate::{metrics::METRICS, VmRunnerIo};
 
-type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<()>>>;
+type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<Instant>>>;
 
 /// Functionality to produce a [`StateKeeperOutputHandler`] implementation for a specific L1 batch.
 ///
@@ -131,7 +131,7 @@ impl OutputHandlerFactory
 enum AsyncOutputHandler {
     Running {
         handler: Box<dyn StateKeeperOutputHandler>,
-        sender: oneshot::Sender<JoinHandle<anyhow::Result<()>>>,
+        sender: oneshot::Sender<JoinHandle<anyhow::Result<Instant>>>,
     },
     Finished,
 }
@@ -173,10 +173,10 @@ impl StateKeeperOutputHandler for AsyncOutputHandler {
             } => {
                 sender
                     .send(tokio::task::spawn(async move {
-                        let latency = METRICS.output_handle_time.start();
+                        let started_at = Instant::now();
                         let result = handler.handle_l1_batch(updates_manager).await;
-                        latency.observe();
-                        result
+                        METRICS.output_handle_time.observe(started_at.elapsed());
+                        result.map(|_| started_at)
                     }))
                     .ok();
                 Ok(())
@@ -243,13 +243,13 @@ impl ConcurrentOutputHandlerFactoryTask {
                 .context("handler was dropped before the batch was fully processed")?;
             // Wait until the handle is resolved, meaning that the `handle_l1_batch`
            // computation has finished, and we can consider this batch to be completed
-            handle
+            let started_at = handle
                 .await
                 .context("failed to await for batch to be processed")??;
             latest_processed_batch += 1;
             let mut conn = self.pool.connection_tagged(self.io.name()).await?;
             self.io
-                .mark_l1_batch_as_completed(&mut conn, latest_processed_batch)
+                .mark_l1_batch_as_completed(&mut conn, latest_processed_batch, started_at)
                 .await?;
             METRICS
                 .last_processed_batch
diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs
index c592122b1e09..8111adf48c81 100644
--- a/core/node/vm_runner/src/tests/mod.rs
+++ b/core/node/vm_runner/src/tests/mod.rs
@@ -1,4 +1,9 @@
-use std::{collections::HashMap, ops, sync::Arc, time::Duration};
+use std::{
+    collections::HashMap,
+    ops,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use async_trait::async_trait;
 use rand::{prelude::SliceRandom, Rng};
@@ -59,6 +64,7 @@ impl VmRunnerIo for Arc<RwLock<IoMock>> {
         &self,
         _conn: &mut Connection<'_, Core>,
         l1_batch_number: L1BatchNumber,
+        _started_at: Instant,
     ) -> anyhow::Result<()> {
         self.write().await.current = l1_batch_number;
         Ok(())

From 4b441056629ef98c27c9fd4f4ac692db85941ba4 Mon Sep 17 00:00:00 2001
From: Daniyar Itegulov
Date: Fri, 5 Jul 2024 17:39:07 +1000
Subject: [PATCH 2/4] add `processing_started_at` to VM runner

---
 ...b7dbe0bb23c4fc87a78be2d01b77da2ecbd3.json} |  4 +-
 ...023bd418e7f3472e67cd379732ac2db36cfb0.json | 15 ------
 ...8712c1dce398032d1f2d2b405840f2b5746c.json} |  4 +-
 ...83b7955a058705093d7372726c3fc7ce506ad.json | 14 ++++++
 ...9411ba30ac67080552279d821d66b1b804db3.json | 14 ++++++
 ...e_reads_add_processing_started_at.down.sql |  1 +
 ...ive_reads_add_processing_started_at.up.sql |  1 +
 core/lib/dal/src/vm_runner_dal.rs             | 50 +++++++++++++++----
 core/node/metadata_calculator/src/tests.rs    | 28 +++++++----
 .../vm_runner/src/impls/protective_reads.rs   | 17 +++++--
 core/node/vm_runner/src/io.rs                 | 15 +++++-
 core/node/vm_runner/src/output_handler.rs     | 16 +++---
 core/node/vm_runner/src/process.rs            |  3 ++
 core/node/vm_runner/src/tests/mod.rs          | 16 +++---
 14 files changed, 137 insertions(+), 61 deletions(-)
 rename core/lib/dal/.sqlx/{query-0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f.json => query-00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3.json} (64%)
 delete mode 100644 core/lib/dal/.sqlx/query-22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0.json
 rename core/lib/dal/.sqlx/{query-decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b.json => query-310527fb49f13d7daa3d7f37f4bb8712c1dce398032d1f2d2b405840f2b5746c.json} (77%)
 create mode 100644 core/lib/dal/.sqlx/query-3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad.json
 create mode 100644 core/lib/dal/.sqlx/query-d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3.json
 create mode 100644 core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.down.sql
 create mode 100644 core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.up.sql

diff --git a/core/lib/dal/.sqlx/query-0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f.json b/core/lib/dal/.sqlx/query-00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3.json
similarity index 64%
rename from core/lib/dal/.sqlx/query-0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f.json
rename to core/lib/dal/.sqlx/query-00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3.json
index eaef732751ec..d83713192cb4 100644
--- a/core/lib/dal/.sqlx/query-0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f.json
+++ b/core/lib/dal/.sqlx/query-00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), $1) + $2 AS \"last_ready_batch\"\n FROM\n vm_runner_protective_reads\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ",
+  "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), $1) + $2 AS \"last_ready_batch\"\n FROM\n vm_runner_protective_reads\n WHERE\n time_taken IS NOT NULL\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ",
   "describe": {
     "columns": [
       {
@@ -19,5 +19,5 @@
       true
     ]
   },
-  "hash": "0a2138a1cbf21546931867319ccbfe1e597151ecfaeb3cfa6624f2a1978ef23f"
+  "hash": "00c0389f4cde049078885cdf05bdb7dbe0bb23c4fc87a78be2d01b77da2ecbd3"
 }
diff --git a/core/lib/dal/.sqlx/query-22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0.json b/core/lib/dal/.sqlx/query-22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0.json
deleted file mode 100644
index 55aad98f1a7f..000000000000
--- a/core/lib/dal/.sqlx/query-22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO\n vm_runner_protective_reads (l1_batch_number, created_at, updated_at, time_taken)\n VALUES\n ($1, NOW(), NOW(), $2)\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "Int8",
-        "Time"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "22f8f62d6a1e620bea7d1c0de8c023bd418e7f3472e67cd379732ac2db36cfb0"
-}
diff --git a/core/lib/dal/.sqlx/query-decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b.json b/core/lib/dal/.sqlx/query-310527fb49f13d7daa3d7f37f4bb8712c1dce398032d1f2d2b405840f2b5746c.json
similarity index 77%
rename from core/lib/dal/.sqlx/query-decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b.json
rename to core/lib/dal/.sqlx/query-310527fb49f13d7daa3d7f37f4bb8712c1dce398032d1f2d2b405840f2b5746c.json
index b2a1ae0eb956..9d944e62a7a8 100644
--- a/core/lib/dal/.sqlx/query-decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b.json
+++ b/core/lib/dal/.sqlx/query-310527fb49f13d7daa3d7f37f4bb8712c1dce398032d1f2d2b405840f2b5746c.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n SELECT\n COALESCE(MAX(l1_batch_number), $1) AS \"last_processed_l1_batch!\"\n FROM\n vm_runner_protective_reads\n ",
+  "query": "\n SELECT\n COALESCE(MAX(l1_batch_number), $1) AS \"last_processed_l1_batch!\"\n FROM\n vm_runner_protective_reads\n WHERE\n time_taken IS NOT NULL\n ",
   "describe": {
     "columns": [
       {
@@ -18,5 +18,5 @@
       null
     ]
   },
-  "hash": "decbf1c9c344253f692d0eae57323edbf31a923e7a45a431267e1bd9fc67b47b"
+  "hash": "310527fb49f13d7daa3d7f37f4bb8712c1dce398032d1f2d2b405840f2b5746c"
 }
diff --git a/core/lib/dal/.sqlx/query-3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad.json b/core/lib/dal/.sqlx/query-3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad.json
new file mode 100644
index 000000000000..7b95614bfdff
--- /dev/null
+++ b/core/lib/dal/.sqlx/query-3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad.json
@@ -0,0 +1,14 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n UPDATE vm_runner_protective_reads\n SET\n time_taken = NOW() - processing_started_at\n WHERE\n l1_batch_number = $1\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int8"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "3f0966f082e9e7cdfa18c107a1283b7955a058705093d7372726c3fc7ce506ad"
+}
diff --git a/core/lib/dal/.sqlx/query-d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3.json b/core/lib/dal/.sqlx/query-d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3.json
new file mode 100644
index 000000000000..2b5eeec2e638
--- /dev/null
+++ b/core/lib/dal/.sqlx/query-d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3.json
@@ -0,0 +1,14 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO\n vm_runner_protective_reads (l1_batch_number, created_at, updated_at, processing_started_at)\n VALUES\n ($1, NOW(), NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO\n UPDATE\n SET\n updated_at = NOW(),\n processing_started_at = NOW()\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int8"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "d3abe74360732659a1a35a176679411ba30ac67080552279d821d66b1b804db3"
+}
diff --git a/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.down.sql b/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.down.sql
new file mode 100644
index 000000000000..3e13998726f7
--- /dev/null
+++ b/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.down.sql
@@ -0,0 +1 @@
+ALTER TABLE vm_runner_protective_reads DROP COLUMN IF EXISTS processing_started_at;
diff --git a/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.up.sql b/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.up.sql
new file mode 100644
index 000000000000..e44b16cae441
--- /dev/null
+++ b/core/lib/dal/migrations/20240705164305_protective_reads_add_processing_started_at.up.sql
@@ -0,0 +1 @@
+ALTER TABLE vm_runner_protective_reads ADD COLUMN IF NOT EXISTS processing_started_at TIME;
diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs
index d92552cd9cfe..32eac31954b1 100644
--- a/core/lib/dal/src/vm_runner_dal.rs
+++ b/core/lib/dal/src/vm_runner_dal.rs
@@ -1,9 +1,4 @@
-use std::time::Instant;
-
-use zksync_db_connection::{
-    connection::Connection, error::DalResult, instrument::InstrumentExt,
-    utils::duration_to_naive_time,
-};
+use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
 use zksync_types::L1BatchNumber;
 
 use crate::Core;
@@ -24,6 +19,8 @@ impl VmRunnerDal<'_, '_> {
                 COALESCE(MAX(l1_batch_number), $1) AS "last_processed_l1_batch!"
             FROM
                 vm_runner_protective_reads
+            WHERE
+                time_taken IS NOT NULL
             "#,
             default_batch.0 as i32
         )
@@ -53,6 +50,8 @@ impl VmRunnerDal<'_, '_> {
                     COALESCE(MAX(l1_batch_number), $1) + $2 AS "last_ready_batch"
                 FROM
                     vm_runner_protective_reads
+                WHERE
+                    time_taken IS NOT NULL
                 )
             SELECT
                 LEAST(last_batch, last_ready_batch) AS "last_ready_batch!"
@@ -70,25 +69,54 @@ impl VmRunnerDal<'_, '_> {
         Ok(L1BatchNumber(row.last_ready_batch as u32))
     }
 
-    pub async fn mark_protective_reads_batch_as_completed(
+    pub async fn mark_protective_reads_batch_as_processing(
         &mut self,
         l1_batch_number: L1BatchNumber,
-        started_at: Instant,
     ) -> DalResult<()> {
         sqlx::query!(
             r#"
             INSERT INTO
-                vm_runner_protective_reads (l1_batch_number, created_at, updated_at, time_taken)
+                vm_runner_protective_reads (l1_batch_number, created_at, updated_at, processing_started_at)
             VALUES
-                ($1, NOW(), NOW(), $2)
+                ($1, NOW(), NOW(), NOW())
+            ON CONFLICT (l1_batch_number) DO
+            UPDATE
+            SET
+                updated_at = NOW(),
+                processing_started_at = NOW()
+            "#,
+            i64::from(l1_batch_number.0),
+        )
+        .instrument("mark_protective_reads_batch_as_processing")
+        .report_latency()
+        .execute(self.storage)
+        .await?;
+        Ok(())
+    }
+
+    pub async fn mark_protective_reads_batch_as_completed(
+        &mut self,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<()> {
+        let update_result = sqlx::query!(
+            r#"
+            UPDATE vm_runner_protective_reads
+            SET
+                time_taken = NOW() - processing_started_at
+            WHERE
+                l1_batch_number = $1
             "#,
             i64::from(l1_batch_number.0),
-            duration_to_naive_time(started_at.elapsed()),
         )
         .instrument("mark_protective_reads_batch_as_completed")
         .report_latency()
         .execute(self.storage)
         .await?;
+        if update_result.rows_affected() == 0 {
+            anyhow::bail!(
+                "Trying to mark an L1 batch as completed while it is not being processed"
+            );
+        }
         Ok(())
     }
diff --git a/core/node/metadata_calculator/src/tests.rs b/core/node/metadata_calculator/src/tests.rs
index 52095313890b..fae60d47fcd3 100644
--- a/core/node/metadata_calculator/src/tests.rs
+++ b/core/node/metadata_calculator/src/tests.rs
@@ -1,12 +1,6 @@
 //! Tests for the metadata calculator component life cycle.
 
-use std::{
-    future::Future,
-    ops, panic,
-    path::Path,
-    sync::Arc,
-    time::{Duration, Instant},
-};
+use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration};
 
 use assert_matches::assert_matches;
 use itertools::Itertools;
@@ -551,7 +545,12 @@ async fn test_postgres_backup_recovery(
         .unwrap();
     storage
         .vm_runner_dal()
-        .mark_protective_reads_batch_as_completed(batch_without_metadata.number, Instant::now())
+        .mark_protective_reads_batch_as_processing(batch_without_metadata.number)
+        .await
+        .unwrap();
+    storage
+        .vm_runner_dal()
+        .mark_protective_reads_batch_as_completed(batch_without_metadata.number)
         .await
         .unwrap();
     insert_initial_writes_for_batch(&mut storage, batch_without_metadata.number).await;
@@ -581,7 +580,11 @@
         .await
         .unwrap();
     txn.vm_runner_dal()
-        .mark_protective_reads_batch_as_completed(batch_header.number, Instant::now())
+        .mark_protective_reads_batch_as_processing(batch_header.number)
+        .await
+        .unwrap();
+    txn.vm_runner_dal()
+        .mark_protective_reads_batch_as_completed(batch_header.number)
         .await
         .unwrap();
     insert_initial_writes_for_batch(&mut txn, batch_header.number).await;
@@ -818,7 +821,12 @@ pub(super) async fn extend_db_state_from_l1_batch(
         .unwrap();
     storage
         .vm_runner_dal()
-        .mark_protective_reads_batch_as_completed(batch_number, Instant::now())
+        .mark_protective_reads_batch_as_processing(batch_number)
+        .await
+        .unwrap();
+    storage
+        .vm_runner_dal()
+        .mark_protective_reads_batch_as_completed(batch_number)
         .await
         .unwrap();
     insert_initial_writes_for_batch(storage, batch_number).await;
diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs
index 700417746f3a..a2c8da26d371 100644
--- a/core/node/vm_runner/src/impls/protective_reads.rs
+++ b/core/node/vm_runner/src/impls/protective_reads.rs
@@ -1,4 +1,4 @@
-use std::{sync::Arc, time::Instant};
+use std::sync::Arc;
 
 use anyhow::Context;
 use async_trait::async_trait;
@@ -107,17 +107,26 @@ impl VmRunnerIo for ProtectiveReadsIo {
         .await?)
     }
 
-    async fn mark_l1_batch_as_completed(
+    async fn mark_l1_batch_as_processing(
         &self,
         conn: &mut Connection<'_, Core>,
         l1_batch_number: L1BatchNumber,
-        started_at: Instant,
     ) -> anyhow::Result<()> {
         Ok(conn
             .vm_runner_dal()
-            .mark_protective_reads_batch_as_completed(l1_batch_number, started_at)
+            .mark_protective_reads_batch_as_processing(l1_batch_number)
             .await?)
     }
+
+    async fn mark_l1_batch_as_completed(
+        &self,
+        conn: &mut Connection<'_, Core>,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<()> {
+        conn.vm_runner_dal()
+            .mark_protective_reads_batch_as_completed(l1_batch_number)
+            .await
+    }
 }
 
 #[derive(Debug)]
diff --git a/core/node/vm_runner/src/io.rs b/core/node/vm_runner/src/io.rs
index e45627cb0ee6..9130cf97a846 100644
--- a/core/node/vm_runner/src/io.rs
+++ b/core/node/vm_runner/src/io.rs
@@ -1,4 +1,4 @@
-use std::{fmt::Debug, time::Instant};
+use std::fmt::Debug;
 
 use async_trait::async_trait;
 use zksync_dal::{Connection, Core};
@@ -31,6 +31,18 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static {
         conn: &mut Connection<'_, Core>,
     ) -> anyhow::Result<L1BatchNumber>;
 
+    /// Marks the specified batch as the latest completed batch. All earlier batches are considered
+    /// to be completed too. No guarantees about later batches.
+    ///
+    /// # Errors
+    ///
+    /// Propagates DB errors.
+    async fn mark_l1_batch_as_processing(
+        &self,
+        conn: &mut Connection<'_, Core>,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<()>;
+
     /// Marks the specified batch as the latest completed batch. All earlier batches are considered
     /// to be completed too. No guarantees about later batches.
     ///
     /// # Errors
     ///
     /// Propagates DB errors.
@@ -41,6 +53,5 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static {
         &self,
         conn: &mut Connection<'_, Core>,
         l1_batch_number: L1BatchNumber,
-        started_at: Instant,
     ) -> anyhow::Result<()>;
 }
diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs
index 40c65b5aea46..4052c245a44f 100644
--- a/core/node/vm_runner/src/output_handler.rs
+++ b/core/node/vm_runner/src/output_handler.rs
@@ -2,7 +2,7 @@ use std::{
     fmt::{Debug, Formatter},
     mem,
     sync::Arc,
-    time::{Duration, Instant},
+    time::Duration,
 };
 
 use anyhow::Context;
@@ -18,7 +18,7 @@ use zksync_types::L1BatchNumber;
 
 use crate::{metrics::METRICS, VmRunnerIo};
 
-type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<Instant>>>;
+type BatchReceiver = oneshot::Receiver<JoinHandle<anyhow::Result<()>>>;
 
 /// Functionality to produce a [`StateKeeperOutputHandler`] implementation for a specific L1 batch.
 ///
@@ -131,7 +131,7 @@ impl OutputHandlerFactory
 enum AsyncOutputHandler {
     Running {
         handler: Box<dyn StateKeeperOutputHandler>,
-        sender: oneshot::Sender<JoinHandle<anyhow::Result<Instant>>>,
+        sender: oneshot::Sender<JoinHandle<anyhow::Result<()>>>,
     },
     Finished,
 }
@@ -173,10 +173,10 @@ impl StateKeeperOutputHandler for AsyncOutputHandler {
             } => {
                 sender
                     .send(tokio::task::spawn(async move {
-                        let started_at = Instant::now();
+                        let latency = METRICS.output_handle_time.start();
                         let result = handler.handle_l1_batch(updates_manager).await;
-                        METRICS.output_handle_time.observe(started_at.elapsed());
-                        result.map(|_| started_at)
+                        latency.observe();
+                        result
                     }))
                     .ok();
                 Ok(())
@@ -243,13 +243,13 @@ impl ConcurrentOutputHandlerFactoryTask {
                 .context("handler was dropped before the batch was fully processed")?;
             // Wait until the handle is resolved, meaning that the `handle_l1_batch`
             // computation has finished, and we can consider this batch to be completed
-            let started_at = handle
+            handle
                 .await
                 .context("failed to await for batch to be processed")??;
             latest_processed_batch += 1;
             let mut conn = self.pool.connection_tagged(self.io.name()).await?;
             self.io
-                .mark_l1_batch_as_completed(&mut conn, latest_processed_batch, started_at)
+                .mark_l1_batch_as_completed(&mut conn, latest_processed_batch)
                 .await?;
             METRICS
                 .last_processed_batch
diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs
index 8a9ebb4e3dc9..a5c882abf25e 100644
--- a/core/node/vm_runner/src/process.rs
+++ b/core/node/vm_runner/src/process.rs
@@ -194,6 +194,9 @@ impl VmRunner {
             .create_handler(next_batch)
             .await?;
 
+        self.io
+            .mark_l1_batch_as_processing(&mut self.pool.connection().await?, next_batch)
+            .await?;
         let handle = tokio::task::spawn(Self::process_batch(
             batch_executor,
             batch_data.l2_blocks,
diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs
index 8111adf48c81..50acba610ba5 100644
--- a/core/node/vm_runner/src/tests/mod.rs
+++ b/core/node/vm_runner/src/tests/mod.rs
@@ -1,9 +1,4 @@
-use std::{
-    collections::HashMap,
-    ops,
-    sync::Arc,
-    time::{Duration, Instant},
-};
+use std::{collections::HashMap, ops, sync::Arc, time::Duration};
 
 use async_trait::async_trait;
 use rand::{prelude::SliceRandom, Rng};
@@ -60,11 +55,18 @@ impl VmRunnerIo for Arc<RwLock<IoMock>> {
         Ok(io.current + io.max)
     }
 
+    async fn mark_l1_batch_as_processing(
+        &self,
+        _conn: &mut Connection<'_, Core>,
+        _l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<()> {
+        Ok(())
+    }
+
     async fn mark_l1_batch_as_completed(
         &self,
         _conn: &mut Connection<'_, Core>,
         l1_batch_number: L1BatchNumber,
-        _started_at: Instant,
     ) -> anyhow::Result<()> {
         self.write().await.current = l1_batch_number;
         Ok(())

From f6f7f80f7ddd461544bfd1f8feb41a617f58fb17 Mon Sep 17 00:00:00 2001
From: Daniyar Itegulov
Date: Fri, 5 Jul 2024 18:35:32 +1000
Subject: [PATCH 3/4] change `mark_l1_batch_as_processing` comment

---
 core/node/vm_runner/src/io.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/node/vm_runner/src/io.rs b/core/node/vm_runner/src/io.rs
index 9130cf97a846..2e118f6cfd13 100644
--- a/core/node/vm_runner/src/io.rs
+++ b/core/node/vm_runner/src/io.rs
@@ -31,8 +31,8 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static {
         conn: &mut Connection<'_, Core>,
     ) -> anyhow::Result<L1BatchNumber>;
 
-    /// Marks the specified batch as the latest completed batch. All earlier batches are considered
-    /// to be completed too. No guarantees about later batches.
+    /// Marks the specified batch as being in progress. Must be called before a batch can be marked
+    /// as completed.
     ///
     /// # Errors
     ///

From 816bbf90bb7fa32c7aac39c708dec846dfc0f2fe Mon Sep 17 00:00:00 2001
From: Daniyar Itegulov
Date: Mon, 8 Jul 2024 15:57:05 +1000
Subject: [PATCH 4/4] adapt bwip to the new trait method

---
 ...6d922fa1fc9c202072fbc04cae1bbf97195aa.json | 14 ++++++
 ...373c57d2dc6ec03d84f91a221ab8097e587cc.json | 14 ------
 ...08f1ce816a56308fb9fe581b8683f76cbbbc3.json | 14 ++++++
 ...312512dfab93fd8f32c94461b7a85e3a410e.json} |  4 +-
 ...eb148aa6d1dbd69bf3fe48522101a6ea0bcb.json} |  4 +-
 ...05_bwip_add_processing_started_at.down.sql |  1 +
 ...2005_bwip_add_processing_started_at.up.sql |  1 +
 core/lib/dal/src/vm_runner_dal.rs             | 43 +++++++++++++++++--
 core/node/vm_runner/src/impls/bwip.rs         | 14 +++++-
 9 files changed, 85 insertions(+), 24 deletions(-)
 create mode 100644 core/lib/dal/.sqlx/query-1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa.json
 delete mode 100644 core/lib/dal/.sqlx/query-a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc.json
 create mode 100644 core/lib/dal/.sqlx/query-aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3.json
 rename core/lib/dal/.sqlx/{query-a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd.json => query-c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e.json} (69%)
 rename core/lib/dal/.sqlx/{query-2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4.json => query-e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb.json} (64%)
 create mode 100644 core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.down.sql
 create mode 100644 core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.up.sql

diff --git a/core/lib/dal/.sqlx/query-1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa.json b/core/lib/dal/.sqlx/query-1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa.json
new file mode 100644
index 000000000000..f24a28ffdc28
--- /dev/null
+++ b/core/lib/dal/.sqlx/query-1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa.json
@@ -0,0 +1,14 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO\n vm_runner_bwip (l1_batch_number, created_at, updated_at, processing_started_at)\n VALUES\n ($1, NOW(), NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO\n UPDATE\n SET\n updated_at = NOW(),\n processing_started_at = NOW()\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int8"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "1bbfac481c402bcb3bb888b84146d922fa1fc9c202072fbc04cae1bbf97195aa"
+}
diff --git a/core/lib/dal/.sqlx/query-a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc.json b/core/lib/dal/.sqlx/query-a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc.json
deleted file mode 100644
index 617fd4e81ea1..000000000000
--- a/core/lib/dal/.sqlx/query-a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO\n vm_runner_bwip (l1_batch_number, created_at, updated_at)\n VALUES\n ($1, NOW(), NOW())\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "Int8"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "a3f24c7f2298398517db009f7e5373c57d2dc6ec03d84f91a221ab8097e587cc"
-}
diff --git a/core/lib/dal/.sqlx/query-aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3.json b/core/lib/dal/.sqlx/query-aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3.json
new file mode 100644
index 000000000000..850dfc675743
--- /dev/null
+++ b/core/lib/dal/.sqlx/query-aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3.json
@@ -0,0 +1,14 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n UPDATE vm_runner_bwip\n SET\n time_taken = NOW() - processing_started_at\n WHERE\n l1_batch_number = $1\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Int8"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "aab0254e6bf2c109d97e84053cb08f1ce816a56308fb9fe581b8683f76cbbbc3"
+}
diff --git a/core/lib/dal/.sqlx/query-a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd.json b/core/lib/dal/.sqlx/query-c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e.json
similarity index 69%
rename from core/lib/dal/.sqlx/query-a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd.json
rename to core/lib/dal/.sqlx/query-c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e.json
index cf1fad78a462..d32a9867e304 100644
--- a/core/lib/dal/.sqlx/query-a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd.json
+++ b/core/lib/dal/.sqlx/query-c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n SELECT\n MAX(l1_batch_number) AS \"last_processed_l1_batch\"\n FROM\n vm_runner_bwip\n ",
+  "query": "\n SELECT\n MAX(l1_batch_number) AS \"last_processed_l1_batch\"\n FROM\n vm_runner_bwip\n WHERE\n time_taken IS NOT NULL\n ",
   "describe": {
     "columns": [
       {
@@ -16,5 +16,5 @@
       null
     ]
   },
-  "hash": "a85a15aa2e0be1c1f50d15a8354afcf939e8352e21689baf861b61a666bdc1fd"
+  "hash": "c731b37e17334619d42121e2740c312512dfab93fd8f32c94461b7a85e3a410e"
 }
diff --git a/core/lib/dal/.sqlx/query-2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4.json b/core/lib/dal/.sqlx/query-e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb.json
similarity index 64%
rename from core/lib/dal/.sqlx/query-2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4.json
rename to core/lib/dal/.sqlx/query-e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb.json
index b5c9869d1467..576484cd4206 100644
--- a/core/lib/dal/.sqlx/query-2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4.json
+++ b/core/lib/dal/.sqlx/query-e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), $1) + $2 AS \"last_ready_batch\"\n FROM\n vm_runner_bwip\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ",
+  "query": "\n WITH\n available_batches AS (\n SELECT\n MAX(number) AS \"last_batch\"\n FROM\n l1_batches\n ),\n processed_batches AS (\n SELECT\n COALESCE(MAX(l1_batch_number), $1) + $2 AS \"last_ready_batch\"\n FROM\n vm_runner_bwip\n WHERE\n time_taken IS NOT NULL\n )\n SELECT\n LEAST(last_batch, last_ready_batch) AS \"last_ready_batch!\"\n FROM\n available_batches\n FULL JOIN processed_batches ON TRUE\n ",
   "describe": {
     "columns": [
       {
@@ -19,5 +19,5 @@
       true
     ]
   },
-  "hash": "2482716de397893c52840eb39391ad3349e4d932d3de64b6dade97481cd171a4"
+  "hash": "e7d0b7c132b80195dae7cbf50355eb148aa6d1dbd69bf3fe48522101a6ea0bcb"
 }
diff --git a/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.down.sql b/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.down.sql
new file mode 100644
index 000000000000..86bd163acbc4
--- /dev/null
+++ b/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.down.sql
@@ -0,0 +1 @@
+ALTER TABLE vm_runner_bwip DROP COLUMN IF EXISTS processing_started_at;
diff --git a/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.up.sql b/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.up.sql
new file mode 100644
index 000000000000..244e53b1b8c6
--- /dev/null
+++ b/core/lib/dal/migrations/20240708152005_bwip_add_processing_started_at.up.sql
@@ -0,0 +1 @@
+ALTER TABLE vm_runner_bwip ADD COLUMN IF NOT EXISTS processing_started_at TIME;
diff --git a/core/lib/dal/src/vm_runner_dal.rs b/core/lib/dal/src/vm_runner_dal.rs
index efafa5b030fe..64e378926573 100644
--- a/core/lib/dal/src/vm_runner_dal.rs
+++ b/core/lib/dal/src/vm_runner_dal.rs
@@ -153,6 +153,8 @@ impl VmRunnerDal<'_, '_> {
                 MAX(l1_batch_number) AS "last_processed_l1_batch"
             FROM
                 vm_runner_bwip
+            WHERE
+                time_taken IS NOT NULL
             "#,
         )
         .instrument("get_bwip_latest_processed_batch")
@@ -181,6 +183,8 @@ impl VmRunnerDal<'_, '_> {
                     COALESCE(MAX(l1_batch_number), $1) + $2 AS "last_ready_batch"
                 FROM
                     vm_runner_bwip
+                WHERE
+                    time_taken IS NOT NULL
                 )
             SELECT
                 LEAST(last_batch, last_ready_batch) AS "last_ready_batch!"
@@ -198,23 +202,54 @@ impl VmRunnerDal<'_, '_> {
         Ok(L1BatchNumber(row.last_ready_batch as u32))
     }
 
-    pub async fn mark_bwip_batch_as_completed(
+    pub async fn mark_bwip_batch_as_processing(
         &mut self,
         l1_batch_number: L1BatchNumber,
     ) -> DalResult<()> {
         sqlx::query!(
             r#"
             INSERT INTO
-                vm_runner_bwip (l1_batch_number, created_at, updated_at)
+                vm_runner_bwip (l1_batch_number, created_at, updated_at, processing_started_at)
             VALUES
-                ($1, NOW(), NOW())
+                ($1, NOW(), NOW(), NOW())
+            ON CONFLICT (l1_batch_number) DO
+            UPDATE
+            SET
+                updated_at = NOW(),
+                processing_started_at = NOW()
             "#,
             i64::from(l1_batch_number.0),
         )
-        .instrument("mark_bwip_batch_as_completed")
+        .instrument("mark_bwip_batch_as_processing")
         .report_latency()
         .execute(self.storage)
         .await?;
         Ok(())
     }
+
+    pub async fn mark_bwip_batch_as_completed(
+        &mut self,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<()> {
+        let update_result = sqlx::query!(
+            r#"
+            UPDATE vm_runner_bwip
+            SET
+                time_taken = NOW() - processing_started_at
+            WHERE
+                l1_batch_number = $1
+            "#,
+            i64::from(l1_batch_number.0),
+        )
+        .instrument("mark_bwip_batch_as_completed")
+        .report_latency()
+        .execute(self.storage)
+        .await?;
+        if update_result.rows_affected() == 0 {
+            anyhow::bail!(
+                "Trying to mark an L1 batch as completed while it is not being processed"
+            );
+        }
+        Ok(())
+    }
 }
diff --git a/core/node/vm_runner/src/impls/bwip.rs b/core/node/vm_runner/src/impls/bwip.rs
index f3bdf55400e6..c861273c964d 100644
--- a/core/node/vm_runner/src/impls/bwip.rs
+++ b/core/node/vm_runner/src/impls/bwip.rs
@@ -119,16 +119,26 @@ impl VmRunnerIo for BasicWitnessInputProducerIo {
         .await?)
     }
 
-    async fn mark_l1_batch_as_completed(
+    async fn mark_l1_batch_as_processing(
         &self,
         conn: &mut Connection<'_, Core>,
         l1_batch_number: L1BatchNumber,
     ) -> anyhow::Result<()> {
         Ok(conn
             .vm_runner_dal()
-            .mark_bwip_batch_as_completed(l1_batch_number)
+            .mark_bwip_batch_as_processing(l1_batch_number)
             .await?)
     }
+
+    async fn mark_l1_batch_as_completed(
+        &self,
+        conn: &mut Connection<'_, Core>,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<()> {
+        conn.vm_runner_dal()
+            .mark_bwip_batch_as_completed(l1_batch_number)
+            .await
+    }
 }
 
 #[derive(Debug)]
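
---

A note on consuming the new columns: after this series, `processing_started_at` records when a batch last entered processing, and `time_taken` is only populated once processing finishes, which is why the "last processed batch" queries filter on `time_taken IS NOT NULL`. As a minimal sketch (a hypothetical monitoring query, not part of these patches, assuming only the schema introduced by the migrations above), per-batch durations can then be inspected straight from Postgres:

    -- Hypothetical monitoring query: slowest recently completed
    -- protective-reads batches, using the time_taken column added above.
    SELECT
        l1_batch_number,
        time_taken
    FROM
        vm_runner_protective_reads
    WHERE
        time_taken IS NOT NULL
    ORDER BY
        time_taken DESC
    LIMIT 10;

The same shape of query works against `vm_runner_bwip` for the BWIP runner adapted in patch 4.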