Skip to content

Commit

Permalink
mirror: make it easier to see what happened in debug logs (#7900)
Browse files Browse the repository at this point in the history
this will print more easy to understand info on which source chain transactions are making it into the target chain. for now we just log them to debug logs but it would be nice to have some HTTP debug page that shows an easy to understand summary
  • Loading branch information
marcelo-gonzalez authored and nikurt committed Nov 7, 2022
1 parent 7698da9 commit 2a31f65
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 67 deletions.
189 changes: 167 additions & 22 deletions tools/mirror/src/chain_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,73 @@
use crate::MappedBlock;
use crate::{MappedBlock, MappedTx};
use near_crypto::PublicKey;
use near_indexer::StreamerMessage;
use near_indexer_primitives::IndexerTransactionWithOutcome;
use near_primitives::hash::CryptoHash;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, BlockHeight};
use near_primitives_core::types::{Nonce, ShardId};
use near_primitives_core::types::{Gas, Nonce, ShardId};
use std::cmp::Ordering;
use std::collections::hash_map;
use std::collections::HashMap;
use std::collections::{BTreeSet, VecDeque};
use std::fmt::Write;
use std::pin::Pin;
use std::time::{Duration, Instant};

// Information related to a single transaction that we sent in the past.
// We could just forget it and not save any of this, but keeping this info
// makes it easy to print out human-friendly info later on when we find this
// transaction on chain.
struct TxSendInfo {
sent_at: Instant,
source_height: BlockHeight,
target_height: BlockHeight,
source_tx_index: usize,
source_shard_id: ShardId,
source_signer_id: AccountId,
source_receiver_id: AccountId,
target_signer_id: Option<AccountId>,
target_receiver_id: Option<AccountId>,
actions: Vec<String>,
sent_at_target_height: BlockHeight,
}

impl TxSendInfo {
fn new(
tx: &MappedTx,
source_shard_id: ShardId,
source_height: BlockHeight,
target_height: BlockHeight,
now: Instant,
) -> Self {
let target_signer_id = if &tx.source_signer_id != &tx.target_tx.transaction.signer_id {
Some(tx.target_tx.transaction.signer_id.clone())
} else {
None
};
let target_receiver_id = if &tx.source_receiver_id != &tx.target_tx.transaction.receiver_id
{
Some(tx.target_tx.transaction.receiver_id.clone())
} else {
None
};
Self {
source_height,
source_shard_id: source_shard_id,
source_tx_index: tx.source_tx_index,
source_signer_id: tx.source_signer_id.clone(),
source_receiver_id: tx.source_receiver_id.clone(),
target_signer_id,
target_receiver_id,
sent_at: now,
sent_at_target_height: target_height,
actions: tx
.target_tx
.transaction
.actions
.iter()
.map(|a| a.as_ref().to_string())
.collect::<Vec<_>>(),
}
}
}

#[derive(PartialEq, Eq, Debug)]
Expand Down Expand Up @@ -103,6 +154,20 @@ impl<'a> Iterator for TxAwaitingNonceIter<'a> {
}
}

fn gas_pretty(gas: Gas) -> String {
if gas < 1000 {
format!("{} gas", gas)
} else if gas < 1_000_000 {
format!("{} Kgas", gas / 1000)
} else if gas < 1_000_000_000 {
format!("{} Mgas", gas / 1_000_000)
} else if gas < 1_000_000_000_000 {
format!("{} Ggas", gas / 1_000_000_000)
} else {
format!("{} Tgas", gas / 1_000_000_000_000)
}
}

// Keeps the queue of upcoming transactions and provides them in regular intervals via next_batch()
// Also keeps track of txs we've sent so far and looks for them on chain, for metrics/logging purposes.
#[derive(Default)]
Expand Down Expand Up @@ -297,19 +362,90 @@ impl TxTracker {
}
}

