From 2a31f651e1d9040682846cfcbffd2c19ed4f686a Mon Sep 17 00:00:00 2001 From: Marcelo Diop-Gonzalez Date: Mon, 24 Oct 2022 14:48:42 -0400 Subject: [PATCH] mirror: make it easier to see what happened in debug logs (#7900) this will print more easy to understand info on which source chain transactions are making it into the target chain. for now we just log them to debug logs but it would be nice to have some HTTP debug page that shows an easy to understand summary --- tools/mirror/src/chain_tracker.rs | 189 ++++++++++++++++++++++++++---- tools/mirror/src/lib.rs | 154 +++++++++++++++++------- 2 files changed, 276 insertions(+), 67 deletions(-) diff --git a/tools/mirror/src/chain_tracker.rs b/tools/mirror/src/chain_tracker.rs index 40d7cfd8635..1fb399d6046 100644 --- a/tools/mirror/src/chain_tracker.rs +++ b/tools/mirror/src/chain_tracker.rs @@ -1,22 +1,73 @@ -use crate::MappedBlock; +use crate::{MappedBlock, MappedTx}; use near_crypto::PublicKey; use near_indexer::StreamerMessage; use near_indexer_primitives::IndexerTransactionWithOutcome; use near_primitives::hash::CryptoHash; -use near_primitives::transaction::SignedTransaction; use near_primitives::types::{AccountId, BlockHeight}; -use near_primitives_core::types::{Nonce, ShardId}; +use near_primitives_core::types::{Gas, Nonce, ShardId}; use std::cmp::Ordering; use std::collections::hash_map; use std::collections::HashMap; use std::collections::{BTreeSet, VecDeque}; +use std::fmt::Write; use std::pin::Pin; use std::time::{Duration, Instant}; +// Information related to a single transaction that we sent in the past. +// We could just forget it and not save any of this, but keeping this info +// makes it easy to print out human-friendly info later on when we find this +// transaction on chain. struct TxSendInfo { sent_at: Instant, source_height: BlockHeight, - target_height: BlockHeight, + source_tx_index: usize, + source_shard_id: ShardId, + source_signer_id: AccountId, + source_receiver_id: AccountId, + target_signer_id: Option, + target_receiver_id: Option, + actions: Vec, + sent_at_target_height: BlockHeight, +} + +impl TxSendInfo { + fn new( + tx: &MappedTx, + source_shard_id: ShardId, + source_height: BlockHeight, + target_height: BlockHeight, + now: Instant, + ) -> Self { + let target_signer_id = if &tx.source_signer_id != &tx.target_tx.transaction.signer_id { + Some(tx.target_tx.transaction.signer_id.clone()) + } else { + None + }; + let target_receiver_id = if &tx.source_receiver_id != &tx.target_tx.transaction.receiver_id + { + Some(tx.target_tx.transaction.receiver_id.clone()) + } else { + None + }; + Self { + source_height, + source_shard_id: source_shard_id, + source_tx_index: tx.source_tx_index, + source_signer_id: tx.source_signer_id.clone(), + source_receiver_id: tx.source_receiver_id.clone(), + target_signer_id, + target_receiver_id, + sent_at: now, + sent_at_target_height: target_height, + actions: tx + .target_tx + .transaction + .actions + .iter() + .map(|a| a.as_ref().to_string()) + .collect::>(), + } + } } #[derive(PartialEq, Eq, Debug)] @@ -103,6 +154,20 @@ impl<'a> Iterator for TxAwaitingNonceIter<'a> { } } +fn gas_pretty(gas: Gas) -> String { + if gas < 1000 { + format!("{} gas", gas) + } else if gas < 1_000_000 { + format!("{} Kgas", gas / 1000) + } else if gas < 1_000_000_000 { + format!("{} Mgas", gas / 1_000_000) + } else if gas < 1_000_000_000_000 { + format!("{} Ggas", gas / 1_000_000_000) + } else { + format!("{} Tgas", gas / 1_000_000_000_000) + } +} + // Keeps the queue of upcoming transactions and provides them in regular intervals via next_batch() // Also keeps track of txs we've sent so far and looks for them on chain, for metrics/logging purposes. #[derive(Default)] @@ -297,19 +362,90 @@ impl TxTracker { } } + fn log_target_block(&self, msg: &StreamerMessage) { + // don't do any work here if we're definitely not gonna log it + if tracing::level_filters::LevelFilter::current() + > tracing::level_filters::LevelFilter::DEBUG + { + return; + } + + // right now we're just logging this, but it would be nice to collect/index this + // and have some HTTP debug page where you can see how close the target chain is + // to the source chain + let mut log_message = String::new(); + let now = Instant::now(); + + for s in msg.shards.iter() { + let mut other_txs = 0; + if let Some(c) = &s.chunk { + if c.header.height_included == msg.block.header.height { + write!( + log_message, + "-------- shard {} gas used: {} ---------\n", + s.shard_id, + gas_pretty(c.header.gas_used) + ) + .unwrap(); + for tx in c.transactions.iter() { + if let Some(info) = self.sent_txs.get(&tx.transaction.hash) { + write!( + log_message, + "source #{}{} tx #{} signer: \"{}\"{} receiver: \"{}\"{} actions: <{}> sent {:?} ago @ target #{}\n", + info.source_height, + if s.shard_id == info.source_shard_id { + String::new() + } else { + format!(" (source shard {})", info.source_shard_id) + }, + info.source_tx_index, + info.source_signer_id, + info.target_signer_id.as_ref().map_or(String::new(), |s| format!(" (mapped to \"{}\")", s)), + info.source_receiver_id, + info.target_receiver_id.as_ref().map_or(String::new(), |s| format!(" (mapped to \"{}\")", s)), + info.actions.join(", "), + now - info.sent_at, + info.sent_at_target_height, + ).unwrap(); + } else { + other_txs += 1; + } + } + } else { + write!( + log_message, + "-------- shard {} old chunk (#{}) ---------\n", + s.shard_id, c.header.height_included + ) + .unwrap(); + } + } else { + write!(log_message, "-------- shard {} chunk missing ---------\n", s.shard_id) + .unwrap(); + } + if other_txs > 0 { + write!(log_message, " ... \n").unwrap(); + write!( + log_message, + "{} other txs (not ours, or sent before a restart)\n", + other_txs + ) + .unwrap(); + write!(log_message, " ... \n").unwrap(); + } + } + tracing::debug!(target: "mirror", "received target block #{}:\n{}", msg.block.header.height, log_message); + } + pub(crate) fn on_target_block(&mut self, msg: &StreamerMessage) { self.record_block_timestamp(msg); + self.log_target_block(msg); + for s in msg.shards.iter() { if let Some(c) = &s.chunk { for tx in c.transactions.iter() { - if let Some(send_info) = self.sent_txs.remove(&tx.transaction.hash) { - let latency = Instant::now() - send_info.sent_at; - tracing::debug!( - target: "mirror", "found my tx {} from source #{} in target #{} {:?} after sending @ target #{}", - tx.transaction.hash, send_info.source_height, msg.block.header.height, latency, send_info.target_height - ); + if self.sent_txs.remove(&tx.transaction.hash).is_some() { crate::metrics::TRANSACTIONS_INCLUDED.inc(); - self.remove_tx(tx); } } @@ -319,11 +455,13 @@ impl TxTracker { fn on_tx_sent( &mut self, - tx: &SignedTransaction, + tx: &MappedTx, + source_shard_id: ShardId, source_height: BlockHeight, target_height: BlockHeight, + now: Instant, ) { - let hash = tx.get_hash(); + let hash = tx.target_tx.get_hash(); if self.sent_txs.contains_key(&hash) { tracing::warn!(target: "mirror", "transaction sent twice: {}", &hash); return; @@ -332,21 +470,24 @@ impl TxTracker { // TODO: don't keep adding txs if we're not ever finding them on chain, since we'll OOM eventually // if that happens. self.sent_txs - .insert(hash, TxSendInfo { sent_at: Instant::now(), source_height, target_height }); + .insert(hash, TxSendInfo::new(tx, source_shard_id, source_height, target_height, now)); let txs = self .txs_by_signer - .entry((tx.transaction.signer_id.clone(), tx.transaction.public_key.clone())) + .entry(( + tx.target_tx.transaction.signer_id.clone(), + tx.target_tx.transaction.public_key.clone(), + )) .or_default(); if let Some(highest_nonce) = txs.iter().next_back() { - if highest_nonce.nonce > tx.transaction.nonce { + if highest_nonce.nonce > tx.target_tx.transaction.nonce { tracing::warn!( target: "mirror", "transaction sent with out of order nonce: {}: {}. Sent so far: {:?}", - &hash, tx.transaction.nonce, txs + &hash, tx.target_tx.transaction.nonce, txs ); } } - if !txs.insert(TxId { hash, nonce: tx.transaction.nonce }) { + if !txs.insert(TxId { hash, nonce: tx.target_tx.transaction.nonce }) { tracing::warn!(target: "mirror", "inserted tx {} twice into txs_by_signer", &hash); } } @@ -405,16 +546,20 @@ impl TxTracker { // We just successfully sent some transactions. Remember them so we can see if they really show up on chain. pub(crate) fn on_txs_sent( &mut self, - txs: &[SignedTransaction], + txs: &[(ShardId, Vec)], source_height: BlockHeight, target_height: BlockHeight, ) { + let num_txs: usize = txs.iter().map(|(_, txs)| txs.len()).sum(); tracing::info!( target: "mirror", "Sent {} transactions from source #{} with target HEAD @ #{}", - txs.len(), source_height, target_height + num_txs, source_height, target_height ); - for tx in txs.iter() { - self.on_tx_sent(tx, source_height, target_height); + let now = Instant::now(); + for (shard_id, txs) in txs.iter() { + for tx in txs.iter() { + self.on_tx_sent(tx, *shard_id, source_height, target_height, now); + } } let block_delay = self diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs index b53f7c200a8..079ffe74472 100644 --- a/tools/mirror/src/lib.rs +++ b/tools/mirror/src/lib.rs @@ -147,19 +147,65 @@ fn open_db>(home: P, config: &NearConfig) -> anyhow::Result { // a transaction that's almost prepared, except that we don't yet know // what nonce to use because the public key was added in an AddKey -// action that we haven't seen on chain yet. The tx field is complete +// action that we haven't seen on chain yet. The target_tx field is complete // except for the nonce field. #[derive(Debug)] struct TxAwaitingNonce { source_public: PublicKey, source_signer_id: AccountId, + source_receiver_id: AccountId, + source_tx_index: usize, target_private: SecretKey, - tx: Transaction, + target_tx: Transaction, +} + +impl TxAwaitingNonce { + fn new( + source_tx: &SignedTransactionView, + source_tx_index: usize, + target_tx: Transaction, + target_private: SecretKey, + ) -> Self { + Self { + source_public: source_tx.public_key.clone(), + source_signer_id: source_tx.signer_id.clone(), + source_receiver_id: source_tx.receiver_id.clone(), + source_tx_index, + target_private, + target_tx, + } + } +} + +// A transaction meant for the target chain that is complete/ready to send. +// We keep some extra info about the transaction for the purposes of logging +// later on when we find it on chain. +#[derive(Debug)] +struct MappedTx { + source_signer_id: AccountId, + source_receiver_id: AccountId, + source_tx_index: usize, + target_tx: SignedTransaction, +} + +impl MappedTx { + fn new( + source_tx: &SignedTransactionView, + source_tx_index: usize, + target_tx: SignedTransaction, + ) -> Self { + Self { + source_signer_id: source_tx.signer_id.clone(), + source_receiver_id: source_tx.receiver_id.clone(), + source_tx_index, + target_tx, + } + } } #[derive(Debug)] enum TargetChainTx { - Ready(SignedTransaction), + Ready(MappedTx), AwaitingNonce(TxAwaitingNonce), } @@ -169,16 +215,21 @@ impl TargetChainTx { fn set_nonce(&mut self, nonce: Nonce) { match self { Self::AwaitingNonce(t) => { - t.tx.nonce = nonce; - let tx = SignedTransaction::new( - t.target_private.sign(&t.tx.get_hash_and_size().0.as_ref()), - t.tx.clone(), + t.target_tx.nonce = nonce; + let target_tx = SignedTransaction::new( + t.target_private.sign(&t.target_tx.get_hash_and_size().0.as_ref()), + t.target_tx.clone(), ); tracing::debug!( target: "mirror", "prepared a transaction for ({:?}, {:?}) that was previously waiting for the access key to appear on chain", - &tx.transaction.signer_id, &tx.transaction.public_key + &t.source_signer_id, &t.source_public, ); - *self = Self::Ready(tx); + *self = Self::Ready(MappedTx { + source_signer_id: t.source_signer_id.clone(), + source_receiver_id: t.source_receiver_id.clone(), + source_tx_index: t.source_tx_index, + target_tx, + }); } Self::Ready(_) => unreachable!(), } @@ -429,18 +480,19 @@ impl TxMirror { async fn send_transactions( &mut self, - block: &MappedBlock, - ) -> anyhow::Result> { + block: MappedBlock, + ) -> anyhow::Result)>> { let mut sent = vec![]; - for chunk in block.chunks.iter() { - for tx in chunk.txs.iter() { + for chunk in block.chunks { + let mut txs = vec![]; + for tx in chunk.txs { match tx { TargetChainTx::Ready(tx) => { match self .target_client .send( NetworkClientMessages::Transaction { - transaction: tx.clone(), + transaction: tx.target_tx.clone(), is_forwarded: false, check_only: false, } @@ -450,7 +502,7 @@ impl TxMirror { { NetworkClientResponses::RequestRouted => { crate::metrics::TRANSACTIONS_SENT.with_label_values(&["ok"]).inc(); - sent.push(tx.clone()); + txs.push(tx); } NetworkClientResponses::InvalidTx(e) => { // TODO: here if we're getting an error because the tx was already included, it is possible @@ -481,6 +533,7 @@ impl TxMirror { } } } + sent.push((chunk.shard_id, txs)); } Ok(sent) } @@ -761,61 +814,71 @@ impl TxMirror { } let mut num_not_ready = 0; - for t in chunk.transactions { - let actions = self.map_actions(&t, &prev_hash).await?; + for (idx, source_tx) in chunk.transactions.into_iter().enumerate() { + let actions = self.map_actions(&source_tx, &prev_hash).await?; if actions.is_empty() { // If this is a tx containing only stake actions, skip it. continue; } - let mapped_key = crate::key_mapping::map_key(&t.public_key, self.secret.as_ref()); + let mapped_key = + crate::key_mapping::map_key(&source_tx.public_key, self.secret.as_ref()); let public_key = mapped_key.public_key(); let target_signer_id = - crate::key_mapping::map_account(&t.signer_id, self.secret.as_ref()); + crate::key_mapping::map_account(&source_tx.signer_id, self.secret.as_ref()); match self - .map_nonce(&t.signer_id, &target_signer_id, &t.public_key, &public_key, t.nonce) + .map_nonce( + &source_tx.signer_id, + &target_signer_id, + &source_tx.public_key, + &public_key, + source_tx.nonce, + ) .await? { Ok(nonce) => { - let mut tx = Transaction::new( + let mut target_tx = Transaction::new( target_signer_id, public_key, - crate::key_mapping::map_account(&t.receiver_id, self.secret.as_ref()), + crate::key_mapping::map_account( + &source_tx.receiver_id, + self.secret.as_ref(), + ), nonce, ref_hash.clone(), ); - tx.actions = actions; - let tx = SignedTransaction::new( - mapped_key.sign(&tx.get_hash_and_size().0.as_ref()), - tx, + target_tx.actions = actions; + let target_tx = SignedTransaction::new( + mapped_key.sign(&target_tx.get_hash_and_size().0.as_ref()), + target_tx, ); - txs.push(TargetChainTx::Ready(tx)); + txs.push(TargetChainTx::Ready(MappedTx::new(&source_tx, idx, target_tx))); } Err(e) => match e { MapNonceError::AddOverflow(..) | MapNonceError::SubOverflow(..) | MapNonceError::SourceKeyNotOnChain => { - tracing::error!(target: "mirror", "error mapping nonce for ({:?}, {:?}): {:?}", &t.signer_id, &public_key, e); + tracing::error!(target: "mirror", "error mapping nonce for ({:?}, {:?}): {:?}", &source_tx.signer_id, &public_key, e); continue; } MapNonceError::TargetKeyNotOnChain => { - let mut tx = Transaction::new( - crate::key_mapping::map_account(&t.signer_id, self.secret.as_ref()), + let mut target_tx = Transaction::new( + crate::key_mapping::map_account( + &source_tx.signer_id, + self.secret.as_ref(), + ), public_key, crate::key_mapping::map_account( - &t.receiver_id, + &source_tx.receiver_id, self.secret.as_ref(), ), - t.nonce, + source_tx.nonce, ref_hash.clone(), ); - tx.actions = actions; - txs.push(TargetChainTx::AwaitingNonce(TxAwaitingNonce { - tx, - source_public: t.public_key.clone(), - source_signer_id: t.signer_id.clone(), - target_private: mapped_key, - })); + target_tx.actions = actions; + txs.push(TargetChainTx::AwaitingNonce(TxAwaitingNonce::new( + &source_tx, idx, target_tx, mapped_key, + ))); num_not_ready += 1; } }, @@ -931,10 +994,10 @@ impl TxMirror { let nonce = self .map_nonce( &tx.source_signer_id, - &tx.tx.signer_id, + &tx.target_tx.signer_id, &tx.source_public, - &tx.tx.public_key, - tx.tx.nonce, + &tx.target_tx.public_key, + tx.target_tx.nonce, ) .await? .unwrap(); @@ -962,13 +1025,14 @@ impl TxMirror { // time to send a batch of transactions mapped_block = tracker.next_batch(), if tracker.num_blocks_queued() > 0 => { let mapped_block = mapped_block.unwrap(); - let sent = self.send_transactions(&mapped_block).await?; - tracker.on_txs_sent(&sent, mapped_block.source_height, target_height); + let source_height = mapped_block.source_height; + let sent = self.send_transactions(mapped_block).await?; + tracker.on_txs_sent(&sent, source_height, target_height); // now we have one second left until we need to send more transactions. In the // meantime, we might as well prepare some more batches of transactions. // TODO: continue in best effort fashion on error - self.set_next_source_height(mapped_block.source_height+1)?; + self.set_next_source_height(source_height+1)?; self.queue_txs(&mut tracker, target_head, true).await?; } msg = self.target_stream.recv() => {