Simplify #1019

Merged
16 changes: 5 additions & 11 deletions core/bin/external_node/src/main.rs
@@ -5,19 +5,15 @@ use clap::Parser;
use futures::{future::FusedFuture, FutureExt as _};
use metrics::EN_METRICS;
use prometheus_exporter::PrometheusExporterConfig;
use tokio::{
sync::{watch, RwLock},
task,
time::sleep,
};
use tokio::{sync::watch, task, time::sleep};
use zksync_basic_types::{Address, L2ChainId};
use zksync_concurrency::{ctx, scope};
use zksync_config::configs::database::MerkleTreeMode;
use zksync_core::{
api_server::{
execution_sandbox::VmConcurrencyLimiter,
healthcheck::HealthCheckHandle,
tx_sender::{ApiContracts, TxCache, TxSenderBuilder},
tx_sender::{ApiContracts, TxSenderBuilder},
web3::{ApiBuilder, Namespace},
},
block_reverter::{BlockReverter, BlockReverterFlags, L1ExecutedBatchesRevert},
@@ -158,7 +154,6 @@ async fn init_tasks(
tokio::time::sleep(Duration::from_secs(10)).await;
}
}));
let tx_cache = Arc::new(RwLock::new(TxCache::default()));

let state_keeper = build_state_keeper(
action_queue,
@@ -281,11 +276,11 @@ async fn init_tasks(
let fee_params_fetcher_handle =
tokio::spawn(fee_params_fetcher.clone().run(stop_receiver.clone()));

let (tx_sender, vm_barrier, cache_update_handle, tx_cache) = {
let (tx_sender, vm_barrier, cache_update_handle) = {
let tx_sender_builder =
TxSenderBuilder::new(config.clone().into(), connection_pool.clone())
.with_main_connection_pool(connection_pool.clone())
.with_tx_proxy(&main_node_url, tx_cache.clone());
.with_tx_proxy(&main_node_url);

if config.optional.transactions_per_sec_limit.is_some() {
tracing::warn!("`transactions_per_sec_limit` option is deprecated and ignored");
@@ -314,7 +309,7 @@ async fn init_tasks(
storage_caches,
)
.await;
(tx_sender, vm_barrier, cache_update_handle, tx_cache)
(tx_sender, vm_barrier, cache_update_handle)
};

let http_server_handles =
@@ -325,7 +320,6 @@
.with_response_body_size_limit(config.optional.max_response_body_size())
.with_tx_sender(tx_sender.clone(), vm_barrier.clone())
.with_sync_state(sync_state.clone())
.with_tx_cache_updater(tx_cache)
.enable_api_namespaces(config.optional.api_namespaces())
.build(stop_receiver.clone())
.await
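Net effect on the external node wiring: the shared `Arc<RwLock<TxCache>>` and the `with_tx_cache_updater` call disappear, and the cache becomes an implementation detail of the proxy. A condensed sketch of the new builder call shape (`config`, `connection_pool`, and `main_node_url` stand in for the `init_tasks` locals shown above):

```rust
// Sketch of the simplified wiring after this PR.
let tx_sender_builder = TxSenderBuilder::new(config.clone().into(), connection_pool.clone())
    .with_main_connection_pool(connection_pool.clone())
    // The proxy now creates and owns its `TxCache` internally; no shared
    // `Arc<RwLock<_>>` needs to be threaded through the caller.
    .with_tx_proxy(&main_node_url);
```

Encapsulating the cache in `TxProxy` removes the `RwLock<TxCache>` type from the public API and lets the proxy alone decide how the cache is kept fresh.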
14 changes: 12 additions & 2 deletions core/lib/dal/src/storage_web3_dal.rs
@@ -1,9 +1,9 @@
use std::ops;
use std::{collections::HashMap, ops};

use zksync_types::{
get_code_key, get_nonce_key,
utils::{decompose_full_nonce, storage_key_for_standard_token_balance},
AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey,
AccountTreeId, Address, L1BatchNumber, MiniblockNumber, Nonce, StorageKey,
FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, U256,
};
use zksync_utils::h256_to_u256;
@@ -32,6 +32,16 @@ impl StorageWeb3Dal<'_, '_> {
Ok(decompose_full_nonce(full_nonce).0)
}

/// Returns the current *stored* nonces (i.e., w/o accounting for pending transactions) for the specified accounts.
pub async fn get_nonces_for_addresses(
&mut self,
_addresses: &[Address],
) -> sqlx::Result<HashMap<Address, Nonce>> {
todo!()
// You should be able to implement this. This method could be reused in `MempoolFetcher`, esp. after
// https://github.com/matter-labs/zksync-era/pull/982 is merged
}

pub async fn standard_token_historical_balance(
&mut self,
token_id: AccountTreeId,
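For context, here is a hedged sketch of how the `todo!()` above could be filled in. It assumes current values live in a `storage` table keyed by hashed storage key (the same assumption other current-state reads in this DAL make) and that `self.storage.conn()` exposes the connection; treat the table name, column names, and query shape as unverified:

```rust
/// Possible shape of the missing method; a sketch, not the merged implementation.
pub async fn get_nonces_for_addresses(
    &mut self,
    addresses: &[Address],
) -> sqlx::Result<HashMap<Address, Nonce>> {
    // Map each account to the hashed storage key under which its full nonce lives.
    let nonce_keys: HashMap<H256, Address> = addresses
        .iter()
        .map(|address| (get_nonce_key(address).hashed_key(), *address))
        .collect();
    let hashed_keys: Vec<Vec<u8>> = nonce_keys.keys().map(|key| key.as_bytes().to_vec()).collect();

    // Assumed schema: `storage(hashed_key BYTEA, value BYTEA)` holding latest values.
    let rows = sqlx::query!(
        "SELECT hashed_key, value FROM storage WHERE hashed_key = ANY($1::bytea[])",
        &hashed_keys as &[Vec<u8>],
    )
    .fetch_all(self.storage.conn())
    .await?;

    let nonces = rows
        .into_iter()
        .filter_map(|row| {
            let address = nonce_keys.get(&H256::from_slice(&row.hashed_key))?;
            let full_nonce = h256_to_u256(H256::from_slice(&row.value));
            // Only the account (tx) nonce matters here; discard the deployment nonce.
            let (account_nonce, _) = decompose_full_nonce(full_nonce);
            Some((*address, Nonce(account_nonce.as_u32())))
        })
        .collect();
    Ok(nonces)
}
```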
8 changes: 3 additions & 5 deletions core/lib/zksync_core/src/api_server/tx_sender/mod.rs
@@ -8,7 +8,6 @@ use multivm::{
utils::{adjust_pubdata_price_for_tx, derive_base_fee_and_gas_per_pubdata, derive_overhead},
vm_latest::constants::{BLOCK_GAS_LIMIT, MAX_PUBDATA_PER_BLOCK},
};
use tokio::sync::RwLock;
use zksync_config::configs::{api::Web3JsonRpcConfig, chain::StateKeeperConfig};
use zksync_contracts::BaseSystemContracts;
use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, StorageProcessor};
@@ -28,7 +27,6 @@ use zksync_types::{
use zksync_utils::h256_to_u256;

pub(super) use self::{proxy::TxProxy, result::SubmitTxError};
pub use crate::api_server::tx_sender::proxy::TxCache;
use crate::{
api_server::{
execution_sandbox::{
@@ -165,8 +163,8 @@ impl TxSenderBuilder {
self
}

pub fn with_tx_proxy(mut self, main_node_url: &str, tx_cache: Arc<RwLock<TxCache>>) -> Self {
self.proxy = Some(TxProxy::new(main_node_url, tx_cache));
pub fn with_tx_proxy(mut self, main_node_url: &str) -> Self {
self.proxy = Some(TxProxy::new(main_node_url));
self
}

@@ -352,7 +350,7 @@ impl TxSender {
// We're running an external node: we have to proxy the transaction to the main node.
// But before we do that, save the tx to the cache in case someone requests it
// before it reaches the main node.
proxy.save_tx(tx.hash(), tx.clone()).await;
proxy.save_tx(tx.clone()).await;
proxy.submit_tx(&tx).await?;
SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy].observe(stage_started_at.elapsed());
APP_METRICS.processed_txs[&TxStage::Proxied].inc();
151 changes: 90 additions & 61 deletions core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
@@ -1,7 +1,12 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{BTreeSet, HashMap},
future::Future,
sync::Arc,
time::Duration,
};

use itertools::Itertools;
use tokio::sync::RwLock;
use tokio::sync::{watch, RwLock};
use zksync_dal::ConnectionPool;
use zksync_types::{
api::{BlockId, Transaction, TransactionDetails, TransactionId},
l2::L2Tx,
@@ -13,69 +18,84 @@ use zksync_web3_decl::{
RpcResult,
};

#[derive(Debug, Clone, Default)]
pub(crate) struct TxCache {
inner: Arc<RwLock<TxCacheInner>>,
}

#[derive(Debug, Default)]
pub struct TxCache {
struct TxCacheInner {
tx_cache: HashMap<H256, L2Tx>,
tx_cache_by_account: HashMap<Address, Vec<H256>>,
tx_cache_by_account_nonce: HashMap<(Address, Nonce), H256>,
nonces_by_account: HashMap<Address, BTreeSet<Nonce>>,
}

impl TxCache {
fn push(&mut self, tx_hash: H256, tx: L2Tx) {
let account_address = tx.common_data.initiator_address;
let nonce = tx.common_data.nonce;
self.tx_cache.insert(tx_hash, tx);
self.tx_cache_by_account
.entry(account_address)
async fn push(&self, tx: L2Tx) {
let mut inner = self.inner.write().await;
inner
.nonces_by_account
.entry(tx.initiator_account())
.or_default()
.push(tx_hash);
self.tx_cache_by_account_nonce
.insert((account_address, nonce), tx_hash);
.insert(tx.nonce());
inner.tx_cache.insert(tx.hash(), tx);
}

fn get_tx(&self, tx_hash: &H256) -> Option<L2Tx> {
self.tx_cache.get(tx_hash).cloned()
async fn get_tx(&self, tx_hash: H256) -> Option<L2Tx> {
self.inner.read().await.tx_cache.get(&tx_hash).cloned()
}

fn get_txs_by_account(&self, account_address: Address) -> Vec<L2Tx> {
let Some(tx_hashes) = self.tx_cache_by_account.get(&account_address) else {
return Vec::new();
};

let mut txs = Vec::new();
for tx_hash in tx_hashes {
if let Some(tx) = self.get_tx(tx_hash) {
txs.push(tx);
}
async fn get_nonces_for_account(&self, account_address: Address) -> Vec<Nonce> {
let inner = self.inner.read().await;
if let Some(nonces) = inner.nonces_by_account.get(&account_address) {
nonces.iter().copied().collect()
} else {
vec![]
}
txs.into_iter().sorted_by_key(|tx| tx.nonce()).collect()
}

pub(crate) fn get_txs_hash(&self) -> Vec<H256> {
self.tx_cache.keys().cloned().collect()
async fn remove_tx(&self, tx_hash: H256) {
self.inner.write().await.tx_cache.remove(&tx_hash);
// We intentionally don't touch `nonces_by_account`; it is only updated in response to new miniblocks.
}

pub(crate) fn remove_tx_by_account_nonce(&mut self, account: Address, nonce: Nonce) {
let tx_hash = self
.tx_cache_by_account_nonce
.get(&(account, nonce))
.cloned();
if let Some(tx_hash) = tx_hash {
self.remove_tx(&tx_hash);
}
}
async fn run_updates(
self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);

pub(crate) fn remove_tx(&mut self, tx_hash: &H256) {
let tx = self.tx_cache.remove(tx_hash);
if let Some(tx) = tx {
let account_tx_hashes = self
.tx_cache_by_account
.get_mut(&tx.common_data.initiator_address);
if let Some(account_tx_hashes) = account_tx_hashes {
account_tx_hashes.retain(|&hash| hash != *tx_hash);
loop {
if *stop_receiver.borrow() {
return Ok(());
}
self.tx_cache_by_account_nonce
.remove(&(tx.common_data.initiator_address, tx.common_data.nonce));

let addresses: Vec<_> = {
// Split into 2 statements for readability.
let inner = self.inner.read().await;
inner.nonces_by_account.keys().copied().collect()
};
let mut storage = pool.access_storage_tagged("api").await?;
let nonces_for_accounts = storage
.storage_web3_dal()
.get_nonces_for_addresses(&addresses)
.await?;
drop(storage); // Don't hold both `storage` and lock on `inner` at the same time.

let mut inner = self.inner.write().await;
inner.nonces_by_account.retain(|address, account_nonces| {
let stored_nonce = nonces_for_accounts
.get(address)
.copied()
.unwrap_or(Nonce(0));
// Retain only nonces exceeding the stored one.
*account_nonces = account_nonces.split_off(&(stored_nonce + 1));
// If we've removed all nonces, drop the account entry so we don't request stored nonces for it later.
!account_nonces.is_empty()
});
drop(inner);

tokio::time::sleep(UPDATE_INTERVAL).await;
}
}
}
@@ -84,33 +104,33 @@ impl TxCache {
/// and store them while they're not synced back yet
#[derive(Debug)]
pub struct TxProxy {
tx_cache: Arc<RwLock<TxCache>>,
tx_cache: TxCache,
client: HttpClient,
}

impl TxProxy {
pub fn new(main_node_url: &str, tx_cache: Arc<RwLock<TxCache>>) -> Self {
pub fn new(main_node_url: &str) -> Self {
let client = HttpClientBuilder::default().build(main_node_url).unwrap();
Self { client, tx_cache }
Self {
client,
tx_cache: TxCache::default(),
}
}

pub async fn find_tx(&self, tx_hash: H256) -> Option<L2Tx> {
self.tx_cache.read().await.get_tx(&tx_hash)
self.tx_cache.get_tx(tx_hash).await
}

pub async fn forget_tx(&self, tx_hash: H256) {
self.tx_cache.write().await.remove_tx(&tx_hash)
self.tx_cache.remove_tx(tx_hash).await;
}

pub async fn save_tx(&self, tx_hash: H256, tx: L2Tx) {
self.tx_cache.write().await.push(tx_hash, tx)
pub async fn save_tx(&self, tx: L2Tx) {
self.tx_cache.push(tx).await;
}

pub async fn get_txs_by_account(&self, account_address: Address) -> Vec<L2Tx> {
self.tx_cache
.read()
.await
.get_txs_by_account(account_address)
pub async fn get_nonces_by_account(&self, account_address: Address) -> Vec<Nonce> {
self.tx_cache.get_nonces_for_account(account_address).await
}

pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult<H256> {
@@ -139,4 +159,13 @@ impl TxProxy {
pub async fn request_tx_details(&self, hash: H256) -> RpcResult<Option<TransactionDetails>> {
self.client.get_transaction_details(hash).await
}

pub fn run_account_nonce_sweeper(
&self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> impl Future<Output = anyhow::Result<()>> {
let tx_cache = self.tx_cache.clone();
tx_cache.run_updates(pool, stop_receiver)
}
}
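The pruning step in `run_updates` is the heart of the sweeper: per account, keep only cached nonces strictly greater than the nonce stored on-chain, and drop the account entry once nothing remains. A self-contained illustration of the `BTreeSet::split_off` idiom used there (plain `u32` values stand in for the `Nonce` wrapper; the accounts and stored nonces are made up):

```rust
use std::collections::{BTreeSet, HashMap};

fn main() {
    let mut nonces_by_account: HashMap<&str, BTreeSet<u32>> = HashMap::new();
    nonces_by_account.insert("alice", [3, 4, 5].into_iter().collect());
    nonces_by_account.insert("bob", [1].into_iter().collect());

    // Stored (on-chain) nonces as the sweeper would fetch them from Postgres.
    let stored: HashMap<&str, u32> = [("alice", 4), ("bob", 2)].into_iter().collect();

    nonces_by_account.retain(|account, account_nonces| {
        let stored_nonce = stored.get(account).copied().unwrap_or(0);
        // `split_off(&k)` keeps everything >= k, i.e. only the nonces
        // the chain has not yet caught up to.
        *account_nonces = account_nonces.split_off(&(stored_nonce + 1));
        !account_nonces.is_empty()
    });

    assert_eq!(nonces_by_account["alice"], BTreeSet::from([5]));
    // Bob's only nonce (1) is <= the stored nonce (2), so his entry is dropped
    // and the sweeper stops requesting stored nonces for him.
    assert!(!nonces_by_account.contains_key("bob"));
}
```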
26 changes: 6 additions & 20 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
@@ -5,7 +5,7 @@ use chrono::NaiveDateTime;
use futures::future;
use serde::Deserialize;
use tokio::{
sync::{mpsc, oneshot, watch, Mutex, RwLock},
sync::{mpsc, oneshot, watch, Mutex},
task::JoinHandle,
};
use tower_http::{cors::CorsLayer, metrics::InFlightRequestsLayer};
@@ -37,11 +37,8 @@ use crate::{
api_server::{
execution_sandbox::{BlockStartInfo, VmConcurrencyBarrier},
tree::TreeApiHttpClient,
tx_sender::{TxCache, TxSender},
web3::{
backend_jsonrpsee::batch_limiter_middleware::LimitMiddleware,
state::TxProxyCacheUpdater,
},
tx_sender::TxSender,
web3::backend_jsonrpsee::batch_limiter_middleware::LimitMiddleware,
},
sync_layer::SyncState,
};
@@ -117,7 +114,6 @@ struct OptionalApiParams {
websocket_requests_per_minute_limit: Option<NonZeroU32>,
tree_api_url: Option<String>,
pub_sub_events_sender: Option<mpsc::UnboundedSender<PubSubEvent>>,
tx_cache: Option<Arc<RwLock<TxCache>>>,
}

/// Full API server parameters.
@@ -220,11 +216,6 @@ impl ApiBuilder {
self
}

pub fn with_tx_cache_updater(mut self, tx_cache: Arc<RwLock<TxCache>>) -> Self {
self.optional.tx_cache = Some(tx_cache);
self
}

pub fn with_sync_state(mut self, sync_state: SyncState) -> Self {
self.optional.sync_state = Some(sync_state);
self
@@ -422,14 +413,9 @@ impl FullApiParams {
);

let mut tasks = vec![tokio::spawn(update_task)];

if let Some(tx_cache) = self.optional.tx_cache.clone() {
let task = TxProxyCacheUpdater::run(
self.last_miniblock_pool.clone(),
tx_cache,
Duration::from_secs(1),
stop_receiver.clone(),
);
if let Some(tx_proxy) = &self.tx_sender.0.proxy {
let task = tx_proxy
.run_account_nonce_sweeper(self.last_miniblock_pool.clone(), stop_receiver.clone());
tasks.push(tokio::spawn(task));
}
let pub_sub = if matches!(transport, ApiTransport::WebSocket(_))
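The sweeper task spawned above follows the same cooperative-shutdown protocol as the other API server tasks: it checks the `watch::Receiver<bool>` once per iteration and returns once the flag flips. A minimal standalone sketch of that pattern (the sweep body is elided; `anyhow` is assumed as a dependency, and the short interval here stands in for the real 1 s `UPDATE_INTERVAL`):

```rust
use std::time::Duration;
use tokio::sync::watch;

async fn run_updates(stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
    loop {
        if *stop_receiver.borrow() {
            return Ok(()); // Graceful exit once the server signals shutdown.
        }
        // ... one sweep iteration: fetch stored nonces, prune the cache ...
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (stop_sender, stop_receiver) = watch::channel(false);
    let task = tokio::spawn(run_updates(stop_receiver));
    stop_sender.send(true)?; // Flip the flag; the task exits on its next check.
    task.await??;
    Ok(())
}
```

One trade-off of polling the flag rather than `tokio::select!`-ing on `stop_receiver.changed()`: shutdown can lag by up to one update interval, which is acceptable for a once-per-second sweep.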