Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Take into account nonce from tx proxy #995

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion core/lib/dal/src/storage_web3_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, ops};
use zksync_types::{
get_code_key, get_nonce_key,
utils::{decompose_full_nonce, storage_key_for_standard_token_balance},
AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey,
AccountTreeId, Address, L1BatchNumber, MiniblockNumber, Nonce, StorageKey,
FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, U256,
};
use zksync_utils::h256_to_u256;
Expand Down Expand Up @@ -32,6 +32,30 @@ impl StorageWeb3Dal<'_, '_> {
Ok(decompose_full_nonce(full_nonce).0)
}

/// Returns the current *stored* nonces (i.e., w/o accounting for pending transactions) for the specified accounts.
pub async fn get_nonces_for_addresses(
&mut self,
addresses: &[Address],
) -> sqlx::Result<HashMap<Address, Nonce>> {
let nonce_keys: HashMap<_, _> = addresses
.iter()
.map(|address| (get_nonce_key(address).hashed_key(), *address))
.collect();

let res = self
.get_values(&nonce_keys.keys().copied().collect::<Vec<_>>())
.await?
.into_iter()
.filter_map(|(hashed_key, value)| {
let address = nonce_keys.get(&hashed_key)?;
let full_nonce = h256_to_u256(value);
let (nonce, _) = decompose_full_nonce(full_nonce);
Some((*address, Nonce(nonce.as_u32())))
})
.collect();
Ok(res)
}

