diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 8765d656ca59..475f3cb09bc1 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -5,7 +5,11 @@ use clap::Parser; use futures::{future::FusedFuture, FutureExt as _}; use metrics::EN_METRICS; use prometheus_exporter::PrometheusExporterConfig; -use tokio::{sync::watch, task, time::sleep}; +use tokio::{ + sync::{watch, RwLock}, + task, + time::sleep, +}; use zksync_basic_types::{Address, L2ChainId}; use zksync_concurrency::{ctx, scope}; use zksync_config::configs::database::MerkleTreeMode; @@ -13,7 +17,7 @@ use zksync_core::{ api_server::{ execution_sandbox::VmConcurrencyLimiter, healthcheck::HealthCheckHandle, - tx_sender::{ApiContracts, TxSenderBuilder}, + tx_sender::{ApiContracts, TxCache, TxSenderBuilder}, web3::{ApiBuilder, Namespace}, }, block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert}, @@ -58,6 +62,7 @@ async fn build_state_keeper( miniblock_sealer_handle: MiniblockSealerHandle, stop_receiver: watch::Receiver, chain_id: L2ChainId, + proxy_cache: Arc>, ) -> ZkSyncStateKeeper { // These config values are used on the main node, and depending on these values certain transactions can // be *rejected* (that is, not included into the block). However, external node only mirrors what the main @@ -91,6 +96,7 @@ async fn build_state_keeper( l2_erc20_bridge_addr, validation_computational_gas_limit, chain_id, + proxy_cache, ) .await; @@ -154,6 +160,7 @@ async fn init_tasks( tokio::time::sleep(Duration::from_secs(10)).await; } })); + let tx_cache = Arc::new(RwLock::new(TxCache::default())); let state_keeper = build_state_keeper( action_queue, @@ -165,6 +172,7 @@ async fn init_tasks( miniblock_sealer_handle, stop_receiver.clone(), config.remote.l2_chain_id, + tx_cache.clone(), ) .await; @@ -280,7 +288,7 @@ async fn init_tasks( let tx_sender_builder = TxSenderBuilder::new(config.clone().into(), connection_pool.clone()) .with_main_connection_pool(connection_pool.clone()) - .with_tx_proxy(&main_node_url); + .with_tx_proxy(&main_node_url, tx_cache.clone()); if config.optional.transactions_per_sec_limit.is_some() { tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored"); diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs index 7d5f1ab483fe..fb23cc7c1d08 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs @@ -8,6 +8,7 @@ use multivm::{ utils::{adjust_pubdata_price_for_tx, derive_base_fee_and_gas_per_pubdata, derive_overhead}, vm_latest::constants::{BLOCK_GAS_LIMIT, MAX_PUBDATA_PER_BLOCK}, }; +use tokio::sync::RwLock; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::StateKeeperConfig}; use zksync_contracts::BaseSystemContracts; use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, StorageProcessor}; @@ -27,6 +28,7 @@ use zksync_types::{ use zksync_utils::h256_to_u256; pub(super) use self::{proxy::TxProxy, result::SubmitTxError}; +pub use crate::api_server::tx_sender::proxy::TxCache; use crate::{ api_server::{ execution_sandbox::{ @@ -163,8 +165,8 @@ impl TxSenderBuilder { self } - pub fn with_tx_proxy(mut self, main_node_url: &str) -> Self { - self.proxy = Some(TxProxy::new(main_node_url)); + pub fn with_tx_proxy(mut self, main_node_url: &str, tx_cache: Arc>) -> Self { + self.proxy = Some(TxProxy::new(main_node_url, tx_cache)); self } @@ -352,10 +354,6 @@ impl TxSender { // Before it reaches the main node. proxy.save_tx(tx.hash(), tx.clone()).await; proxy.submit_tx(&tx).await?; - // Now, after we are sure that the tx is on the main node, remove it from cache - // since we don't want to store txs that might have been replaced or otherwise removed - // from the mempool. - proxy.forget_tx(tx.hash()).await; SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed()); APP_METRICS.processed_txs[&TxStage::Proxied].inc(); return Ok(L2TxSubmissionResult::Proxied); diff --git a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs index fc1b01076254..4ea3ed516932 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/proxy.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use itertools::Itertools; use tokio::sync::RwLock; @@ -14,7 +14,7 @@ use zksync_web3_decl::{ }; #[derive(Debug, Default)] -struct TxCache { +pub struct TxCache { tx_cache: HashMap, tx_cache_by_account: HashMap>, } @@ -47,7 +47,7 @@ impl TxCache { txs.into_iter().sorted_by_key(|tx| tx.nonce()).collect() } - fn remove_tx(&mut self, tx_hash: &H256) { + pub(crate) fn remove_tx(&mut self, tx_hash: &H256) { let tx = self.tx_cache.remove(tx_hash); if let Some(tx) = tx { let account_tx_hashes = self @@ -64,17 +64,14 @@ impl TxCache { /// and store them while they're not synced back yet #[derive(Debug)] pub struct TxProxy { - tx_cache: RwLock, + tx_cache: Arc>, client: HttpClient, } impl TxProxy { - pub fn new(main_node_url: &str) -> Self { + pub fn new(main_node_url: &str, tx_cache: Arc>) -> Self { let client = HttpClientBuilder::default().build(main_node_url).unwrap(); - Self { - client, - tx_cache: RwLock::new(TxCache::default()), - } + Self { client, tx_cache } } pub async fn find_tx(&self, tx_hash: H256) -> Option { diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs index 88a23d17ba37..714c63aa70e1 100644 --- a/core/lib/zksync_core/src/consensus/testonly.rs +++ b/core/lib/zksync_core/src/consensus/testonly.rs @@ -375,6 +375,8 @@ impl StateKeeperRunner { let (stop_sender, stop_receiver) = sync::watch::channel(false); let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(self.pool.clone(), 5); + let tx_cache = Arc::new(RwLock::new(TxCache::default())); + let io = ExternalIO::new( miniblock_sealer_handle, self.pool.clone(), @@ -384,6 +386,7 @@ impl StateKeeperRunner { Address::repeat_byte(11), u32::MAX, L2ChainId::default(), + tx_cache, ) .await; s.spawn_bg(miniblock_sealer.run()); diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs index c800e83703f8..566b6613d5c0 100644 --- a/core/lib/zksync_core/src/sync_layer/external_io.rs +++ b/core/lib/zksync_core/src/sync_layer/external_io.rs @@ -1,8 +1,9 @@ -use std::{collections::HashMap, convert::TryInto, iter::FromIterator, time::Duration}; +use std::{collections::HashMap, convert::TryInto, iter::FromIterator, sync::Arc, time::Duration}; use async_trait::async_trait; use futures::future; use multivm::interface::{FinishedL1Batch, L1BatchEnv, SystemEnv}; +use tokio::sync::RwLock; use vm_utils::storage::wait_for_prev_l1_batch_params; use zksync_contracts::{BaseSystemContracts, SystemContractCode}; use zksync_dal::ConnectionPool; @@ -19,6 +20,7 @@ use super::{ SyncState, }; use crate::{ + api_server::tx_sender::TxCache, metrics::{BlockStage, APP_METRICS}, state_keeper::{ io::{ @@ -57,6 +59,7 @@ pub struct ExternalIO { // TODO it's required for system env, probably we have to get rid of getting system env validation_computational_gas_limit: u32, chain_id: L2ChainId, + proxy_cache: Arc>, } impl ExternalIO { @@ -70,6 +73,7 @@ impl ExternalIO { l2_erc20_bridge_addr: Address, validation_computational_gas_limit: u32, chain_id: L2ChainId, + proxy_cache: Arc>, ) -> Self { let mut storage = pool.access_storage_tagged("sync_layer").await.unwrap(); // TODO (PLA-703): Support no L1 batches / miniblocks in the storage @@ -109,6 +113,7 @@ impl ExternalIO { l2_erc20_bridge_addr, validation_computational_gas_limit, chain_id, + proxy_cache, } } @@ -423,6 +428,10 @@ impl StateKeeperIO for ExternalIO { let SyncAction::Tx(tx) = actions.pop_action().unwrap() else { unreachable!() }; + // Now, after we are sure that the tx is on the main node, remove it from cache + // since we don't want to store txs that might have been replaced or otherwise removed + // from the mempool. + self.proxy_cache.write().await.remove_tx(&tx.hash()); return Some(*tx); } _ => { diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs index 2fc010f4a78f..15272a9260cb 100644 --- a/core/lib/zksync_core/src/sync_layer/tests.rs +++ b/core/lib/zksync_core/src/sync_layer/tests.rs @@ -3,10 +3,14 @@ use std::{ collections::{HashMap, VecDeque}, iter, + sync::Arc, time::{Duration, Instant}, }; -use tokio::{sync::watch, task::JoinHandle}; +use tokio::{ + sync::{watch, RwLock}, + task::JoinHandle, +}; use zksync_config::configs::chain::NetworkConfig; use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_types::{ @@ -16,7 +20,10 @@ use zksync_types::{ use super::{fetcher::FetcherCursor, sync_action::SyncAction, *}; use crate::{ - api_server::web3::tests::spawn_http_server, + api_server::{ + tx_sender::proxy::{TxCache, TxProxy}, + web3::tests::spawn_http_server, + }, consensus::testonly::MockMainNodeClient, genesis::{ensure_genesis_state, GenesisParams}, state_keeper::{ @@ -61,7 +68,7 @@ impl StateKeeperHandles { let sync_state = SyncState::new(); let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(pool.clone(), 5); tokio::spawn(miniblock_sealer.run()); - + let tx_cache = Arc::new(RwLock::new(TxCache::default())); let io = ExternalIO::new( miniblock_sealer_handle, pool, @@ -71,6 +78,7 @@ impl StateKeeperHandles { Address::repeat_byte(1), u32::MAX, L2ChainId::default(), + tx_cache, ) .await;