Skip to content

Commit

Permalink
Merge pull request #4511 from nervosnetwork/zhangsoledad/tx-pool
Browse files Browse the repository at this point in the history
Modify the record scope of tx-pool reject record and fix rule for orphan tx.
  • Loading branch information
zhangsoledad authored Jul 15, 2024
2 parents b962885 + 53c4765 commit 2c3cfb9
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 54 deletions.
5 changes: 1 addition & 4 deletions shared/src/shared_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,7 @@ fn register_tx_pool_callback(tx_pool_builder: &mut TxPoolServiceBuilder, notify:
move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
let tx_hash = entry.transaction().hash();
// record recent reject
if matches!(
reject,
Reject::Resolve(..) | Reject::RBFRejected(..) | Reject::Invalidated(..)
) {
if reject.should_recorded() {
if let Some(ref mut recent_reject) = tx_pool.recent_reject {
if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) {
error!("record recent_reject failed {} {} {}", tx_hash, reject, e);
Expand Down
11 changes: 11 additions & 0 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,17 @@ impl Relayer {
TxVerificationResult::Reject { tx_hash } => {
self.shared.state().remove_from_known_txs(&tx_hash);
}
TxVerificationResult::UnknownParents { peer, parents } => {
let tx_hashes: Vec<_> = {
let mut tx_filter = self.shared.state().tx_filter();
tx_filter.remove_expired();
parents
.into_iter()
.filter(|tx_hash| !tx_filter.contains(tx_hash))
.collect()
};
self.shared.state().add_ask_for_txs(peer, tx_hashes);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(TxPoolOrphanNormal),
Box::new(TxPoolOrphanReverse),
Box::new(TxPoolOrphanUnordered),
Box::new(TxPoolOrphanPartialInputUnknown),
Box::new(TxPoolOrphanDoubleSpend),
Box::new(OrphanTxRejected),
Box::new(GetRawTxPool),
Expand Down
103 changes: 82 additions & 21 deletions test/src/specs/tx_pool/orphan_tx.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::util::transaction::relay_tx;
use crate::util::transaction::{relay_tx, send_tx};
use crate::utils::wait_until;
use crate::{Net, Node, Spec};
use ckb_jsonrpc_types::Status;
use ckb_network::SupportProtocols;
use ckb_types::core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView};
use ckb_types::packed::CellOutputBuilder;
use ckb_types::{
bytes::Bytes,
core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView},
packed,
};
use ckb_types::{
packed::{CellInput, OutPoint},
prelude::*,
Expand Down Expand Up @@ -193,6 +197,30 @@ fn run_replay_tx(
})
}

fn run_send_tx(
net: &Net,
node0: &Node,
tx: TransactionView,
orphan_tx_cnt: u64,
pending_cnt: u64,
) -> bool {
send_tx(net, node0, tx, ALWAYS_SUCCESS_SCRIPT_CYCLE);

wait_until(5, || {
let tx_pool_info = node0.get_tip_tx_pool_info();
tx_pool_info.orphan.value() == orphan_tx_cnt && tx_pool_info.pending.value() == pending_cnt
})
}

fn should_receive_get_relay_transactions(net: &Net, node0: &Node, assert_message: &str) {
let ret = net.should_receive(node0, |data: &Bytes| {
packed::RelayMessage::from_slice(data)
.map(|message| message.to_enum().item_name() == packed::GetRelayTransactions::NAME)
.unwrap_or(false)
});
assert!(ret, "{}", assert_message);
}

pub struct TxPoolOrphanNormal;
impl Spec for TxPoolOrphanNormal {
fn run(&self, nodes: &mut Vec<Node>) {
Expand Down Expand Up @@ -236,23 +264,20 @@ impl Spec for TxPoolOrphanReverse {
run_replay_tx(&net, node0, final_tx, 1, 0),
"expect final_tx is in orphan pool"
);
should_receive_get_relay_transactions(&net, node0, "node should ask for tx11 tx12 tx13");

assert!(run_send_tx(&net, node0, tx13, 2, 0), "tx13 in orphan pool");
should_receive_get_relay_transactions(&net, node0, "node should ask for tx1");

assert!(
run_replay_tx(&net, node0, tx13, 2, 0),
"tx13 in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx12, 3, 0),
run_send_tx(&net, node0, tx12, 3, 0),
"tx12 is in orphan pool"
);
assert!(run_replay_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan");

assert!(run_replay_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan");
assert!(run_send_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan");
assert!(run_send_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan");

assert!(
run_replay_tx(&net, node0, parent, 0, 6),
"all is in pending"
);
should_receive_get_relay_transactions(&net, node0, "node should ask for parent");
assert!(run_send_tx(&net, node0, parent, 0, 6), "all is in pending");
}
}

Expand All @@ -267,13 +292,14 @@ impl Spec for TxPoolOrphanUnordered {
"expect final_tx is in orphan pool"
);

assert!(
run_replay_tx(&net, node0, tx11, 2, 0),
"tx11 in orphan pool"
);
should_receive_get_relay_transactions(&net, node0, "node should ask for tx11 tx12 tx13");

assert!(run_send_tx(&net, node0, tx11, 2, 0), "tx11 in orphan pool");
should_receive_get_relay_transactions(&net, node0, "node should ask for tx1");

let tx12_clone = tx12.clone();
assert!(
run_replay_tx(&net, node0, tx12, 3, 0),
run_send_tx(&net, node0, tx12, 3, 0),
"tx12 is in orphan pool"
);

Expand All @@ -292,12 +318,47 @@ impl Spec for TxPoolOrphanUnordered {
"parent is sent, should be in pending without change orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx1, 1, 4),
run_send_tx(&net, node0, tx1, 1, 4),
"tx1 is sent, orphan pool only contains final_tx"
);

assert!(
run_replay_tx(&net, node0, tx13, 0, 6),
run_send_tx(&net, node0, tx13, 0, 6),
"tx13 is sent, orphan pool is empty"
);
}
}

pub struct TxPoolOrphanPartialInputUnknown;
impl Spec for TxPoolOrphanPartialInputUnknown {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0);

assert!(
run_replay_tx(&net, node0, parent, 0, 1),
"parent sended expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx1, 0, 2),
"tx1 is sent expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx11, 0, 3),
"tx11 is sent expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx12, 0, 4),
"tx12 is sent expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, final_tx, 1, 4),
"expect final_tx is in orphan pool"
);

should_receive_get_relay_transactions(&net, node0, "node should ask for tx13");
assert!(
run_send_tx(&net, node0, tx13, 0, 6),
"tx13 is sent, orphan pool is empty"
);
}
Expand Down
20 changes: 20 additions & 0 deletions test/src/util/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ pub fn always_success_transactions_with_rand_data(
.build()
}

pub fn send_tx(net: &Net, node: &Node, tx: TransactionView, cycles: u64) {
let relay_tx = packed::RelayTransaction::new_builder()
.cycles(cycles.pack())
.transaction(tx.data())
.build();

let tx_msg = packed::RelayMessage::new_builder()
.set(
packed::RelayTransactions::new_builder()
.transactions(
packed::RelayTransactionVec::new_builder()
.set(vec![relay_tx])
.build(),
)
.build(),
)
.build();
net.send(node, SupportProtocols::RelayV3, tx_msg.as_bytes());
}

pub fn relay_tx(net: &Net, node: &Node, tx: TransactionView, cycles: u64) {
let tx_hashes_msg = packed::RelayMessage::new_builder()
.set(
Expand Down
1 change: 1 addition & 0 deletions tx-pool/src/component/orphan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl OrphanPool {
.insert(tx.proposal_short_id());
}

// DoS prevention: do not allow OrphanPool to grow unbounded
self.limit_size()
}

Expand Down
44 changes: 16 additions & 28 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
use ckb_network::PeerIndex;
use ckb_snapshot::Snapshot;
use ckb_store::data_loader_wrapper::AsDataLoader;
use ckb_store::ChainStore;
use ckb_types::core::error::OutPointError;
use ckb_types::{
core::{cell::ResolvedTransaction, BlockView, Capacity, Cycle, HeaderView, TransactionView},
Expand Down Expand Up @@ -459,7 +458,10 @@ impl TxPoolService {
match remote {
Some((declared_cycle, peer)) => match ret {
Ok(_) => {
debug!("after_process remote send_result_to_relayer {}", tx_hash);
debug!(
"after_process remote send_result_to_relayer {} {}",
tx_hash, peer
);
self.send_result_to_relayer(TxVerificationResult::Ok {
original_peer: Some(peer),
with_vm_2023,
Expand All @@ -468,8 +470,15 @@ impl TxPoolService {
self.process_orphan_tx(&tx).await;
}
Err(reject) => {
debug!("after_process {} remote reject: {} ", tx_hash, reject);
if is_missing_input(reject) && all_inputs_is_unknown(snapshot, &tx) {
debug!(
"after_process {} {} remote reject: {} ",
tx_hash, peer, reject
);
if is_missing_input(reject) {
self.send_result_to_relayer(TxVerificationResult::UnknownParents {
peer,
parents: tx.unique_parents(),
});
self.add_orphan(tx, peer, declared_cycle).await;
} else {
if reject.is_malformed_tx() {
Expand All @@ -480,13 +489,7 @@ impl TxPoolService {
tx_hash: tx_hash.clone(),
});
}

if matches!(
reject,
Reject::Resolve(..)
| Reject::Verification(..)
| Reject::RBFRejected(..)
) {
if reject.should_recorded() {
self.put_recent_reject(&tx_hash, reject).await;
}
}
Expand Down Expand Up @@ -514,12 +517,7 @@ impl TxPoolService {
}
Err(reject) => {
debug!("after_process {} reject: {} ", tx_hash, reject);
if matches!(
reject,
Reject::Resolve(..)
| Reject::Verification(..)
| Reject::RBFRejected(..)
) {
if reject.should_recorded() {
self.put_recent_reject(&tx_hash, reject).await;
}
}
Expand Down Expand Up @@ -628,12 +626,7 @@ impl TxPoolService {
tx_hash: orphan.tx.hash(),
});
}
if matches!(
reject,
Reject::Resolve(..)
| Reject::Verification(..)
| Reject::RBFRejected(..)
) {
if reject.should_recorded() {
self.put_recent_reject(&orphan.tx.hash(), &reject).await;
}
}
Expand Down Expand Up @@ -1259,8 +1252,3 @@ fn _update_tx_pool_for_reorg(
// Remove transactions from the pool until its size <= size_limit.
let _ = tx_pool.limit_size(callbacks, None);
}

pub fn all_inputs_is_unknown(snapshot: &Snapshot, tx: &TransactionView) -> bool {
!tx.input_pts_iter()
.any(|pt| snapshot.transaction_exists(&pt.tx_hash()))
}
13 changes: 12 additions & 1 deletion tx-pool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ impl TxPoolController {
send_message!(self, GetTransactionWithStatus, hash)
}

/// Return txs for network
/// Mainly used for compact block reconstruction and block proposal pre-broadcasting
/// Orphan/conflicted/etc transactions that are returned for compact block reconstruction.
pub fn fetch_txs(
&self,
short_ids: HashSet<ProposalShortId>,
Expand All @@ -304,6 +305,7 @@ impl TxPoolController {
}

/// Return txs with cycles
/// Mainly for relay transactions
pub fn fetch_txs_with_cycles(
&self,
short_ids: HashSet<ProposalShortId>,
Expand Down Expand Up @@ -677,6 +679,13 @@ pub enum TxVerificationResult {
/// transaction hash
tx_hash: Byte32,
},
/// tx parent is unknown
UnknownParents {
/// original peer
peer: PeerIndex,
/// parents hashes
parents: HashSet<Byte32>,
},
/// tx is rejected
Reject {
/// transaction hash
Expand Down Expand Up @@ -852,11 +861,13 @@ async fn process(mut service: TxPoolService, message: Message) {
arguments: short_ids,
}) => {
let tx_pool = service.tx_pool.read().await;
let orphan = service.orphan.read().await;
let txs = short_ids
.into_iter()
.filter_map(|short_id| {
tx_pool
.get_tx_from_pool_or_store(&short_id)
.or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned())
.map(|tx| (short_id, tx))
})
.collect();
Expand Down
5 changes: 5 additions & 0 deletions util/types/src/core/tx_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ impl Reject {
}
}

/// Returns true if the reject should be recorded.
pub fn should_recorded(&self) -> bool {
!matches!(self, Reject::Duplicated(..))
}

/// Returns true if tx can be resubmitted, allowing relay
/// * Declared wrong cycles should allow relay with the correct cycles
/// * Reject but is not malformed and the fee rate reached the threshold,
Expand Down
7 changes: 7 additions & 0 deletions util/types/src/core/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,13 @@ impl TransactionView {
pub fn proposal_short_id(&self) -> packed::ProposalShortId {
packed::ProposalShortId::from_tx_hash(&self.hash())
}

/// return deduplicate parent tx_hashes
pub fn unique_parents(&self) -> HashSet<packed::Byte32> {
self.input_pts_iter()
.map(|outpoint| outpoint.tx_hash())
.collect()
}
}

impl ExtraHashView {
Expand Down

0 comments on commit 2c3cfb9

Please sign in to comment.