pub async fn standard_token_historical_balance(
&mut self,
token_id: AccountTreeId,
Expand Down
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/api_server/tx_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl TxSender {
// We're running an external node: we have to proxy the transaction to the main node.
// But before we do that, save the tx to cache in case someone will request it
// Before it reaches the main node.
proxy.save_tx(tx.hash(), tx.clone()).await;
proxy.save_tx(tx.clone()).await;
proxy.submit_tx(&tx).await?;
// Now, after we are sure that the tx is on the main node, remove it from cache
// since we don't want to store txs that might have been replaced or otherwise removed
Expand Down
138 changes: 129 additions & 9 deletions core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,110 @@
use std::collections::HashMap;
use std::{
collections::{BTreeSet, HashMap},
future::Future,
sync::Arc,
time::Duration,
};

use tokio::sync::RwLock;
use tokio::sync::{watch, RwLock};
use zksync_dal::ConnectionPool;
use zksync_types::{
api::{BlockId, Transaction, TransactionDetails, TransactionId},
l2::L2Tx,
H256,
Address, Nonce, H256,
};
use zksync_web3_decl::{
jsonrpsee::http_client::{HttpClient, HttpClientBuilder},
namespaces::{EthNamespaceClient, ZksNamespaceClient},
RpcResult,
};

#[derive(Debug, Clone, Default)]
pub(crate) struct TxCache {
inner: Arc<RwLock<TxCacheInner>>,
}

#[derive(Debug, Default)]
struct TxCacheInner {
tx_cache: HashMap<H256, L2Tx>,
nonces_by_account: HashMap<Address, BTreeSet<Nonce>>,
}
Deniallugo marked this conversation as resolved.
Show resolved Hide resolved

impl TxCache {
async fn push(&self, tx: L2Tx) {
let mut inner = self.inner.write().await;
inner
.nonces_by_account
.entry(tx.initiator_account())
.or_default()
.insert(tx.nonce());
inner.tx_cache.insert(tx.hash(), tx);
}

async fn get_tx(&self, tx_hash: H256) -> Option<L2Tx> {
self.inner.read().await.tx_cache.get(&tx_hash).cloned()
}

async fn get_nonces_for_account(&self, account_address: Address) -> BTreeSet<Nonce> {
let inner = self.inner.read().await;
if let Some(nonces) = inner.nonces_by_account.get(&account_address) {
nonces.clone()
} else {
BTreeSet::new()
}
}

async fn remove_tx(&self, tx_hash: H256) {
self.inner.write().await.tx_cache.remove(&tx_hash);
// We intentionally don't change `nonces_by_account`; they should only be changed in response to new miniblocks
}

async fn run_updates(
self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);

loop {
if *stop_receiver.borrow() {
return Ok(());
}

let addresses: Vec<_> = {
// Split into 2 statements for readability.
let inner = self.inner.read().await;
inner.nonces_by_account.keys().copied().collect()
};
let mut storage = pool.access_storage_tagged("api").await?;
let nonces_for_accounts = storage
.storage_web3_dal()
.get_nonces_for_addresses(&addresses)
.await?;
drop(storage); // Don't hold both `storage` and lock on `inner` at the same time.

let mut inner = self.inner.write().await;
inner.nonces_by_account.retain(|address, account_nonces| {
let stored_nonce = nonces_for_accounts
.get(address)
.copied()
.unwrap_or(Nonce(0));
// Retain only nonces starting from the stored one.
*account_nonces = account_nonces.split_off(&stored_nonce);
// If we've removed all nonces, drop the account entry so we don't request stored nonces for it later.
!account_nonces.is_empty()
});
drop(inner);

tokio::time::sleep(UPDATE_INTERVAL).await;
}
}
}

/// Used by external node to proxy transaction to the main node
/// and store them while they're not synced back yet
#[derive(Debug)]
pub struct TxProxy {
tx_cache: RwLock<HashMap<H256, L2Tx>>,
tx_cache: TxCache,
client: HttpClient,
}

Expand All @@ -25,20 +113,43 @@ impl TxProxy {
let client = HttpClientBuilder::default().build(main_node_url).unwrap();
Self {
client,
tx_cache: RwLock::new(HashMap::new()),
tx_cache: TxCache::default(),
}
}

pub async fn find_tx(&self, tx_hash: H256) -> Option<L2Tx> {
self.tx_cache.read().await.get(&tx_hash).cloned()
self.tx_cache.get_tx(tx_hash).await
}

pub async fn forget_tx(&self, tx_hash: H256) {
self.tx_cache.write().await.remove(&tx_hash);
self.tx_cache.remove_tx(tx_hash).await;
}

pub async fn save_tx(&self, tx: L2Tx) {
self.tx_cache.push(tx).await;
}

pub async fn get_nonces_by_account(&self, account_address: Address) -> BTreeSet<Nonce> {
self.tx_cache.get_nonces_for_account(account_address).await
}

pub async fn save_tx(&self, tx_hash: H256, tx: L2Tx) {
self.tx_cache.write().await.insert(tx_hash, tx);
pub async fn next_nonce_by_initiator_account(
&self,
account_address: Address,
current_nonce: u32,
) -> Nonce {
let mut pending_nonce = Nonce(current_nonce);
let nonces = self.get_nonces_by_account(account_address).await;
for nonce in nonces.range(pending_nonce + 1..) {
Deniallugo marked this conversation as resolved.
Show resolved Hide resolved
// If nonce is not sequential, then we should not increment nonce.
if nonce == &pending_nonce {
pending_nonce += 1;
} else {
break;
}
}

pending_nonce
}

pub async fn submit_tx(&self, tx: &L2Tx) -> RpcResult<H256> {
Expand Down Expand Up @@ -67,4 +178,13 @@ impl TxProxy {
pub async fn request_tx_details(&self, hash: H256) -> RpcResult<Option<TransactionDetails>> {
self.client.get_transaction_details(hash).await
}

pub fn run_account_nonce_sweeper(
&self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> impl Future<Output = anyhow::Result<()>> {
let tx_cache = self.tx_cache.clone();
tx_cache.run_updates(pool, stop_receiver)
}
}
29 changes: 16 additions & 13 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ struct OptionalApiParams {
#[derive(Debug)]
struct FullApiParams {
pool: ConnectionPool,
last_miniblock_pool: ConnectionPool,
updaters_pool: ConnectionPool,
config: InternalApiConfig,
transport: ApiTransport,
tx_sender: TxSender,
Expand All @@ -133,7 +133,7 @@ struct FullApiParams {
#[derive(Debug)]
pub struct ApiBuilder {
pool: ConnectionPool,
last_miniblock_pool: ConnectionPool,
updaters_pool: ConnectionPool,
config: InternalApiConfig,
polling_interval: Duration,
// Mandatory params that must be set using builder methods.
Expand All @@ -151,7 +151,7 @@ impl ApiBuilder {

pub fn jsonrpsee_backend(config: InternalApiConfig, pool: ConnectionPool) -> Self {
Self {
last_miniblock_pool: pool.clone(),
updaters_pool: pool.clone(),
pool,
config,
polling_interval: Self::DEFAULT_POLLING_INTERVAL,
Expand All @@ -173,11 +173,12 @@ impl ApiBuilder {
self
}

/// Configures a dedicated DB pool to be used for updating the latest miniblock information
/// Configures a dedicated DB pool to be used for updating different information,
/// such as last mined block number or account nonces. This pool is used to execute
/// in a background task. If not called, the main pool will be used. If the API server is under high load,
/// it may make sense to supply a single-connection pool to reduce pool contention with the API methods.
pub fn with_last_miniblock_pool(mut self, pool: ConnectionPool) -> Self {
self.last_miniblock_pool = pool;
pub fn with_updaters_pool(mut self, pool: ConnectionPool) -> Self {
self.updaters_pool = pool;
self
}

Expand Down Expand Up @@ -245,7 +246,7 @@ impl ApiBuilder {
fn into_full_params(self) -> anyhow::Result<FullApiParams> {
Ok(FullApiParams {
pool: self.pool,
last_miniblock_pool: self.last_miniblock_pool,
updaters_pool: self.updaters_pool,
config: self.config,
transport: self.transport.context("API transport not set")?,
tx_sender: self.tx_sender.context("Transaction sender not set")?,
Expand Down Expand Up @@ -276,10 +277,7 @@ impl FullApiParams {
self,
last_sealed_miniblock: SealedMiniblockNumber,
) -> anyhow::Result<RpcState> {
let mut storage = self
.last_miniblock_pool
.access_storage_tagged("api")
.await?;
let mut storage = self.updaters_pool.access_storage_tagged("api").await?;
let start_info = BlockStartInfo::new(&mut storage).await?;
drop(storage);

Expand Down Expand Up @@ -407,12 +405,17 @@ impl FullApiParams {
let (health_check, health_updater) = ReactiveHealthCheck::new(health_check_name);

let (last_sealed_miniblock, update_task) = SealedMiniblockNumber::new(
self.last_miniblock_pool.clone(),
self.updaters_pool.clone(),
SEALED_MINIBLOCK_UPDATE_INTERVAL,
stop_receiver.clone(),
);
let mut tasks = vec![tokio::spawn(update_task)];

let mut tasks = vec![tokio::spawn(update_task)];
if let Some(tx_proxy) = &self.tx_sender.0.proxy {
let task = tx_proxy
.run_account_nonce_sweeper(self.updaters_pool.clone(), stop_receiver.clone());
tasks.push(tokio::spawn(task));
}
let pub_sub = if matches!(transport, ApiTransport::WebSocket(_))
&& self.namespaces.contains(&Namespace::Pubsub)
{
Expand Down
28 changes: 21 additions & 7 deletions core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use zksync_types::{
l2::{L2Tx, TransactionType},
transaction_request::CallRequest,
utils::decompose_full_nonce,
web3,
web3::types::{FeeHistory, SyncInfo, SyncState},
web3::{
self,
types::{FeeHistory, SyncInfo, SyncState},
},
AccountTreeId, Bytes, MiniblockNumber, StorageKey, H256, L2_ETH_TOKEN_ADDRESS, U256,
};
use zksync_utils::u256_to_h256;
Expand Down Expand Up @@ -303,6 +305,7 @@ impl EthNamespace {
const METHOD_NAME: &str = "get_block_transaction_count";

let method_latency = API_METRICS.start_block_call(METHOD_NAME, block_id);

self.state.start_info.ensure_not_pruned(block_id)?;
let tx_count = self
.state
Expand Down Expand Up @@ -483,11 +486,22 @@ impl EthNamespace {
if matches!(block_id, BlockId::Number(BlockNumber::Pending)) {
let account_nonce_u64 = u64::try_from(account_nonce)
.map_err(|err| internal_error(method_name, anyhow::anyhow!(err)))?;
account_nonce = connection
.transactions_web3_dal()
.next_nonce_by_initiator_account(address, account_nonce_u64)
.await
.map_err(|err| internal_error(method_name, err))?;
account_nonce = if let Some(proxy) = &self.state.tx_sender.0.proxy {
// EN: get pending nonces from the transaction cache
// We don't have mempool in EN, it's safe to use the proxy cache as a mempool
proxy
.next_nonce_by_initiator_account(address, account_nonce_u64 as u32)
.await
.0
.into()
} else {
// Main node: get pending nonces from the mempool
connection
.transactions_web3_dal()
.next_nonce_by_initiator_account(address, account_nonce_u64)
.await
.map_err(|err| internal_error(method_name, err))?
};
}

let block_diff = self.state.last_sealed_miniblock.diff(block_number);
Expand Down
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/consensus/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ impl StateKeeperRunner {
let (stop_sender, stop_receiver) = sync::watch::channel(false);
let (miniblock_sealer, miniblock_sealer_handle) =
MiniblockSealer::new(self.pool.clone(), 5);

let io = ExternalIO::new(
miniblock_sealer_handle,
self.pool.clone(),
Expand Down
Loading
Loading