Skip to content

Commit

Permalink
Remove tx only after syncing
Browse files Browse the repository at this point in the history
Signed-off-by: Danil <deniallugo@gmail.com>
  • Loading branch information
Deniallugo committed Feb 2, 2024
1 parent d842cfd commit 2bea009
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 22 deletions.
14 changes: 11 additions & 3 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ use clap::Parser;
use futures::{future::FusedFuture, FutureExt as _};
use metrics::EN_METRICS;
use prometheus_exporter::PrometheusExporterConfig;
use tokio::{sync::watch, task, time::sleep};
use tokio::{
sync::{watch, RwLock},
task,
time::sleep,
};
use zksync_basic_types::{Address, L2ChainId};
use zksync_concurrency::{ctx, scope};
use zksync_config::configs::database::MerkleTreeMode;
use zksync_core::{
api_server::{
execution_sandbox::VmConcurrencyLimiter,
healthcheck::HealthCheckHandle,
tx_sender::{ApiContracts, TxSenderBuilder},
tx_sender::{ApiContracts, TxCache, TxSenderBuilder},
web3::{ApiBuilder, Namespace},
},
block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert},
Expand Down Expand Up @@ -58,6 +62,7 @@ async fn build_state_keeper(
miniblock_sealer_handle: MiniblockSealerHandle,
stop_receiver: watch::Receiver<bool>,
chain_id: L2ChainId,
proxy_cache: Arc<RwLock<TxCache>>,
) -> ZkSyncStateKeeper {
// These config values are used on the main node, and depending on these values certain transactions can
// be *rejected* (that is, not included into the block). However, external node only mirrors what the main
Expand Down Expand Up @@ -91,6 +96,7 @@ async fn build_state_keeper(
l2_erc20_bridge_addr,
validation_computational_gas_limit,
chain_id,
proxy_cache,
)
.await;

Expand Down Expand Up @@ -154,6 +160,7 @@ async fn init_tasks(
tokio::time::sleep(Duration::from_secs(10)).await;
}
}));
let tx_cache = Arc::new(RwLock::new(TxCache::default()));

