From d842cfd90894e6bd0bd833339a249c0861dc7423 Mon Sep 17 00:00:00 2001 From: Danil Date: Thu, 1 Feb 2024 19:04:04 +0100 Subject: [PATCH 01/16] feat(en): Take into account nonce from tx proxy Signed-off-by: Danil --- .../src/api_server/tx_sender/proxy.rs | 67 +++++++++++++++++-- .../src/api_server/web3/namespaces/eth.rs | 10 +++ 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index c9ddede1da05..fc1b01076254 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -1,10 +1,11 @@ use std::collections::HashMap; +use itertools::Itertools; use tokio::sync::RwLock; use zksync_types::{ api::{BlockId, Transaction, TransactionDetails, TransactionId}, l2::L2Tx, - H256, + Address, H256, }; use zksync_web3_decl::{ jsonrpsee::http_client::{HttpClient, HttpClientBuilder}, @@ -12,11 +13,58 @@ use zksync_web3_decl::{ RpcResult, }; +#[derive(Debug, Default)] +struct TxCache { + tx_cache: HashMap, + tx_cache_by_account: HashMap>, +} + +impl TxCache { + fn push(&mut self, tx_hash: H256, tx: L2Tx) { + let account_address = tx.common_data.initiator_address; + self.tx_cache.insert(tx_hash, tx); + self.tx_cache_by_account + .entry(account_address) + .or_default() + .push(tx_hash); + } + + fn get_tx(&self, tx_hash: &H256) -> Option { + self.tx_cache.get(tx_hash).cloned() + } + + fn get_txs_by_account(&self, account_address: Address) -> Vec { + let Some(tx_hashes) = self.tx_cache_by_account.get(&account_address) else { + return Vec::new(); + }; + + let mut txs = Vec::new(); + for tx_hash in tx_hashes { + if let Some(tx) = self.get_tx(tx_hash) { + txs.push(tx); + } + } + txs.into_iter().sorted_by_key(|tx| tx.nonce()).collect() + } + + fn remove_tx(&mut self, tx_hash: &H256) { + let tx = self.tx_cache.remove(tx_hash); + if let Some(tx) = tx { + let account_tx_hashes = self + .tx_cache_by_account + .get_mut(&tx.common_data.initiator_address); + if let Some(account_tx_hashes) = account_tx_hashes { + account_tx_hashes.retain(|&hash| hash != *tx_hash); + } + } + } +} + /// Used by external node to proxy transaction to the main node /// and store them while they're not synced back yet #[derive(Debug)] pub struct TxProxy { - tx_cache: RwLock>, + tx_cache: RwLock, client: HttpClient, } @@ -25,20 +73,27 @@ impl TxProxy { let client = HttpClientBuilder::default().build(main_node_url).unwrap(); Self { client, - tx_cache: RwLock::new(HashMap::new()), + tx_cache: RwLock::new(TxCache::default()), } } pub async fn find_tx(&self, tx_hash: H256) -> Option { - self.tx_cache.read().await.get(&tx_hash).cloned() + self.tx_cache.read().await.get_tx(&tx_hash) } pub async fn forget_tx(&self, tx_hash: H256) { - self.tx_cache.write().await.remove(&tx_hash); + self.tx_cache.write().await.remove_tx(&tx_hash) } pub async fn save_tx(&self, tx_hash: H256, tx: L2Tx) { - self.tx_cache.write().await.insert(tx_hash, tx); + self.tx_cache.write().await.push(tx_hash, tx) + } + + pub async fn get_txs_by_account(&self, account_address: Address) -> Vec { + self.tx_cache + .read() + .await + .get_txs_by_account(account_address) } pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult { diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index 0098eacdbf03..94fd82b1ba3e 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ 
b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -303,6 +303,7 @@ impl EthNamespace { const METHOD_NAME: &str = "get_block_transaction_count"; let method_latency = API_METRICS.start_block_call(METHOD_NAME, block_id); + self.state.start_info.ensure_not_pruned(block_id)?; let tx_count = self .state @@ -490,6 +491,15 @@ impl EthNamespace { .map_err(|err| internal_error(method_name, err))?; } + if let Some(proxy) = &self.state.tx_sender.0.proxy { + for tx in proxy.get_txs_by_account(address).await { + // If nonce is not sequential, then we should not increment nonce. + if tx.nonce().0 == account_nonce.as_u32() + 1 { + account_nonce += U256::one(); + } + } + }; + let block_diff = self.state.last_sealed_miniblock.diff(block_number); method_latency.observe(block_diff); Ok(account_nonce) From 09812ddb0a3bbca86c73178a6caba50cbf9a2d00 Mon Sep 17 00:00:00 2001 From: Danil Date: Fri, 2 Feb 2024 09:48:51 +0100 Subject: [PATCH 02/16] Remove tx only after syncing Signed-off-by: Danil --- core/bin/external_node/src/main.rs | 14 +++++++++++--- .../zksync_core/src/api_server/tx_sender/mod.rs | 10 ++++------ .../zksync_core/src/api_server/tx_sender/proxy.rs | 15 ++++++--------- core/lib/zksync_core/src/consensus/testonly.rs | 7 +++++++ .../lib/zksync_core/src/sync_layer/external_io.rs | 11 ++++++++++- core/lib/zksync_core/src/sync_layer/tests.rs | 11 ++++++++--- 6 files changed, 46 insertions(+), 22 deletions(-) diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 8765d656ca59..475f3cb09bc1 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -5,7 +5,11 @@ use clap::Parser; use futures::{future::FusedFuture, FutureExt as _}; use metrics::EN_METRICS; use prometheus_exporter::PrometheusExporterConfig; -use tokio::{sync::watch, task, time::sleep}; +use tokio::{ + sync::{watch, RwLock}, + task, + time::sleep, +}; use zksync_basic_types::{Address, L2ChainId}; use zksync_concurrency::{ctx, scope}; use zksync_config::configs::database::MerkleTreeMode; @@ -13,7 +17,7 @@ use zksync_core::{ api_server::{ execution_sandbox::VmConcurrencyLimiter, healthcheck::HealthCheckHandle, - tx_sender::{ApiContracts, TxSenderBuilder}, + tx_sender::{ApiContracts, TxCache, TxSenderBuilder}, web3::{ApiBuilder, Namespace}, }, block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert}, @@ -58,6 +62,7 @@ async fn build_state_keeper( miniblock_sealer_handle: MiniblockSealerHandle, stop_receiver: watch::Receiver, chain_id: L2ChainId, + proxy_cache: Arc>, ) -> ZkSyncStateKeeper { // These config values are used on the main node, and depending on these values certain transactions can // be *rejected* (that is, not included into the block). 
However, external node only mirrors what the main @@ -91,6 +96,7 @@ async fn build_state_keeper( l2_erc20_bridge_addr, validation_computational_gas_limit, chain_id, + proxy_cache, ) .await; @@ -154,6 +160,7 @@ async fn init_tasks( tokio::time::sleep(Duration::from_secs(10)).await; } })); + let tx_cache = Arc::new(RwLock::new(TxCache::default())); let state_keeper = build_state_keeper( action_queue, @@ -165,6 +172,7 @@ async fn init_tasks( miniblock_sealer_handle, stop_receiver.clone(), config.remote.l2_chain_id, + tx_cache.clone(), ) .await; @@ -280,7 +288,7 @@ async fn init_tasks( let tx_sender_builder = TxSenderBuilder::new(config.clone().into(), connection_pool.clone()) .with_main_connection_pool(connection_pool.clone()) - .with_tx_proxy(&main_node_url); + .with_tx_proxy(&main_node_url, tx_cache.clone()); if config.optional.transactions_per_sec_limit.is_some() { tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored"); diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs index 7d5f1ab483fe..fb23cc7c1d08 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs @@ -8,6 +8,7 @@ use multivm::{ utils::{adjust_pubdata_price_for_tx, derive_base_fee_and_gas_per_pubdata, derive_overhead}, vm_latest::constants::{BLOCK_GAS_LIMIT, MAX_PUBDATA_PER_BLOCK}, }; +use tokio::sync::RwLock; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::StateKeeperConfig}; use zksync_contracts::BaseSystemContracts; use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, StorageProcessor}; @@ -27,6 +28,7 @@ use zksync_types::{ use zksync_utils::h256_to_u256; pub(super) use self::{proxy::TxProxy, result::SubmitTxError}; +pub use crate::api_server::tx_sender::proxy::TxCache; use crate::{ api_server::{ execution_sandbox::{ @@ -163,8 +165,8 @@ impl TxSenderBuilder { self } - pub fn with_tx_proxy(mut self, main_node_url: &str) -> Self { - self.proxy = Some(TxProxy::new(main_node_url)); + pub fn with_tx_proxy(mut self, main_node_url: &str, tx_cache: Arc>) -> Self { + self.proxy = Some(TxProxy::new(main_node_url, tx_cache)); self } @@ -352,10 +354,6 @@ impl TxSender { // Before it reaches the main node. proxy.save_tx(tx.hash(), tx.clone()).await; proxy.submit_tx(&tx).await?; - // Now, after we are sure that the tx is on the main node, remove it from cache - // since we don't want to store txs that might have been replaced or otherwise removed - // from the mempool. 
- proxy.forget_tx(tx.hash()).await; SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed()); APP_METRICS.processed_txs[&TxStage::Proxied].inc(); return Ok(L2TxSubmissionResult::Proxied); diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index fc1b01076254..4ea3ed516932 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use itertools::Itertools; use tokio::sync::RwLock; @@ -14,7 +14,7 @@ use zksync_web3_decl::{ }; #[derive(Debug, Default)] -struct TxCache { +pub struct TxCache { tx_cache: HashMap, tx_cache_by_account: HashMap>, } @@ -47,7 +47,7 @@ impl TxCache { txs.into_iter().sorted_by_key(|tx| tx.nonce()).collect() } - fn remove_tx(&mut self, tx_hash: &H256) { + pub(crate) fn remove_tx(&mut self, tx_hash: &H256) { let tx = self.tx_cache.remove(tx_hash); if let Some(tx) = tx { let account_tx_hashes = self @@ -64,17 +64,14 @@ impl TxCache { /// and store them while they're not synced back yet #[derive(Debug)] pub struct TxProxy { - tx_cache: RwLock, + tx_cache: Arc>, client: HttpClient, } impl TxProxy { - pub fn new(main_node_url: &str) -> Self { + pub fn new(main_node_url: &str, tx_cache: Arc>) -> Self { let client = HttpClientBuilder::default().build(main_node_url).unwrap(); - Self { - client, - tx_cache: RwLock::new(TxCache::default()), - } + Self { client, tx_cache } } pub async fn find_tx(&self, tx_hash: H256) -> Option { diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs index 88a23d17ba37..e481d81d37ff 100644 --- a/core/lib/zksync_core/src/consensus/testonly.rs +++ b/core/lib/zksync_core/src/consensus/testonly.rs @@ -1,9 +1,12 @@ //! Utilities for testing the consensus module. 
+use std::sync::Arc; + use anyhow::Context as _; use rand::{ distributions::{Distribution, Standard}, Rng, }; +use tokio::sync::RwLock; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_consensus_roles::{node, validator}; use zksync_contracts::{BaseSystemContractsHashes, SystemContractCode}; @@ -14,6 +17,7 @@ use zksync_types::{ }; use crate::{ + api_server::tx_sender::TxCache, consensus::{ config::Config, storage::{BlockStore, CtxStorage}, @@ -375,6 +379,8 @@ impl StateKeeperRunner { let (stop_sender, stop_receiver) = sync::watch::channel(false); let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(self.pool.clone(), 5); + let tx_cache = Arc::new(RwLock::new(TxCache::default())); + let io = ExternalIO::new( miniblock_sealer_handle, self.pool.clone(), @@ -384,6 +390,7 @@ impl StateKeeperRunner { Address::repeat_byte(11), u32::MAX, L2ChainId::default(), + tx_cache, ) .await; s.spawn_bg(miniblock_sealer.run()); diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs index c800e83703f8..566b6613d5c0 100644 --- a/core/lib/zksync_core/src/sync_layer/external_io.rs +++ b/core/lib/zksync_core/src/sync_layer/external_io.rs @@ -1,8 +1,9 @@ -use std::{collections::HashMap, convert::TryInto, iter::FromIterator, time::Duration}; +use std::{collections::HashMap, convert::TryInto, iter::FromIterator, sync::Arc, time::Duration}; use async_trait::async_trait; use futures::future; use multivm::interface::{FinishedL1Batch, L1BatchEnv, SystemEnv}; +use tokio::sync::RwLock; use vm_utils::storage::wait_for_prev_l1_batch_params; use zksync_contracts::{BaseSystemContracts, SystemContractCode}; use zksync_dal::ConnectionPool; @@ -19,6 +20,7 @@ use super::{ SyncState, }; use crate::{ + api_server::tx_sender::TxCache, metrics::{BlockStage, APP_METRICS}, state_keeper::{ io::{ @@ -57,6 +59,7 @@ pub struct ExternalIO { // TODO it's required for system env, probably we have to get rid of getting system env validation_computational_gas_limit: u32, chain_id: L2ChainId, + proxy_cache: Arc>, } impl ExternalIO { @@ -70,6 +73,7 @@ impl ExternalIO { l2_erc20_bridge_addr: Address, validation_computational_gas_limit: u32, chain_id: L2ChainId, + proxy_cache: Arc>, ) -> Self { let mut storage = pool.access_storage_tagged("sync_layer").await.unwrap(); // TODO (PLA-703): Support no L1 batches / miniblocks in the storage @@ -109,6 +113,7 @@ impl ExternalIO { l2_erc20_bridge_addr, validation_computational_gas_limit, chain_id, + proxy_cache, } } @@ -423,6 +428,10 @@ impl StateKeeperIO for ExternalIO { let SyncAction::Tx(tx) = actions.pop_action().unwrap() else { unreachable!() }; + // Now, after we are sure that the tx is on the main node, remove it from cache + // since we don't want to store txs that might have been replaced or otherwise removed + // from the mempool. 
+ self.proxy_cache.write().await.remove_tx(&tx.hash()); return Some(*tx); } _ => { diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs index 2fc010f4a78f..c14ff66212cb 100644 --- a/core/lib/zksync_core/src/sync_layer/tests.rs +++ b/core/lib/zksync_core/src/sync_layer/tests.rs @@ -3,10 +3,14 @@ use std::{ collections::{HashMap, VecDeque}, iter, + sync::Arc, time::{Duration, Instant}, }; -use tokio::{sync::watch, task::JoinHandle}; +use tokio::{ + sync::{watch, RwLock}, + task::JoinHandle, +}; use zksync_config::configs::chain::NetworkConfig; use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_types::{ @@ -16,7 +20,7 @@ use zksync_types::{ use super::{fetcher::FetcherCursor, sync_action::SyncAction, *}; use crate::{ - api_server::web3::tests::spawn_http_server, + api_server::{tx_sender::TxCache, web3::tests::spawn_http_server}, consensus::testonly::MockMainNodeClient, genesis::{ensure_genesis_state, GenesisParams}, state_keeper::{ @@ -61,7 +65,7 @@ impl StateKeeperHandles { let sync_state = SyncState::new(); let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(pool.clone(), 5); tokio::spawn(miniblock_sealer.run()); - + let tx_cache = Arc::new(RwLock::new(TxCache::default())); let io = ExternalIO::new( miniblock_sealer_handle, pool, @@ -71,6 +75,7 @@ impl StateKeeperHandles { Address::repeat_byte(1), u32::MAX, L2ChainId::default(), + tx_cache, ) .await; From e60b212932860f5037795ae0a2978957c0d10f4f Mon Sep 17 00:00:00 2001 From: Danil Date: Fri, 2 Feb 2024 12:17:38 +0100 Subject: [PATCH 03/16] Move removing txs from the proxy to the additional component Signed-off-by: Danil --- core/bin/external_node/src/main.rs | 8 ++-- ...f6c092de60e6f7180c1f85fbe72ccac09e2fa.json | 22 +++++++++++ core/lib/dal/src/transactions_dal.rs | 21 +++++++++++ .../src/api_server/tx_sender/proxy.rs | 4 ++ .../zksync_core/src/api_server/web3/mod.rs | 25 +++++++++++-- .../zksync_core/src/api_server/web3/state.rs | 37 ++++++++++++++++++- .../lib/zksync_core/src/consensus/testonly.rs | 5 --- .../zksync_core/src/sync_layer/external_io.rs | 11 +----- core/lib/zksync_core/src/sync_layer/tests.rs | 10 +---- 9 files changed, 110 insertions(+), 33 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa.json diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 475f3cb09bc1..acc282df2b23 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -62,7 +62,6 @@ async fn build_state_keeper( miniblock_sealer_handle: MiniblockSealerHandle, stop_receiver: watch::Receiver, chain_id: L2ChainId, - proxy_cache: Arc>, ) -> ZkSyncStateKeeper { // These config values are used on the main node, and depending on these values certain transactions can // be *rejected* (that is, not included into the block). 
However, external node only mirrors what the main @@ -96,7 +95,6 @@ async fn build_state_keeper( l2_erc20_bridge_addr, validation_computational_gas_limit, chain_id, - proxy_cache, ) .await; @@ -172,7 +170,6 @@ async fn init_tasks( miniblock_sealer_handle, stop_receiver.clone(), config.remote.l2_chain_id, - tx_cache.clone(), ) .await; @@ -284,7 +281,7 @@ async fn init_tasks( let fee_params_fetcher_handle = tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone())); - let (tx_sender, vm_barrier, cache_update_handle) = { + let (tx_sender, vm_barrier, cache_update_handle, tx_cache) = { let tx_sender_builder = TxSenderBuilder::new(config.clone().into(), connection_pool.clone()) .with_main_connection_pool(connection_pool.clone()) @@ -317,7 +314,7 @@ async fn init_tasks( storage_caches, ) .await; - (tx_sender, vm_barrier, cache_update_handle) + (tx_sender, vm_barrier, cache_update_handle, tx_cache) }; let http_server_handles = @@ -328,6 +325,7 @@ async fn init_tasks( .with_response_body_size_limit(config.optional.max_response_body_size()) .with_tx_sender(tx_sender.clone(), vm_barrier.clone()) .with_sync_state(sync_state.clone()) + .with_tx_cache_updater(tx_cache) .enable_api_namespaces(config.optional.api_namespaces()) .build(stop_receiver.clone()) .await diff --git a/core/lib/dal/.sqlx/query-4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa.json b/core/lib/dal/.sqlx/query-4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa.json new file mode 100644 index 000000000000..757a4a71c1de --- /dev/null +++ b/core/lib/dal/.sqlx/query-4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n hash\n FROM\n transactions\n WHERE\n hash = ANY ($1)\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "hash", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [ + false + ] + }, + "hash": "4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa" +} diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index b084a1ba01a1..b01a7203b27c 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -1352,6 +1352,27 @@ impl TransactionsDal<'_, '_> { .unwrap() .map(|tx| tx.into()) } + + pub async fn check_tx_hashes(&mut self, tx_hashes: &[H256]) -> sqlx::Result> { + let hashes: Vec<_> = tx_hashes.iter().map(|hash| hash.as_bytes()).collect(); + let res = sqlx::query!( + r#" + SELECT + hash + FROM + transactions + WHERE + hash = ANY ($1) + "#, + &hashes as &[&[u8]], + ) + .fetch_all(self.storage.conn()) + .await? 
+ .into_iter() + .map(|row| H256::from_slice(&row.hash)) + .collect(); + Ok(res) + } } #[cfg(test)] diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index 4ea3ed516932..218ec0ad3cef 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -47,6 +47,10 @@ impl TxCache { txs.into_iter().sorted_by_key(|tx| tx.nonce()).collect() } + pub(crate) fn get_txs_hash(&self) -> Vec { + self.tx_cache.keys().cloned().collect() + } + pub(crate) fn remove_tx(&mut self, tx_hash: &H256) { let tx = self.tx_cache.remove(tx_hash); if let Some(tx) = tx { diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index 0f88382a1d18..41652e5230c3 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -5,7 +5,7 @@ use chrono::NaiveDateTime; use futures::future; use serde::Deserialize; use tokio::{ - sync::{mpsc, oneshot, watch, Mutex}, + sync::{mpsc, oneshot, watch, Mutex, RwLock}, task::JoinHandle, }; use tower_http::{cors::CorsLayer, metrics::InFlightRequestsLayer}; @@ -37,8 +37,11 @@ use crate::{ api_server::{ execution_sandbox::{BlockStartInfo, VmConcurrencyBarrier}, tree::TreeApiHttpClient, - tx_sender::TxSender, - web3::backend_jsonrpsee::batch_limiter_middleware::LimitMiddleware, + tx_sender::{TxCache, TxSender}, + web3::{ + backend_jsonrpsee::batch_limiter_middleware::LimitMiddleware, + state::TxProxyCacheUpdater, + }, }, sync_layer::SyncState, }; @@ -114,6 +117,7 @@ struct OptionalApiParams { websocket_requests_per_minute_limit: Option, tree_api_url: Option, pub_sub_events_sender: Option>, + tx_cache: Option>>, } /// Full API server parameters. 
@@ -216,6 +220,11 @@ impl ApiBuilder { self } + pub fn with_tx_cache_updater(mut self, tx_cache: Arc>) -> Self { + self.optional.tx_cache = Some(tx_cache); + self + } + pub fn with_sync_state(mut self, sync_state: SyncState) -> Self { self.optional.sync_state = Some(sync_state); self @@ -411,8 +420,18 @@ impl FullApiParams { SEALED_MINIBLOCK_UPDATE_INTERVAL, stop_receiver.clone(), ); + let mut tasks = vec![tokio::spawn(update_task)]; + if let Some(tx_cache) = self.optional.tx_cache.clone() { + let task = TxProxyCacheUpdater::run( + self.last_miniblock_pool.clone(), + tx_cache, + Duration::from_secs(10), + stop_receiver.clone(), + ); + tasks.push(tokio::spawn(task)); + } let pub_sub = if matches!(transport, ApiTransport::WebSocket(_)) && self.namespaces.contains(&Namespace::Pubsub) { diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index 180e2de7211f..206d49af2660 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -8,7 +8,7 @@ use std::{ }; use lru::LruCache; -use tokio::sync::{watch, Mutex}; +use tokio::sync::{watch, Mutex, RwLock}; use vise::GaugeGuard; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::NetworkConfig, ContractsConfig}; use zksync_dal::{ConnectionPool, StorageProcessor}; @@ -23,7 +23,7 @@ use crate::{ api_server::{ execution_sandbox::{BlockArgs, BlockArgsError, BlockStartInfo}, tree::TreeApiHttpClient, - tx_sender::TxSender, + tx_sender::{TxCache, TxSender}, web3::{backend_jsonrpsee::internal_error, TypedFilter}, }, sync_layer::SyncState, @@ -114,6 +114,39 @@ impl InternalApiConfig { } } +#[derive(Debug, Clone)] +pub(crate) struct TxProxyCacheUpdater; + +impl TxProxyCacheUpdater { + /// Creates a handle to the last sealed miniblock number together with a task that will update + /// it on a schedule. + pub async fn run( + connection_pool: ConnectionPool, + tx_cache: Arc>, + update_interval: Duration, + stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + loop { + if *stop_receiver.borrow() { + tracing::debug!("Stopping latest sealed miniblock updates"); + return Ok(()); + } + + let hashes = tx_cache.read().await.get_txs_hash(); + let mut connection = connection_pool.access_storage_tagged("api").await?; + let applied_tx_hashes = connection + .transactions_dal() + .check_tx_hashes(&hashes) + .await?; + for tx_hash in applied_tx_hashes { + tx_cache.write().await.remove_tx(&tx_hash); + } + drop(connection); + tokio::time::sleep(update_interval).await; + } + } +} + /// Thread-safe updatable information about the last sealed miniblock number. /// /// The information may be temporarily outdated and thus should only be used where this is OK diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs index e481d81d37ff..47bcdbc77be6 100644 --- a/core/lib/zksync_core/src/consensus/testonly.rs +++ b/core/lib/zksync_core/src/consensus/testonly.rs @@ -1,12 +1,10 @@ //! Utilities for testing the consensus module. 
-use std::sync::Arc; use anyhow::Context as _; use rand::{ distributions::{Distribution, Standard}, Rng, }; -use tokio::sync::RwLock; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_consensus_roles::{node, validator}; use zksync_contracts::{BaseSystemContractsHashes, SystemContractCode}; @@ -17,7 +15,6 @@ use zksync_types::{ }; use crate::{ - api_server::tx_sender::TxCache, consensus::{ config::Config, storage::{BlockStore, CtxStorage}, @@ -379,7 +376,6 @@ impl StateKeeperRunner { let (stop_sender, stop_receiver) = sync::watch::channel(false); let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(self.pool.clone(), 5); - let tx_cache = Arc::new(RwLock::new(TxCache::default())); let io = ExternalIO::new( miniblock_sealer_handle, @@ -390,7 +386,6 @@ impl StateKeeperRunner { Address::repeat_byte(11), u32::MAX, L2ChainId::default(), - tx_cache, ) .await; s.spawn_bg(miniblock_sealer.run()); diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs index 566b6613d5c0..c800e83703f8 100644 --- a/core/lib/zksync_core/src/sync_layer/external_io.rs +++ b/core/lib/zksync_core/src/sync_layer/external_io.rs @@ -1,9 +1,8 @@ -use std::{collections::HashMap, convert::TryInto, iter::FromIterator, sync::Arc, time::Duration}; +use std::{collections::HashMap, convert::TryInto, iter::FromIterator, time::Duration}; use async_trait::async_trait; use futures::future; use multivm::interface::{FinishedL1Batch, L1BatchEnv, SystemEnv}; -use tokio::sync::RwLock; use vm_utils::storage::wait_for_prev_l1_batch_params; use zksync_contracts::{BaseSystemContracts, SystemContractCode}; use zksync_dal::ConnectionPool; @@ -20,7 +19,6 @@ use super::{ SyncState, }; use crate::{ - api_server::tx_sender::TxCache, metrics::{BlockStage, APP_METRICS}, state_keeper::{ io::{ @@ -59,7 +57,6 @@ pub struct ExternalIO { // TODO it's required for system env, probably we have to get rid of getting system env validation_computational_gas_limit: u32, chain_id: L2ChainId, - proxy_cache: Arc>, } impl ExternalIO { @@ -73,7 +70,6 @@ impl ExternalIO { l2_erc20_bridge_addr: Address, validation_computational_gas_limit: u32, chain_id: L2ChainId, - proxy_cache: Arc>, ) -> Self { let mut storage = pool.access_storage_tagged("sync_layer").await.unwrap(); // TODO (PLA-703): Support no L1 batches / miniblocks in the storage @@ -113,7 +109,6 @@ impl ExternalIO { l2_erc20_bridge_addr, validation_computational_gas_limit, chain_id, - proxy_cache, } } @@ -428,10 +423,6 @@ impl StateKeeperIO for ExternalIO { let SyncAction::Tx(tx) = actions.pop_action().unwrap() else { unreachable!() }; - // Now, after we are sure that the tx is on the main node, remove it from cache - // since we don't want to store txs that might have been replaced or otherwise removed - // from the mempool. 
- self.proxy_cache.write().await.remove_tx(&tx.hash()); return Some(*tx); } _ => { diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs index c14ff66212cb..0658ac9fda35 100644 --- a/core/lib/zksync_core/src/sync_layer/tests.rs +++ b/core/lib/zksync_core/src/sync_layer/tests.rs @@ -3,14 +3,10 @@ use std::{ collections::{HashMap, VecDeque}, iter, - sync::Arc, time::{Duration, Instant}, }; -use tokio::{ - sync::{watch, RwLock}, - task::JoinHandle, -}; +use tokio::{sync::watch, task::JoinHandle}; use zksync_config::configs::chain::NetworkConfig; use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_types::{ @@ -20,7 +16,7 @@ use zksync_types::{ use super::{fetcher::FetcherCursor, sync_action::SyncAction, *}; use crate::{ - api_server::{tx_sender::TxCache, web3::tests::spawn_http_server}, + api_server::web3::tests::spawn_http_server, consensus::testonly::MockMainNodeClient, genesis::{ensure_genesis_state, GenesisParams}, state_keeper::{ @@ -65,7 +61,6 @@ impl StateKeeperHandles { let sync_state = SyncState::new(); let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(pool.clone(), 5); tokio::spawn(miniblock_sealer.run()); - let tx_cache = Arc::new(RwLock::new(TxCache::default())); let io = ExternalIO::new( miniblock_sealer_handle, pool, @@ -75,7 +70,6 @@ impl StateKeeperHandles { Address::repeat_byte(1), u32::MAX, L2ChainId::default(), - tx_cache, ) .await; From 134d0fa66c9947941ed01b5b208ab01024139564 Mon Sep 17 00:00:00 2001 From: Danil Date: Mon, 5 Feb 2024 15:07:36 +0100 Subject: [PATCH 04/16] Remove tx hash from proxy using nonce Signed-off-by: Danil --- ...f6c092de60e6f7180c1f85fbe72ccac09e2fa.json | 22 --------------- ...bb824c2da5d81d4765fe72a5d9c7994a0d2fe.json | 28 +++++++++++++++++++ core/lib/dal/src/transactions_dal.rs | 15 ++++++++-- .../src/api_server/tx_sender/proxy.rs | 18 +++++++++++- .../src/api_server/web3/namespaces/eth.rs | 7 +++-- .../zksync_core/src/api_server/web3/state.rs | 7 +++-- .../ts-integration/tests/mempool.test.ts | 5 ++-- 7 files changed, 70 insertions(+), 32 deletions(-) delete mode 100644 core/lib/dal/.sqlx/query-4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa.json create mode 100644 core/lib/dal/.sqlx/query-5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe.json diff --git a/core/lib/dal/.sqlx/query-4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa.json b/core/lib/dal/.sqlx/query-4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa.json deleted file mode 100644 index 757a4a71c1de..000000000000 --- a/core/lib/dal/.sqlx/query-4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n hash\n FROM\n transactions\n WHERE\n hash = ANY ($1)\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "hash", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "ByteaArray" - ] - }, - "nullable": [ - false - ] - }, - "hash": "4086b3d538862b8b3b3424bab48f6c092de60e6f7180c1f85fbe72ccac09e2fa" -} diff --git a/core/lib/dal/.sqlx/query-5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe.json b/core/lib/dal/.sqlx/query-5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe.json new file mode 100644 index 000000000000..e314b4e57dec --- /dev/null +++ b/core/lib/dal/.sqlx/query-5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n 
SELECT\n initiator_address AS \"initiator_address!\",\n nonce AS \"nonce!\"\n FROM\n transactions\n WHERE\n hash = ANY ($1)\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "initiator_address!", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "nonce!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [ + false, + true + ] + }, + "hash": "5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe" +} diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index b01a7203b27c..5223b1da2282 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -1353,12 +1353,16 @@ impl TransactionsDal<'_, '_> { .map(|tx| tx.into()) } - pub async fn check_tx_hashes(&mut self, tx_hashes: &[H256]) -> sqlx::Result> { + pub async fn check_tx_hashes( + &mut self, + tx_hashes: &[H256], + ) -> sqlx::Result> { let hashes: Vec<_> = tx_hashes.iter().map(|hash| hash.as_bytes()).collect(); let res = sqlx::query!( r#" SELECT - hash + initiator_address AS "initiator_address!", + nonce AS "nonce!" FROM transactions WHERE @@ -1369,7 +1373,12 @@ impl TransactionsDal<'_, '_> { .fetch_all(self.storage.conn()) .await? .into_iter() - .map(|row| H256::from_slice(&row.hash)) + .map(|row| { + ( + Address::from_slice(&row.initiator_address), + Nonce(row.nonce as u32), + ) + }) .collect(); Ok(res) } diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index 218ec0ad3cef..c9fa323ad2b4 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -5,7 +5,7 @@ use tokio::sync::RwLock; use zksync_types::{ api::{BlockId, Transaction, TransactionDetails, TransactionId}, l2::L2Tx, - Address, H256, + Address, Nonce, H256, }; use zksync_web3_decl::{ jsonrpsee::http_client::{HttpClient, HttpClientBuilder}, @@ -17,16 +17,20 @@ use zksync_web3_decl::{ pub struct TxCache { tx_cache: HashMap, tx_cache_by_account: HashMap>, + tx_cache_by_account_nonce: HashMap<(Address, Nonce), H256>, } impl TxCache { fn push(&mut self, tx_hash: H256, tx: L2Tx) { let account_address = tx.common_data.initiator_address; + let nonce = tx.common_data.nonce; self.tx_cache.insert(tx_hash, tx); self.tx_cache_by_account .entry(account_address) .or_default() .push(tx_hash); + self.tx_cache_by_account_nonce + .insert((account_address, nonce), tx_hash); } fn get_tx(&self, tx_hash: &H256) -> Option { @@ -51,6 +55,16 @@ impl TxCache { self.tx_cache.keys().cloned().collect() } + pub(crate) fn remove_tx_by_account_nonce(&mut self, account: Address, nonce: Nonce) { + let tx_hash = self + .tx_cache_by_account_nonce + .get(&(account, nonce)) + .cloned(); + if let Some(tx_hash) = tx_hash { + self.remove_tx(&tx_hash); + } + } + pub(crate) fn remove_tx(&mut self, tx_hash: &H256) { let tx = self.tx_cache.remove(tx_hash); if let Some(tx) = tx { @@ -60,6 +74,8 @@ impl TxCache { if let Some(account_tx_hashes) = account_tx_hashes { account_tx_hashes.retain(|&hash| hash != *tx_hash); } + self.tx_cache_by_account_nonce + .remove(&(tx.common_data.initiator_address, tx.common_data.nonce)); } } } diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index 94fd82b1ba3e..2071081eb622 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -491,15 
+491,18 @@ impl EthNamespace { .map_err(|err| internal_error(method_name, err))?; } + let mut actual_tx_number = account_nonce.as_u32().saturating_sub(1); if let Some(proxy) = &self.state.tx_sender.0.proxy { for tx in proxy.get_txs_by_account(address).await { // If nonce is not sequential, then we should not increment nonce. - if tx.nonce().0 == account_nonce.as_u32() + 1 { - account_nonce += U256::one(); + if tx.nonce().0 == actual_tx_number + 1 { + actual_tx_number += 1; } } }; + let account_nonce = std::cmp::max(account_nonce, U256::from(actual_tx_number)); + let block_diff = self.state.last_sealed_miniblock.diff(block_number); method_latency.observe(block_diff); Ok(account_nonce) diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index 206d49af2660..e2fe05641f5f 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -138,8 +138,11 @@ impl TxProxyCacheUpdater { .transactions_dal() .check_tx_hashes(&hashes) .await?; - for tx_hash in applied_tx_hashes { - tx_cache.write().await.remove_tx(&tx_hash); + for (account, nonce) in applied_tx_hashes { + tx_cache + .write() + .await + .remove_tx_by_account_nonce(account, nonce); } drop(connection); tokio::time::sleep(update_interval).await; diff --git a/core/tests/ts-integration/tests/mempool.test.ts b/core/tests/ts-integration/tests/mempool.test.ts index 160b2a2b81a8..b4fdafd54a61 100644 --- a/core/tests/ts-integration/tests/mempool.test.ts +++ b/core/tests/ts-integration/tests/mempool.test.ts @@ -81,14 +81,15 @@ describe('Tests for the mempool behavior', () => { nonce: startNonce + 1, to: bob.address }); - // First transaction should disappear from the server. - await expect(alice.provider.getTransaction(tx2.hash)).resolves.toBeNull(); // Now fill the gap and see what gets executed await sendTxWithNonce(alice, startNonce).then((tx) => tx.wait()); const replacedReceipt = await replacedTx2.wait(); expect(replacedReceipt.to).toEqual(bob.address); + + // First transaction should disappear from the server. 
+ await expect(alice.provider.getTransaction(tx2.hash)).resolves.toBeNull(); }); test('Should reject a pre-sent transaction with not enough balance', async () => { From 3ea9375aea6b5aef2976dcdc76c6c2be87c061a0 Mon Sep 17 00:00:00 2001 From: Danil Date: Mon, 5 Feb 2024 15:41:24 +0100 Subject: [PATCH 05/16] Reduce update interval Signed-off-by: Danil --- core/lib/zksync_core/src/api_server/web3/mod.rs | 2 +- core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index 41652e5230c3..02caa3f68921 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -427,7 +427,7 @@ impl FullApiParams { let task = TxProxyCacheUpdater::run( self.last_miniblock_pool.clone(), tx_cache, - Duration::from_secs(10), + Duration::from_secs(1), stop_receiver.clone(), ); tasks.push(tokio::spawn(task)); diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index 2071081eb622..8b5ad494e05c 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -491,6 +491,10 @@ impl EthNamespace { .map_err(|err| internal_error(method_name, err))?; } + // Number of txs accepted by the node. + // We should add the nonces from txs in our cache for making the nonce + // aligned with the main node. If the nonce is not sequential, + // then we should not increment nonce. let mut actual_tx_number = account_nonce.as_u32().saturating_sub(1); if let Some(proxy) = &self.state.tx_sender.0.proxy { for tx in proxy.get_txs_by_account(address).await { @@ -501,6 +505,8 @@ impl EthNamespace { } }; + // We should take the maximum nonce from the cache and the account nonce. + // If there are no txs in the cache the account nonce will be higher. 
let account_nonce = std::cmp::max(account_nonce, U256::from(actual_tx_number)); let block_diff = self.state.last_sealed_miniblock.diff(block_number); From b152cd082b3766ca24b415ce0f4b3a2e1ad3bf3b Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 5 Feb 2024 18:54:57 +0200 Subject: [PATCH 06/16] Simplify --- core/bin/external_node/src/main.rs | 16 +- core/lib/dal/src/storage_web3_dal.rs | 14 +- .../src/api_server/tx_sender/mod.rs | 8 +- .../src/api_server/tx_sender/proxy.rs | 151 +++++++++++------- .../zksync_core/src/api_server/web3/mod.rs | 26 +-- .../src/api_server/web3/namespaces/eth.rs | 4 +- .../zksync_core/src/api_server/web3/state.rs | 40 +---- 7 files changed, 120 insertions(+), 139 deletions(-) diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index acc282df2b23..8765d656ca59 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -5,11 +5,7 @@ use clap::Parser; use futures::{future::FusedFuture, FutureExt as _}; use metrics::EN_METRICS; use prometheus_exporter::PrometheusExporterConfig; -use tokio::{ - sync::{watch, RwLock}, - task, - time::sleep, -}; +use tokio::{sync::watch, task, time::sleep}; use zksync_basic_types::{Address, L2ChainId}; use zksync_concurrency::{ctx, scope}; use zksync_config::configs::database::MerkleTreeMode; @@ -17,7 +13,7 @@ use zksync_core::{ api_server::{ execution_sandbox::VmConcurrencyLimiter, healthcheck::HealthCheckHandle, - tx_sender::{ApiContracts, TxCache, TxSenderBuilder}, + tx_sender::{ApiContracts, TxSenderBuilder}, web3::{ApiBuilder, Namespace}, }, block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert}, @@ -158,7 +154,6 @@ async fn init_tasks( tokio::time::sleep(Duration::from_secs(10)).await; } })); - let tx_cache = Arc::new(RwLock::new(TxCache::default())); let state_keeper = build_state_keeper( action_queue, @@ -281,11 +276,11 @@ async fn init_tasks( let fee_params_fetcher_handle = tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone())); - let (tx_sender, vm_barrier, cache_update_handle, tx_cache) = { + let (tx_sender, vm_barrier, cache_update_handle) = { let tx_sender_builder = TxSenderBuilder::new(config.clone().into(), connection_pool.clone()) .with_main_connection_pool(connection_pool.clone()) - .with_tx_proxy(&main_node_url, tx_cache.clone()); + .with_tx_proxy(&main_node_url); if config.optional.transactions_per_sec_limit.is_some() { tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored"); @@ -314,7 +309,7 @@ async fn init_tasks( storage_caches, ) .await; - (tx_sender, vm_barrier, cache_update_handle, tx_cache) + (tx_sender, vm_barrier, cache_update_handle) }; let http_server_handles = @@ -325,7 +320,6 @@ async fn init_tasks( .with_response_body_size_limit(config.optional.max_response_body_size()) .with_tx_sender(tx_sender.clone(), vm_barrier.clone()) .with_sync_state(sync_state.clone()) - .with_tx_cache_updater(tx_cache) .enable_api_namespaces(config.optional.api_namespaces()) .build(stop_receiver.clone()) .await diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs index 312c46acba23..1a64a157026e 100644 --- a/core/lib/dal/src/storage_web3_dal.rs +++ b/core/lib/dal/src/storage_web3_dal.rs @@ -1,9 +1,9 @@ -use std::ops; +use std::{collections::HashMap, ops}; use zksync_types::{ get_code_key, get_nonce_key, utils::{decompose_full_nonce, storage_key_for_standard_token_balance}, - AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, + AccountTreeId, 
Address, L1BatchNumber, MiniblockNumber, Nonce, StorageKey, FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, U256, }; use zksync_utils::h256_to_u256; @@ -32,6 +32,16 @@ impl StorageWeb3Dal<'_, '_> { Ok(decompose_full_nonce(full_nonce).0) } + /// Returns the current *stored* nonces (i.e., w/o accounting for pending transactions) for the specified accounts. + pub async fn get_nonces_for_addresses( + &mut self, + _addresses: &[Address], + ) -> sqlx::Result> { + todo!() + // You should be able to implement this. This method could be reused in `MempoolFetcher`, esp. after + // https://github.com/matter-labs/zksync-era/pull/982 is merged + } + pub async fn standard_token_historical_balance( &mut self, token_id: AccountTreeId, diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs index fb23cc7c1d08..3aa216108faf 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs @@ -8,7 +8,6 @@ use multivm::{ utils::{adjust_pubdata_price_for_tx, derive_base_fee_and_gas_per_pubdata, derive_overhead}, vm_latest::constants::{BLOCK_GAS_LIMIT, MAX_PUBDATA_PER_BLOCK}, }; -use tokio::sync::RwLock; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::StateKeeperConfig}; use zksync_contracts::BaseSystemContracts; use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, StorageProcessor}; @@ -28,7 +27,6 @@ use zksync_types::{ use zksync_utils::h256_to_u256; pub(super) use self::{proxy::TxProxy, result::SubmitTxError}; -pub use crate::api_server::tx_sender::proxy::TxCache; use crate::{ api_server::{ execution_sandbox::{ @@ -165,8 +163,8 @@ impl TxSenderBuilder { self } - pub fn with_tx_proxy(mut self, main_node_url: &str, tx_cache: Arc>) -> Self { - self.proxy = Some(TxProxy::new(main_node_url, tx_cache)); + pub fn with_tx_proxy(mut self, main_node_url: &str) -> Self { + self.proxy = Some(TxProxy::new(main_node_url)); self } @@ -352,7 +350,7 @@ impl TxSender { // We're running an external node: we have to proxy the transaction to the main node. // But before we do that, save the tx to cache in case someone will request it // Before it reaches the main node. 
- proxy.save_tx(tx.hash(), tx.clone()).await; + proxy.save_tx(tx.clone()).await; proxy.submit_tx(&tx).await?; SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed()); APP_METRICS.processed_txs[&TxStage::Proxied].inc(); diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index c9fa323ad2b4..a81a7fc2f88b 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -1,7 +1,12 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{BTreeSet, HashMap}, + future::Future, + sync::Arc, + time::Duration, +}; -use itertools::Itertools; -use tokio::sync::RwLock; +use tokio::sync::{watch, RwLock}; +use zksync_dal::ConnectionPool; use zksync_types::{ api::{BlockId, Transaction, TransactionDetails, TransactionId}, l2::L2Tx, @@ -13,69 +18,84 @@ use zksync_web3_decl::{ RpcResult, }; +#[derive(Debug, Clone, Default)] +pub(crate) struct TxCache { + inner: Arc>, +} + #[derive(Debug, Default)] -pub struct TxCache { +struct TxCacheInner { tx_cache: HashMap, - tx_cache_by_account: HashMap>, - tx_cache_by_account_nonce: HashMap<(Address, Nonce), H256>, + nonces_by_account: HashMap>, } impl TxCache { - fn push(&mut self, tx_hash: H256, tx: L2Tx) { - let account_address = tx.common_data.initiator_address; - let nonce = tx.common_data.nonce; - self.tx_cache.insert(tx_hash, tx); - self.tx_cache_by_account - .entry(account_address) + async fn push(&self, tx: L2Tx) { + let mut inner = self.inner.write().await; + inner + .nonces_by_account + .entry(tx.initiator_account()) .or_default() - .push(tx_hash); - self.tx_cache_by_account_nonce - .insert((account_address, nonce), tx_hash); + .insert(tx.nonce()); + inner.tx_cache.insert(tx.hash(), tx); } - fn get_tx(&self, tx_hash: &H256) -> Option { - self.tx_cache.get(tx_hash).cloned() + async fn get_tx(&self, tx_hash: H256) -> Option { + self.inner.read().await.tx_cache.get(&tx_hash).cloned() } - fn get_txs_by_account(&self, account_address: Address) -> Vec { - let Some(tx_hashes) = self.tx_cache_by_account.get(&account_address) else { - return Vec::new(); - }; - - let mut txs = Vec::new(); - for tx_hash in tx_hashes { - if let Some(tx) = self.get_tx(tx_hash) { - txs.push(tx); - } + async fn get_nonces_for_account(&self, account_address: Address) -> Vec { + let inner = self.inner.read().await; + if let Some(nonces) = inner.nonces_by_account.get(&account_address) { + nonces.iter().copied().collect() + } else { + vec![] } - txs.into_iter().sorted_by_key(|tx| tx.nonce()).collect() } - pub(crate) fn get_txs_hash(&self) -> Vec { - self.tx_cache.keys().cloned().collect() + async fn remove_tx(&self, tx_hash: H256) { + self.inner.write().await.tx_cache.remove(&tx_hash); + // We intentionally don't change `nonces_by_account`; they should only be changed in response to new miniblocks } - pub(crate) fn remove_tx_by_account_nonce(&mut self, account: Address, nonce: Nonce) { - let tx_hash = self - .tx_cache_by_account_nonce - .get(&(account, nonce)) - .cloned(); - if let Some(tx_hash) = tx_hash { - self.remove_tx(&tx_hash); - } - } + async fn run_updates( + self, + pool: ConnectionPool, + stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + const UPDATE_INTERVAL: Duration = Duration::from_secs(1); - pub(crate) fn remove_tx(&mut self, tx_hash: &H256) { - let tx = self.tx_cache.remove(tx_hash); - if let Some(tx) = tx { - let account_tx_hashes = self - .tx_cache_by_account - 
.get_mut(&tx.common_data.initiator_address); - if let Some(account_tx_hashes) = account_tx_hashes { - account_tx_hashes.retain(|&hash| hash != *tx_hash); + loop { + if *stop_receiver.borrow() { + return Ok(()); } - self.tx_cache_by_account_nonce - .remove(&(tx.common_data.initiator_address, tx.common_data.nonce)); + + let addresses: Vec<_> = { + // Split into 2 statements for readability. + let inner = self.inner.read().await; + inner.nonces_by_account.keys().copied().collect() + }; + let mut storage = pool.access_storage_tagged("api").await?; + let nonces_for_accounts = storage + .storage_web3_dal() + .get_nonces_for_addresses(&addresses) + .await?; + drop(storage); // Don't hold both `storage` and lock on `inner` at the same time. + + let mut inner = self.inner.write().await; + inner.nonces_by_account.retain(|address, account_nonces| { + let stored_nonce = nonces_for_accounts + .get(address) + .copied() + .unwrap_or(Nonce(0)); + // Retain only nonces exceeding the stored one. + *account_nonces = account_nonces.split_off(&(stored_nonce + 1)); + // If we've removed all nonces, drop the account entry so we don't request stored nonces for it later. + !account_nonces.is_empty() + }); + drop(inner); + + tokio::time::sleep(UPDATE_INTERVAL).await; } } } @@ -84,33 +104,33 @@ impl TxCache { /// and store them while they're not synced back yet #[derive(Debug)] pub struct TxProxy { - tx_cache: Arc>, + tx_cache: TxCache, client: HttpClient, } impl TxProxy { - pub fn new(main_node_url: &str, tx_cache: Arc>) -> Self { + pub fn new(main_node_url: &str) -> Self { let client = HttpClientBuilder::default().build(main_node_url).unwrap(); - Self { client, tx_cache } + Self { + client, + tx_cache: TxCache::default(), + } } pub async fn find_tx(&self, tx_hash: H256) -> Option { - self.tx_cache.read().await.get_tx(&tx_hash) + self.tx_cache.get_tx(tx_hash).await } pub async fn forget_tx(&self, tx_hash: H256) { - self.tx_cache.write().await.remove_tx(&tx_hash) + self.tx_cache.remove_tx(tx_hash).await; } - pub async fn save_tx(&self, tx_hash: H256, tx: L2Tx) { - self.tx_cache.write().await.push(tx_hash, tx) + pub async fn save_tx(&self, tx: L2Tx) { + self.tx_cache.push(tx).await; } - pub async fn get_txs_by_account(&self, account_address: Address) -> Vec { - self.tx_cache - .read() - .await - .get_txs_by_account(account_address) + pub async fn get_nonces_by_account(&self, account_address: Address) -> Vec { + self.tx_cache.get_nonces_for_account(account_address).await } pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult { @@ -139,4 +159,13 @@ impl TxProxy { pub async fn request_tx_details(&self, hash: H256) -> RpcResult> { self.client.get_transaction_details(hash).await } + + pub fn run_account_nonce_sweeper( + &self, + pool: ConnectionPool, + stop_receiver: watch::Receiver, + ) -> impl Future> { + let tx_cache = self.tx_cache.clone(); + tx_cache.run_updates(pool, stop_receiver) + } } diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index 02caa3f68921..a77c8c4e3ab5 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -5,7 +5,7 @@ use chrono::NaiveDateTime; use futures::future; use serde::Deserialize; use tokio::{ - sync::{mpsc, oneshot, watch, Mutex, RwLock}, + sync::{mpsc, oneshot, watch, Mutex}, task::JoinHandle, }; use tower_http::{cors::CorsLayer, metrics::InFlightRequestsLayer}; @@ -37,11 +37,8 @@ use crate::{ api_server::{ execution_sandbox::{BlockStartInfo, VmConcurrencyBarrier}, 
tree::TreeApiHttpClient, - tx_sender::{TxCache, TxSender}, - web3::{ - backend_jsonrpsee::batch_limiter_middleware::LimitMiddleware, - state::TxProxyCacheUpdater, - }, + tx_sender::TxSender, + web3::backend_jsonrpsee::batch_limiter_middleware::LimitMiddleware, }, sync_layer::SyncState, }; @@ -117,7 +114,6 @@ struct OptionalApiParams { websocket_requests_per_minute_limit: Option, tree_api_url: Option, pub_sub_events_sender: Option>, - tx_cache: Option>>, } /// Full API server parameters. @@ -220,11 +216,6 @@ impl ApiBuilder { self } - pub fn with_tx_cache_updater(mut self, tx_cache: Arc>) -> Self { - self.optional.tx_cache = Some(tx_cache); - self - } - pub fn with_sync_state(mut self, sync_state: SyncState) -> Self { self.optional.sync_state = Some(sync_state); self @@ -422,14 +413,9 @@ impl FullApiParams { ); let mut tasks = vec![tokio::spawn(update_task)]; - - if let Some(tx_cache) = self.optional.tx_cache.clone() { - let task = TxProxyCacheUpdater::run( - self.last_miniblock_pool.clone(), - tx_cache, - Duration::from_secs(1), - stop_receiver.clone(), - ); + if let Some(tx_proxy) = &self.tx_sender.0.proxy { + let task = tx_proxy + .run_account_nonce_sweeper(self.last_miniblock_pool.clone(), stop_receiver.clone()); tasks.push(tokio::spawn(task)); } let pub_sub = if matches!(transport, ApiTransport::WebSocket(_)) diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index 8b5ad494e05c..350c7f8c2d05 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -497,9 +497,9 @@ impl EthNamespace { // then we should not increment nonce. let mut actual_tx_number = account_nonce.as_u32().saturating_sub(1); if let Some(proxy) = &self.state.tx_sender.0.proxy { - for tx in proxy.get_txs_by_account(address).await { + for pending_nonce in proxy.get_nonces_by_account(address).await { // If nonce is not sequential, then we should not increment nonce. - if tx.nonce().0 == actual_tx_number + 1 { + if pending_nonce.0 == actual_tx_number + 1 { actual_tx_number += 1; } } diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index e2fe05641f5f..180e2de7211f 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -8,7 +8,7 @@ use std::{ }; use lru::LruCache; -use tokio::sync::{watch, Mutex, RwLock}; +use tokio::sync::{watch, Mutex}; use vise::GaugeGuard; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::NetworkConfig, ContractsConfig}; use zksync_dal::{ConnectionPool, StorageProcessor}; @@ -23,7 +23,7 @@ use crate::{ api_server::{ execution_sandbox::{BlockArgs, BlockArgsError, BlockStartInfo}, tree::TreeApiHttpClient, - tx_sender::{TxCache, TxSender}, + tx_sender::TxSender, web3::{backend_jsonrpsee::internal_error, TypedFilter}, }, sync_layer::SyncState, @@ -114,42 +114,6 @@ impl InternalApiConfig { } } -#[derive(Debug, Clone)] -pub(crate) struct TxProxyCacheUpdater; - -impl TxProxyCacheUpdater { - /// Creates a handle to the last sealed miniblock number together with a task that will update - /// it on a schedule. 
- pub async fn run( - connection_pool: ConnectionPool, - tx_cache: Arc>, - update_interval: Duration, - stop_receiver: watch::Receiver, - ) -> anyhow::Result<()> { - loop { - if *stop_receiver.borrow() { - tracing::debug!("Stopping latest sealed miniblock updates"); - return Ok(()); - } - - let hashes = tx_cache.read().await.get_txs_hash(); - let mut connection = connection_pool.access_storage_tagged("api").await?; - let applied_tx_hashes = connection - .transactions_dal() - .check_tx_hashes(&hashes) - .await?; - for (account, nonce) in applied_tx_hashes { - tx_cache - .write() - .await - .remove_tx_by_account_nonce(account, nonce); - } - drop(connection); - tokio::time::sleep(update_interval).await; - } - } -} - /// Thread-safe updatable information about the last sealed miniblock number. /// /// The information may be temporarily outdated and thus should only be used where this is OK From 292b0361c679f613b68dbb512d73526255200543 Mon Sep 17 00:00:00 2001 From: Danil Date: Tue, 6 Feb 2024 10:36:03 +0100 Subject: [PATCH 07/16] Implement getting the last nonce Signed-off-by: Danil --- ...4e13d46b5ea54a144ec2345a76b865c85f84b.json | 28 ++++++++++++++ core/lib/dal/src/storage_web3_dal.rs | 38 +++++++++++++++++-- 2 files changed, 62 insertions(+), 4 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b.json diff --git a/core/lib/dal/.sqlx/query-24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b.json b/core/lib/dal/.sqlx/query-24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b.json new file mode 100644 index 000000000000..d28ef3efdc2e --- /dev/null +++ b/core/lib/dal/.sqlx/query-24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n initiator_address,\n MAX(nonce)\n FROM\n transactions\n WHERE\n initiator_address = ANY ($1)\n AND is_priority = FALSE\n AND (\n miniblock_number IS NOT NULL\n OR error IS NULL\n )\n GROUP BY\n initiator_address\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "initiator_address", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "max", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [ + false, + null + ] + }, + "hash": "24fa393b7c3a0178806a223ad474e13d46b5ea54a144ec2345a76b865c85f84b" +} diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs index 1a64a157026e..25f24c245e09 100644 --- a/core/lib/dal/src/storage_web3_dal.rs +++ b/core/lib/dal/src/storage_web3_dal.rs @@ -35,11 +35,41 @@ impl StorageWeb3Dal<'_, '_> { /// Returns the current *stored* nonces (i.e., w/o accounting for pending transactions) for the specified accounts. pub async fn get_nonces_for_addresses( &mut self, - _addresses: &[Address], + addresses: &[Address], ) -> sqlx::Result> { - todo!() - // You should be able to implement this. This method could be reused in `MempoolFetcher`, esp. after - // https://github.com/matter-labs/zksync-era/pull/982 is merged + let res = sqlx::query!( + r#" + SELECT + initiator_address, + MAX(nonce) + FROM + transactions + WHERE + initiator_address = ANY ($1) + AND is_priority = FALSE + AND ( + miniblock_number IS NOT NULL + OR error IS NULL + ) + GROUP BY + initiator_address + "#, + &addresses + .iter() + .map(|address| address.as_bytes().to_vec()) + .collect::>() + ) + .fetch_all(self.storage.conn()) + .await? 
+ .into_iter() + .map(|row| { + ( + Address::from_slice(&row.initiator_address), + Nonce(row.max.unwrap_or_default() as u32), + ) + }) + .collect(); + Ok(res) } pub async fn standard_token_historical_balance( From 2121d598f649dce75cc0cf59b94eea0d263feebc Mon Sep 17 00:00:00 2001 From: Danil Date: Tue, 6 Feb 2024 17:31:28 +0100 Subject: [PATCH 08/16] Return removing txs back Signed-off-by: Danil --- core/lib/zksync_core/src/api_server/tx_sender/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs index 48e862a6cc57..49cdbc3de41c 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs @@ -352,6 +352,10 @@ impl TxSender { // Before it reaches the main node. proxy.save_tx(tx.clone()).await; proxy.submit_tx(&tx).await?; + // Now, after we are sure that the tx is on the main node, remove it from cache + // since we don't want to store txs that might have been replaced or otherwise removed + // from the mempool. + proxy.forget_tx(tx.hash()).await; SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed()); APP_METRICS.processed_txs[&TxStage::Proxied].inc(); return Ok(L2TxSubmissionResult::Proxied); From bc189ba8ce225d8c9c59765af3b534d881eb15df Mon Sep 17 00:00:00 2001 From: Danil Date: Wed, 7 Feb 2024 10:45:16 +0100 Subject: [PATCH 09/16] Rename last_miniblock_pool Signed-off-by: Danil --- .../zksync_core/src/api_server/web3/mod.rs | 24 +++++++++---------- core/lib/zksync_core/src/lib.rs | 6 ++--- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index a77c8c4e3ab5..63320c1f6c06 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -120,7 +120,7 @@ struct OptionalApiParams { #[derive(Debug)] struct FullApiParams { pool: ConnectionPool, - last_miniblock_pool: ConnectionPool, + updaters_pool: ConnectionPool, config: InternalApiConfig, transport: ApiTransport, tx_sender: TxSender, @@ -133,7 +133,7 @@ struct FullApiParams { #[derive(Debug)] pub struct ApiBuilder { pool: ConnectionPool, - last_miniblock_pool: ConnectionPool, + updaters_pool: ConnectionPool, config: InternalApiConfig, polling_interval: Duration, // Mandatory params that must be set using builder methods. @@ -151,7 +151,7 @@ impl ApiBuilder { pub fn jsonrpsee_backend(config: InternalApiConfig, pool: ConnectionPool) -> Self { Self { - last_miniblock_pool: pool.clone(), + updaters_pool: pool.clone(), pool, config, polling_interval: Self::DEFAULT_POLLING_INTERVAL, @@ -173,11 +173,12 @@ impl ApiBuilder { self } - /// Configures a dedicated DB pool to be used for updating the latest miniblock information + /// Configures a dedicated DB pool to be used for updating different information, + /// such as last mined block number or account nonces. This pool is used to execute /// in a background task. If not called, the main pool will be used. If the API server is under high load, /// it may make sense to supply a single-connection pool to reduce pool contention with the API methods. 
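Illustration only, not part of this patch: later in the series the HTTP API wiring supplies exactly such a dedicated pool through the renamed setter. A condensed sketch of that usage (variable names as in run_http_api, error context via anyhow) looks like this:

    // Dedicated low-connection pool for background updaters (sealed-miniblock
    // tracking and the account nonce sweeper), kept separate from the pool used
    // by API request handlers to avoid contention.
    let updaters_pool = ConnectionPool::singleton(postgres_config.replica_url()?)
        .build()
        .await
        .context("failed to build updaters_pool")?;

    let api_builder =
        web3::ApiBuilder::jsonrpsee_backend(internal_api.clone(), replica_connection_pool)
            .http(api_config.web3_json_rpc.http_port)
            .with_updaters_pool(updaters_pool);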
- pub fn with_last_miniblock_pool(mut self, pool: ConnectionPool) -> Self { - self.last_miniblock_pool = pool; + pub fn with_updaters_pool(mut self, pool: ConnectionPool) -> Self { + self.updaters_pool = pool; self } @@ -245,7 +246,7 @@ impl ApiBuilder { fn into_full_params(self) -> anyhow::Result { Ok(FullApiParams { pool: self.pool, - last_miniblock_pool: self.last_miniblock_pool, + updaters_pool: self.updaters_pool, config: self.config, transport: self.transport.context("API transport not set")?, tx_sender: self.tx_sender.context("Transaction sender not set")?, @@ -276,10 +277,7 @@ impl FullApiParams { self, last_sealed_miniblock: SealedMiniblockNumber, ) -> anyhow::Result { - let mut storage = self - .last_miniblock_pool - .access_storage_tagged("api") - .await?; + let mut storage = self.updaters_pool.access_storage_tagged("api").await?; let start_info = BlockStartInfo::new(&mut storage).await?; drop(storage); @@ -407,7 +405,7 @@ impl FullApiParams { let (health_check, health_updater) = ReactiveHealthCheck::new(health_check_name); let (last_sealed_miniblock, update_task) = SealedMiniblockNumber::new( - self.last_miniblock_pool.clone(), + self.updaters_pool.clone(), SEALED_MINIBLOCK_UPDATE_INTERVAL, stop_receiver.clone(), ); @@ -415,7 +413,7 @@ impl FullApiParams { let mut tasks = vec![tokio::spawn(update_task)]; if let Some(tx_proxy) = &self.tx_sender.0.proxy { let task = tx_proxy - .run_account_nonce_sweeper(self.last_miniblock_pool.clone(), stop_receiver.clone()); + .run_account_nonce_sweeper(self.updaters_pool.clone(), stop_receiver.clone()); tasks.push(tokio::spawn(task)); } let pub_sub = if matches!(transport, ApiTransport::WebSocket(_)) diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 3f9a1f690d42..62030810b8d9 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -1100,7 +1100,7 @@ async fn run_http_api( } namespaces.push(Namespace::Snapshots); - let last_miniblock_pool = ConnectionPool::singleton(postgres_config.replica_url()?) + let updaters_pool = ConnectionPool::singleton(postgres_config.replica_url()?) 
.build() .await .context("failed to build last_miniblock_pool")?; @@ -1108,7 +1108,7 @@ async fn run_http_api( let api_builder = web3::ApiBuilder::jsonrpsee_backend(internal_api.clone(), replica_connection_pool) .http(api_config.web3_json_rpc.http_port) - .with_last_miniblock_pool(last_miniblock_pool) + .with_updaters_pool(updaters_pool) .with_filter_limit(api_config.web3_json_rpc.filters_limit()) .with_tree_api(api_config.web3_json_rpc.tree_api_url()) .with_batch_request_size_limit(api_config.web3_json_rpc.max_batch_request_size()) @@ -1152,7 +1152,7 @@ async fn run_ws_api( let api_builder = web3::ApiBuilder::jsonrpsee_backend(internal_api.clone(), replica_connection_pool) .ws(api_config.web3_json_rpc.ws_port) - .with_last_miniblock_pool(last_miniblock_pool) + .with_updaters_pool(last_miniblock_pool) .with_filter_limit(api_config.web3_json_rpc.filters_limit()) .with_subscriptions_limit(api_config.web3_json_rpc.subscriptions_limit()) .with_batch_request_size_limit(api_config.web3_json_rpc.max_batch_request_size()) From b9fb3ee6bd7d9e13c16ac5ac0fa5d1b5e6393d75 Mon Sep 17 00:00:00 2001 From: Danil Date: Wed, 7 Feb 2024 16:43:54 +0100 Subject: [PATCH 10/16] Calculate pending nonce only for pending block Signed-off-by: Danil --- ...bb824c2da5d81d4765fe72a5d9c7994a0d2fe.json | 28 ------------ core/lib/dal/src/transactions_dal.rs | 34 +------------- .../src/api_server/tx_sender/proxy.rs | 15 +++++++ .../src/api_server/web3/namespaces/eth.rs | 45 +++++++++---------- 4 files changed, 37 insertions(+), 85 deletions(-) delete mode 100644 core/lib/dal/.sqlx/query-5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe.json diff --git a/core/lib/dal/.sqlx/query-5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe.json b/core/lib/dal/.sqlx/query-5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe.json deleted file mode 100644 index e314b4e57dec..000000000000 --- a/core/lib/dal/.sqlx/query-5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n initiator_address AS \"initiator_address!\",\n nonce AS \"nonce!\"\n FROM\n transactions\n WHERE\n hash = ANY ($1)\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "initiator_address!", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "nonce!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "ByteaArray" - ] - }, - "nullable": [ - false, - true - ] - }, - "hash": "5e0a5ed220014ae1c1f16bf25afbb824c2da5d81d4765fe72a5d9c7994a0d2fe" -} diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index fd3f48867e12..f86af45a36aa 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -12,8 +12,8 @@ use zksync_types::{ protocol_version::ProtocolUpgradeTx, tx::{tx_execution_info::TxExecutionStatus, TransactionExecutionResult}, vm_trace::{Call, VmExecutionTrace}, - Address, ExecuteTransactionCommon, L1BatchNumber, L1BlockNumber, MiniblockNumber, Nonce, - PriorityOpId, Transaction, H256, PROTOCOL_UPGRADE_TX_TYPE, U256, + Address, ExecuteTransactionCommon, L1BatchNumber, L1BlockNumber, MiniblockNumber, PriorityOpId, + Transaction, H256, PROTOCOL_UPGRADE_TX_TYPE, U256, }; use zksync_utils::u256_to_big_decimal; @@ -1344,36 +1344,6 @@ impl TransactionsDal<'_, '_> { .unwrap() .map(|tx| tx.into()) } - - pub async fn check_tx_hashes( - &mut self, - tx_hashes: &[H256], - ) -> sqlx::Result> { - let hashes: Vec<_> = tx_hashes.iter().map(|hash| 
hash.as_bytes()).collect(); - let res = sqlx::query!( - r#" - SELECT - initiator_address AS "initiator_address!", - nonce AS "nonce!" - FROM - transactions - WHERE - hash = ANY ($1) - "#, - &hashes as &[&[u8]], - ) - .fetch_all(self.storage.conn()) - .await? - .into_iter() - .map(|row| { - ( - Address::from_slice(&row.initiator_address), - Nonce(row.nonce as u32), - ) - }) - .collect(); - Ok(res) - } } #[cfg(test)] diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index a81a7fc2f88b..309478bdbb95 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -133,6 +133,21 @@ impl TxProxy { self.tx_cache.get_nonces_for_account(account_address).await } + pub async fn next_nonce_by_initiator_account( + &self, + account_address: Address, + mut current_nonce: u32, + ) -> Nonce { + let nonces = self.get_nonces_by_account(account_address).await; + for pending_nonce in nonces { + // If nonce is not sequential, then we should not increment nonce. + if pending_nonce.0 == current_nonce + 1 { + current_nonce += 1; + } + } + Nonce(current_nonce + 1) + } + pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult { let input_data = tx.common_data.input_data().expect("raw tx is absent"); let raw_tx = zksync_types::Bytes(input_data.to_vec()); diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index 350c7f8c2d05..ded0f7bd0be0 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -7,8 +7,10 @@ use zksync_types::{ l2::{L2Tx, TransactionType}, transaction_request::CallRequest, utils::decompose_full_nonce, - web3, - web3::types::{FeeHistory, SyncInfo, SyncState}, + web3::{ + self, + types::{FeeHistory, SyncInfo, SyncState}, + }, AccountTreeId, Bytes, MiniblockNumber, StorageKey, H256, L2_ETH_TOKEN_ADDRESS, U256, }; use zksync_utils::u256_to_h256; @@ -484,31 +486,24 @@ impl EthNamespace { if matches!(block_id, BlockId::Number(BlockNumber::Pending)) { let account_nonce_u64 = u64::try_from(account_nonce) .map_err(|err| internal_error(method_name, anyhow::anyhow!(err)))?; - account_nonce = connection - .transactions_web3_dal() - .next_nonce_by_initiator_account(address, account_nonce_u64) - .await - .map_err(|err| internal_error(method_name, err))?; + account_nonce = if let Some(proxy) = &self.state.tx_sender.0.proxy { + // EN: get pending nonces from the transaction cache + // We don't have mempool in EN, it's safe to use the proxy cache as a mempool + proxy + .next_nonce_by_initiator_account(address, account_nonce_u64 as u32) + .await + .0 + .into() + } else { + // Main node: get pending nonces from the mempool + connection + .transactions_web3_dal() + .next_nonce_by_initiator_account(address, account_nonce_u64) + .await + .map_err(|err| internal_error(method_name, err))? + }; } - // Number of txs accepted by the node. - // We should add the nonces from txs in our cache for making the nonce - // aligned with the main node. If the nonce is not sequential, - // then we should not increment nonce. - let mut actual_tx_number = account_nonce.as_u32().saturating_sub(1); - if let Some(proxy) = &self.state.tx_sender.0.proxy { - for pending_nonce in proxy.get_nonces_by_account(address).await { - // If nonce is not sequential, then we should not increment nonce. 
- if pending_nonce.0 == actual_tx_number + 1 { - actual_tx_number += 1; - } - } - }; - - // We should take the maximum nonce from the cache and the account nonce. - // If there are no txs in the cache the account nonce will be higher. - let account_nonce = std::cmp::max(account_nonce, U256::from(actual_tx_number)); - let block_diff = self.state.last_sealed_miniblock.diff(block_number); method_latency.observe(block_diff); Ok(account_nonce) From 7b829e9f80b48a970134fc6b3e1c92aaf43b1a38 Mon Sep 17 00:00:00 2001 From: Danil Date: Thu, 8 Feb 2024 17:43:33 +0100 Subject: [PATCH 11/16] Fix expected nonce Signed-off-by: Danil --- core/lib/zksync_core/src/api_server/tx_sender/proxy.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index 309478bdbb95..959621002f91 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -136,16 +136,17 @@ impl TxProxy { pub async fn next_nonce_by_initiator_account( &self, account_address: Address, - mut current_nonce: u32, + current_nonce: u32, ) -> Nonce { + let mut exprected_nonce = current_nonce; let nonces = self.get_nonces_by_account(account_address).await; for pending_nonce in nonces { // If nonce is not sequential, then we should not increment nonce. - if pending_nonce.0 == current_nonce + 1 { - current_nonce += 1; + if pending_nonce.0 == exprected_nonce { + exprected_nonce += 1; } } - Nonce(current_nonce + 1) + Nonce(exprected_nonce) } pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult { From c89533e12d4c12e89ebe86dafc3ba103697857e5 Mon Sep 17 00:00:00 2001 From: Danil Date: Fri, 9 Feb 2024 11:11:34 +0100 Subject: [PATCH 12/16] Fix nonces Signed-off-by: Danil --- .../lib/zksync_core/src/api_server/tx_sender/proxy.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index 959621002f91..1a8f22afffb4 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -89,7 +89,7 @@ impl TxCache { .copied() .unwrap_or(Nonce(0)); // Retain only nonces exceeding the stored one. - *account_nonces = account_nonces.split_off(&(stored_nonce + 1)); + *account_nonces = account_nonces.split_off(&(stored_nonce)); // If we've removed all nonces, drop the account entry so we don't request stored nonces for it later. !account_nonces.is_empty() }); @@ -138,15 +138,16 @@ impl TxProxy { account_address: Address, current_nonce: u32, ) -> Nonce { - let mut exprected_nonce = current_nonce; + let mut expected_nonce = current_nonce.saturating_sub(1); let nonces = self.get_nonces_by_account(account_address).await; for pending_nonce in nonces { // If nonce is not sequential, then we should not increment nonce. 
- if pending_nonce.0 == exprected_nonce { - exprected_nonce += 1; + if pending_nonce.0 == expected_nonce + 1 { + expected_nonce += 1; } } - Nonce(exprected_nonce) + + Nonce(std::cmp::max(expected_nonce, current_nonce)) } pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult { From 1fb86dd3fd28eb87859cf54ebe7a393c502bcf25 Mon Sep 17 00:00:00 2001 From: Danil Date: Fri, 9 Feb 2024 11:21:32 +0100 Subject: [PATCH 13/16] Use nonces from storage and not from transactions table Signed-off-by: Danil --- core/lib/dal/src/storage_web3_dal.rs | 48 ++++++++++------------------ 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs index e0c3679c503b..0c41181e41cd 100644 --- a/core/lib/dal/src/storage_web3_dal.rs +++ b/core/lib/dal/src/storage_web3_dal.rs @@ -37,38 +37,22 @@ impl StorageWeb3Dal<'_, '_> { &mut self, addresses: &[Address], ) -> sqlx::Result> { - let res = sqlx::query!( - r#" - SELECT - initiator_address, - MAX(nonce) - FROM - transactions - WHERE - initiator_address = ANY ($1) - AND is_priority = FALSE - AND ( - miniblock_number IS NOT NULL - OR error IS NULL - ) - GROUP BY - initiator_address - "#, - &addresses - .iter() - .map(|address| address.as_bytes().to_vec()) - .collect::>() - ) - .fetch_all(self.storage.conn()) - .await? - .into_iter() - .map(|row| { - ( - Address::from_slice(&row.initiator_address), - Nonce(row.max.unwrap_or_default() as u32), - ) - }) - .collect(); + let nonce_keys: HashMap<_, _> = addresses + .iter() + .map(|address| (get_nonce_key(address).hashed_key(), *address)) + .collect(); + + let res = self + .get_values(&nonce_keys.keys().cloned().collect::>()) + .await? + .into_iter() + .filter_map(|(hashed_key, value)| { + let address = nonce_keys.get(&hashed_key)?; + let full_nonce = h256_to_u256(value); + let (nonce, _) = decompose_full_nonce(full_nonce); + Some((*address, Nonce(nonce.as_u32()))) + }) + .collect(); Ok(res) } From 5d872d0cae9c01aa5b24598b551dcca131654568 Mon Sep 17 00:00:00 2001 From: Danil Date: Tue, 13 Feb 2024 12:38:20 +0100 Subject: [PATCH 14/16] Use btree set for filtering small values Signed-off-by: Danil --- core/lib/dal/src/storage_web3_dal.rs | 2 +- .../src/api_server/tx_sender/proxy.rs | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs index 0c41181e41cd..fca9a0e1c415 100644 --- a/core/lib/dal/src/storage_web3_dal.rs +++ b/core/lib/dal/src/storage_web3_dal.rs @@ -43,7 +43,7 @@ impl StorageWeb3Dal<'_, '_> { .collect(); let res = self - .get_values(&nonce_keys.keys().cloned().collect::>()) + .get_values(&nonce_keys.keys().copied().collect::>()) .await? .into_iter() .filter_map(|(hashed_key, value)| { diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index 1a8f22afffb4..9c62e3cb86ee 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -88,8 +88,8 @@ impl TxCache { .get(address) .copied() .unwrap_or(Nonce(0)); - // Retain only nonces exceeding the stored one. - *account_nonces = account_nonces.split_off(&(stored_nonce)); + // Retain only nonces starting from the stored one. + *account_nonces = account_nonces.split_off(&stored_nonce); // If we've removed all nonces, drop the account entry so we don't request stored nonces for it later. 
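A worked example of this retention rule, using illustrative values that are not taken from the patch: suppose the account's stored nonce is 7 (transactions with nonces 0..=6 are mined) and the cache holds pending nonces {5, 6, 7, 9}. BTreeSet::split_off(&k) keeps the elements below k in place and returns those at or above it, so the sweeper keeps {7, 9} and drops the already-mined 5 and 6:

    use std::collections::BTreeSet;

    fn main() {
        let stored_nonce = 7u32;
        let mut cached = BTreeSet::from([5, 6, 7, 9]);
        // Keep only nonces starting from the stored one; 5 and 6 are already on-chain.
        cached = cached.split_off(&stored_nonce);
        assert_eq!(cached, BTreeSet::from([7, 9]));
    }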
!account_nonces.is_empty() }); @@ -138,16 +138,20 @@ impl TxProxy { account_address: Address, current_nonce: u32, ) -> Nonce { - let mut expected_nonce = current_nonce.saturating_sub(1); - let nonces = self.get_nonces_by_account(account_address).await; - for pending_nonce in nonces { + let mut expected_nonce = Nonce(current_nonce); + let nonces = BTreeSet::from_iter( + self.get_nonces_by_account(account_address) + .await + .into_iter(), + ); + for pending_nonce in nonces.range(expected_nonce + 1..) { // If nonce is not sequential, then we should not increment nonce. - if pending_nonce.0 == expected_nonce + 1 { + if pending_nonce.0 == expected_nonce.0 { expected_nonce += 1; } } - Nonce(std::cmp::max(expected_nonce, current_nonce)) + expected_nonce } pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult { From 451a0a41de5f1a012355b930fd15b94d0b50ab5f Mon Sep 17 00:00:00 2001 From: Danil Date: Wed, 14 Feb 2024 16:57:11 +0100 Subject: [PATCH 15/16] refactor btree set usage Signed-off-by: Danil --- .../src/api_server/tx_sender/proxy.rs | 26 +++++++++---------- etc/tokens/sepolia.json | 14 ---------- 2 files changed, 12 insertions(+), 28 deletions(-) diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index 9c62e3cb86ee..18580c2f0831 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -44,12 +44,12 @@ impl TxCache { self.inner.read().await.tx_cache.get(&tx_hash).cloned() } - async fn get_nonces_for_account(&self, account_address: Address) -> Vec { + async fn get_nonces_for_account(&self, account_address: Address) -> BTreeSet { let inner = self.inner.read().await; if let Some(nonces) = inner.nonces_by_account.get(&account_address) { - nonces.iter().copied().collect() + nonces.clone() } else { - vec![] + BTreeSet::new() } } @@ -129,7 +129,7 @@ impl TxProxy { self.tx_cache.push(tx).await; } - pub async fn get_nonces_by_account(&self, account_address: Address) -> Vec { + pub async fn get_nonces_by_account(&self, account_address: Address) -> BTreeSet { self.tx_cache.get_nonces_for_account(account_address).await } @@ -138,20 +138,18 @@ impl TxProxy { account_address: Address, current_nonce: u32, ) -> Nonce { - let mut expected_nonce = Nonce(current_nonce); - let nonces = BTreeSet::from_iter( - self.get_nonces_by_account(account_address) - .await - .into_iter(), - ); - for pending_nonce in nonces.range(expected_nonce + 1..) { + let mut pending_nonce = Nonce(current_nonce); + let nonces = self.get_nonces_by_account(account_address).await; + for nonce in nonces.range(pending_nonce + 1..) { // If nonce is not sequential, then we should not increment nonce. 
- if pending_nonce.0 == expected_nonce.0 { - expected_nonce += 1; + if nonce == &pending_nonce { + pending_nonce += 1; + } else { + break; } } - expected_nonce + pending_nonce } pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult { diff --git a/etc/tokens/sepolia.json b/etc/tokens/sepolia.json index 899571df9e5a..e69de29bb2d1 100644 --- a/etc/tokens/sepolia.json +++ b/etc/tokens/sepolia.json @@ -1,14 +0,0 @@ -[ - { - "name": "DAI", - "symbol": "DAI", - "decimals": 18, - "address": "0x35EfF6eA96571ff475136117FdD92A9ba25b1f37" - }, - { - "name": "Wrapped Ether", - "symbol": "WETH", - "decimals": 18, - "address": "0x7b79995e5f793A07Bc00c21412e50Ecae098E7f9" - } -] From 7dd5ea8804e840b429afff329e1ddceaa779dbf9 Mon Sep 17 00:00:00 2001 From: Danil Date: Mon, 19 Feb 2024 17:04:22 +0100 Subject: [PATCH 16/16] Add more connections to updaters pool Signed-off-by: Danil --- core/lib/zksync_core/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 62030810b8d9..159915937954 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -1100,7 +1100,7 @@ async fn run_http_api( } namespaces.push(Namespace::Snapshots); - let updaters_pool = ConnectionPool::singleton(postgres_config.replica_url()?) + let updaters_pool = ConnectionPool::builder(postgres_config.replica_url()?, 2) .build() .await .context("failed to build last_miniblock_pool")?;
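Taken together, the behavior this series converges on for eth_getTransactionCount(pending) on the external node can be summarized by the following standalone sketch. It models the rule stated in the comments above (advance the stored account nonce through consecutive cached pending nonces and stop at the first gap) rather than reproducing the node's exact types:

    use std::collections::BTreeSet;

    // Simplified model of merging cached pending nonces into the stored nonce.
    fn pending_nonce(stored_nonce: u32, cached: &BTreeSet<u32>) -> u32 {
        let mut next = stored_nonce;
        while cached.contains(&next) {
            next += 1;
        }
        next
    }

    fn main() {
        // Stored nonce 5 with cached pending nonces {5, 6, 8}: the reported pending
        // nonce is 7; nonce 8 is not counted because 7 has not been seen yet.
        assert_eq!(pending_nonce(5, &BTreeSet::from([5, 6, 8])), 7);
        // With an empty cache the stored nonce is returned unchanged.
        assert_eq!(pending_nonce(5, &BTreeSet::new()), 5);
    }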