fn log_target_block(&self, msg: &StreamerMessage) {
// don't do any work here if we're definitely not gonna log it
if tracing::level_filters::LevelFilter::current()
> tracing::level_filters::LevelFilter::DEBUG
{
return;
}

// right now we're just logging this, but it would be nice to collect/index this
// and have some HTTP debug page where you can see how close the target chain is
// to the source chain
let mut log_message = String::new();
let now = Instant::now();

for s in msg.shards.iter() {
let mut other_txs = 0;
if let Some(c) = &s.chunk {
if c.header.height_included == msg.block.header.height {
write!(
log_message,
"-------- shard {} gas used: {} ---------\n",
s.shard_id,
gas_pretty(c.header.gas_used)
)
.unwrap();
for tx in c.transactions.iter() {
if let Some(info) = self.sent_txs.get(&tx.transaction.hash) {
write!(
log_message,
"source #{}{} tx #{} signer: \"{}\"{} receiver: \"{}\"{} actions: <{}> sent {:?} ago @ target #{}\n",
info.source_height,
if s.shard_id == info.source_shard_id {
String::new()
} else {
format!(" (source shard {})", info.source_shard_id)
},
info.source_tx_index,
info.source_signer_id,
info.target_signer_id.as_ref().map_or(String::new(), |s| format!(" (mapped to \"{}\")", s)),
info.source_receiver_id,
info.target_receiver_id.as_ref().map_or(String::new(), |s| format!(" (mapped to \"{}\")", s)),
info.actions.join(", "),
now - info.sent_at,
info.sent_at_target_height,
).unwrap();
} else {
other_txs += 1;
}
}
} else {
write!(
log_message,
"-------- shard {} old chunk (#{}) ---------\n",
s.shard_id, c.header.height_included
)
.unwrap();
}
} else {
write!(log_message, "-------- shard {} chunk missing ---------\n", s.shard_id)
.unwrap();
}
if other_txs > 0 {
write!(log_message, " ... \n").unwrap();
write!(
log_message,
"{} other txs (not ours, or sent before a restart)\n",
other_txs
)
.unwrap();
write!(log_message, " ... \n").unwrap();
}
}
tracing::debug!(target: "mirror", "received target block #{}:\n{}", msg.block.header.height, log_message);
}

pub(crate) fn on_target_block(&mut self, msg: &StreamerMessage) {
self.record_block_timestamp(msg);
self.log_target_block(msg);

for s in msg.shards.iter() {
if let Some(c) = &s.chunk {
for tx in c.transactions.iter() {
if let Some(send_info) = self.sent_txs.remove(&tx.transaction.hash) {
let latency = Instant::now() - send_info.sent_at;
tracing::debug!(
target: "mirror", "found my tx {} from source #{} in target #{} {:?} after sending @ target #{}",
tx.transaction.hash, send_info.source_height, msg.block.header.height, latency, send_info.target_height
);
if self.sent_txs.remove(&tx.transaction.hash).is_some() {
crate::metrics::TRANSACTIONS_INCLUDED.inc();

self.remove_tx(tx);
}
}
Expand All @@ -319,11 +455,13 @@ impl TxTracker {

fn on_tx_sent(
&mut self,
tx: &SignedTransaction,
tx: &MappedTx,
source_shard_id: ShardId,
source_height: BlockHeight,
target_height: BlockHeight,
now: Instant,
) {
let hash = tx.get_hash();
let hash = tx.target_tx.get_hash();
if self.sent_txs.contains_key(&hash) {
tracing::warn!(target: "mirror", "transaction sent twice: {}", &hash);
return;
Expand All @@ -332,21 +470,24 @@ impl TxTracker {
// TODO: don't keep adding txs if we're not ever finding them on chain, since we'll OOM eventually
// if that happens.
self.sent_txs
.insert(hash, TxSendInfo { sent_at: Instant::now(), source_height, target_height });
.insert(hash, TxSendInfo::new(tx, source_shard_id, source_height, target_height, now));
let txs = self
.txs_by_signer
.entry((tx.transaction.signer_id.clone(), tx.transaction.public_key.clone()))
.entry((
tx.target_tx.transaction.signer_id.clone(),
tx.target_tx.transaction.public_key.clone(),
))
.or_default();

if let Some(highest_nonce) = txs.iter().next_back() {
if highest_nonce.nonce > tx.transaction.nonce {
if highest_nonce.nonce > tx.target_tx.transaction.nonce {
tracing::warn!(
target: "mirror", "transaction sent with out of order nonce: {}: {}. Sent so far: {:?}",
&hash, tx.transaction.nonce, txs
&hash, tx.target_tx.transaction.nonce, txs
);
}
}
if !txs.insert(TxId { hash, nonce: tx.transaction.nonce }) {
if !txs.insert(TxId { hash, nonce: tx.target_tx.transaction.nonce }) {
tracing::warn!(target: "mirror", "inserted tx {} twice into txs_by_signer", &hash);
}
}
Expand Down Expand Up @@ -405,16 +546,20 @@ impl TxTracker {
// We just successfully sent some transactions. Remember them so we can see if they really show up on chain.
pub(crate) fn on_txs_sent(
&mut self,
txs: &[SignedTransaction],
txs: &[(ShardId, Vec<MappedTx>)],
source_height: BlockHeight,
target_height: BlockHeight,
) {
let num_txs: usize = txs.iter().map(|(_, txs)| txs.len()).sum();
tracing::info!(
target: "mirror", "Sent {} transactions from source #{} with target HEAD @ #{}",
txs.len(), source_height, target_height
num_txs, source_height, target_height
);
for tx in txs.iter() {
self.on_tx_sent(tx, source_height, target_height);
let now = Instant::now();
for (shard_id, txs) in txs.iter() {
for tx in txs.iter() {
self.on_tx_sent(tx, *shard_id, source_height, target_height, now);
}
}

let block_delay = self
Expand Down
Loading

0 comments on commit 2a31f65

Please sign in to comment.