let state_keeper = build_state_keeper(
action_queue,
Expand All @@ -165,6 +172,7 @@ async fn init_tasks(
miniblock_sealer_handle,
stop_receiver.clone(),
config.remote.l2_chain_id,
tx_cache.clone(),
)
.await;

Expand Down Expand Up @@ -280,7 +288,7 @@ async fn init_tasks(
let tx_sender_builder =
TxSenderBuilder::new(config.clone().into(), connection_pool.clone())
.with_main_connection_pool(connection_pool.clone())
.with_tx_proxy(&main_node_url);
.with_tx_proxy(&main_node_url, tx_cache.clone());

if config.optional.transactions_per_sec_limit.is_some() {
tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored");
Expand Down
10 changes: 4 additions & 6 deletions core/lib/zksync_core/src/api_server/tx_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use multivm::{
utils::{adjust_pubdata_price_for_tx, derive_base_fee_and_gas_per_pubdata, derive_overhead},
vm_latest::constants::{BLOCK_GAS_LIMIT, MAX_PUBDATA_PER_BLOCK},
};
use tokio::sync::RwLock;
use zksync_config::configs::{api::Web3JsonRpcConfig, chain::StateKeeperConfig};
use zksync_contracts::BaseSystemContracts;
use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, StorageProcessor};
Expand All @@ -27,6 +28,7 @@ use zksync_types::{
use zksync_utils::h256_to_u256;

pub(super) use self::{proxy::TxProxy, result::SubmitTxError};
pub use crate::api_server::tx_sender::proxy::TxCache;
use crate::{
api_server::{
execution_sandbox::{
Expand Down Expand Up @@ -163,8 +165,8 @@ impl TxSenderBuilder {
self
}

pub fn with_tx_proxy(mut self, main_node_url: &str) -> Self {
self.proxy = Some(TxProxy::new(main_node_url));
pub fn with_tx_proxy(mut self, main_node_url: &str, tx_cache: Arc<RwLock<TxCache>>) -> Self {
self.proxy = Some(TxProxy::new(main_node_url, tx_cache));
self
}

Expand Down Expand Up @@ -352,10 +354,6 @@ impl TxSender {
// Before it reaches the main node.
proxy.save_tx(tx.hash(), tx.clone()).await;
proxy.submit_tx(&tx).await?;
// Now, after we are sure that the tx is on the main node, remove it from cache
// since we don't want to store txs that might have been replaced or otherwise removed
// from the mempool.
proxy.forget_tx(tx.hash()).await;
SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed());
APP_METRICS.processed_txs[&TxStage::Proxied].inc();
return Ok(L2TxSubmissionResult::Proxied);
Expand Down
15 changes: 6 additions & 9 deletions core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use itertools::Itertools;
use tokio::sync::RwLock;
Expand All @@ -14,7 +14,7 @@ use zksync_web3_decl::{
};

#[derive(Debug, Default)]
struct TxCache {
pub struct TxCache {
tx_cache: HashMap<H256, L2Tx>,
tx_cache_by_account: HashMap<Address, Vec<H256>>,
}
Expand Down Expand Up @@ -47,7 +47,7 @@ impl TxCache {
txs.into_iter().sorted_by_key(|tx| tx.nonce()).collect()
}

fn remove_tx(&mut self, tx_hash: &H256) {
pub(crate) fn remove_tx(&mut self, tx_hash: &H256) {
let tx = self.tx_cache.remove(tx_hash);
if let Some(tx) = tx {
let account_tx_hashes = self
Expand All @@ -64,17 +64,14 @@ impl TxCache {
/// and store them while they're not synced back yet
#[derive(Debug)]
pub struct TxProxy {
tx_cache: RwLock<TxCache>,
tx_cache: Arc<RwLock<TxCache>>,
client: HttpClient,
}

impl TxProxy {
pub fn new(main_node_url: &str) -> Self {
pub fn new(main_node_url: &str, tx_cache: Arc<RwLock<TxCache>>) -> Self {
let client = HttpClientBuilder::default().build(main_node_url).unwrap();
Self {
client,
tx_cache: RwLock::new(TxCache::default()),
}
Self { client, tx_cache }
}

pub async fn find_tx(&self, tx_hash: H256) -> Option<L2Tx> {
Expand Down
3 changes: 3 additions & 0 deletions core/lib/zksync_core/src/consensus/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ impl StateKeeperRunner {
let (stop_sender, stop_receiver) = sync::watch::channel(false);
let (miniblock_sealer, miniblock_sealer_handle) =
MiniblockSealer::new(self.pool.clone(), 5);
let tx_cache = Arc::new(RwLock::new(TxCache::default()));

let io = ExternalIO::new(
miniblock_sealer_handle,
self.pool.clone(),
Expand All @@ -384,6 +386,7 @@ impl StateKeeperRunner {
Address::repeat_byte(11),
u32::MAX,
L2ChainId::default(),
tx_cache,
)
.await;
s.spawn_bg(miniblock_sealer.run());
Expand Down
11 changes: 10 additions & 1 deletion core/lib/zksync_core/src/sync_layer/external_io.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{collections::HashMap, convert::TryInto, iter::FromIterator, time::Duration};
use std::{collections::HashMap, convert::TryInto, iter::FromIterator, sync::Arc, time::Duration};

use async_trait::async_trait;
use futures::future;
use multivm::interface::{FinishedL1Batch, L1BatchEnv, SystemEnv};
use tokio::sync::RwLock;
use vm_utils::storage::wait_for_prev_l1_batch_params;
use zksync_contracts::{BaseSystemContracts, SystemContractCode};
use zksync_dal::ConnectionPool;
Expand All @@ -19,6 +20,7 @@ use super::{
SyncState,
};
use crate::{
api_server::tx_sender::TxCache,
metrics::{BlockStage, APP_METRICS},
state_keeper::{
io::{
Expand Down Expand Up @@ -57,6 +59,7 @@ pub struct ExternalIO {
// TODO it's required for system env, probably we have to get rid of getting system env
validation_computational_gas_limit: u32,
chain_id: L2ChainId,
proxy_cache: Arc<RwLock<TxCache>>,
}

impl ExternalIO {
Expand All @@ -70,6 +73,7 @@ impl ExternalIO {
l2_erc20_bridge_addr: Address,
validation_computational_gas_limit: u32,
chain_id: L2ChainId,
proxy_cache: Arc<RwLock<TxCache>>,
) -> Self {
let mut storage = pool.access_storage_tagged("sync_layer").await.unwrap();
// TODO (PLA-703): Support no L1 batches / miniblocks in the storage
Expand Down Expand Up @@ -109,6 +113,7 @@ impl ExternalIO {
l2_erc20_bridge_addr,
validation_computational_gas_limit,
chain_id,
proxy_cache,
}
}

Expand Down Expand Up @@ -423,6 +428,10 @@ impl StateKeeperIO for ExternalIO {
let SyncAction::Tx(tx) = actions.pop_action().unwrap() else {
unreachable!()
};
// Now, after we are sure that the tx is on the main node, remove it from cache
// since we don't want to store txs that might have been replaced or otherwise removed
// from the mempool.
self.proxy_cache.write().await.remove_tx(&tx.hash());
return Some(*tx);
}
_ => {
Expand Down
14 changes: 11 additions & 3 deletions core/lib/zksync_core/src/sync_layer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
use std::{
collections::{HashMap, VecDeque},
iter,
sync::Arc,
time::{Duration, Instant},
};

use tokio::{sync::watch, task::JoinHandle};
use tokio::{
sync::{watch, RwLock},
task::JoinHandle,
};
use zksync_config::configs::chain::NetworkConfig;
use zksync_dal::{ConnectionPool, StorageProcessor};
use zksync_types::{
Expand All @@ -16,7 +20,10 @@ use zksync_types::{

use super::{fetcher::FetcherCursor, sync_action::SyncAction, *};
use crate::{
api_server::web3::tests::spawn_http_server,
api_server::{
tx_sender::proxy::{TxCache, TxProxy},
web3::tests::spawn_http_server,
},
consensus::testonly::MockMainNodeClient,
genesis::{ensure_genesis_state, GenesisParams},
state_keeper::{
Expand Down Expand Up @@ -61,7 +68,7 @@ impl StateKeeperHandles {
let sync_state = SyncState::new();
let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(pool.clone(), 5);
tokio::spawn(miniblock_sealer.run());

let tx_cache = Arc::new(RwLock::new(TxCache::default()));
let io = ExternalIO::new(
miniblock_sealer_handle,
pool,
Expand All @@ -71,6 +78,7 @@ impl StateKeeperHandles {
Address::repeat_byte(1),
u32::MAX,
L2ChainId::default(),
tx_cache,
)
.await;

Expand Down

0 comments on commit 2bea009

Please sign in to comment.