Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tx-pool potential inconsistent after reorg occurs #3706

Merged
merged 3 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(GetRawTxPool),
Box::new(PoolReconcile),
Box::new(PoolResurrect),
Box::new(PoolResolveConflictAfterReorg),
Box::new(InvalidHeaderDep),
#[cfg(not(target_os = "windows"))]
Box::new(PoolPersisted),
Expand Down
130 changes: 130 additions & 0 deletions test/src/specs/tx_pool/pool_reconcile.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
use crate::node::waiting_for_sync;
use crate::node::{connect_all, disconnect_all};
use crate::util::check::is_transaction_proposed;
use crate::util::mining::out_ibd_mode;
use crate::{Node, Spec};
use ckb_jsonrpc_types::ProposalShortId;
use ckb_logger::info;
use ckb_types::core::{capacity_bytes, Capacity, FeeRate};
use ckb_types::packed::CellOutputBuilder;
use ckb_types::{
packed::{self, CellInput, OutPoint},
prelude::*,
};

pub struct PoolReconcile;

Expand Down Expand Up @@ -47,3 +57,123 @@ impl Spec for PoolReconcile {
.is_none());
}
}

pub struct PoolResolveConflictAfterReorg;

impl Spec for PoolResolveConflictAfterReorg {
crate::setup!(num_nodes: 2);

fn run(&self, nodes: &mut Vec<Node>) {
out_ibd_mode(nodes);
connect_all(nodes);

let node0 = &nodes[0];
let node1 = &nodes[1];

node0.mine_until_out_bootstrap_period();
waiting_for_sync(nodes);

info!("Use generated block's cellbase as tx input");
let tx1 = node0.new_transaction_spend_tip_cellbase();
// build txs chain
let mut txs = vec![tx1.clone()];
while txs.len() < 3 {
let parent = txs.last().unwrap();
let child = parent
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
.previous_output(OutPoint::new(parent.hash(), 0))
.build()
}])
.set_outputs(vec![parent.output(0).unwrap()])
.build();
txs.push(child);
}
assert_eq!(txs.len(), 3);

info!("Disconnect nodes");
disconnect_all(nodes);

info!("submit tx1 chain to node0");
let ret = node0
.rpc_client()
.send_transaction_result(tx1.data().into());
assert!(ret.is_ok());

let target: ProposalShortId = packed::ProposalShortId::from_tx_hash(&tx1.hash()).into();
let last =
node0.mine_with_blocking(|template| !template.proposals.iter().any(|id| id == &target));
node0.mine_with_blocking(|template| template.number.value() != (last + 1));
for tx in txs[1..].iter() {
let ret = node0.rpc_client().send_transaction_result(tx.data().into());
assert!(ret.is_ok());
}
let block = node0
.new_block_builder_with_blocking(|template| {
!(template
.transactions
.iter()
.any(|tx| tx.hash == tx1.hash().unpack()))
})
.set_proposals(txs.iter().map(|tx| tx.proposal_short_id()).collect())
.build();
node0.submit_block(&block);

node0.mine_with_blocking(|template| template.number.value() != (block.number() + 1));
for tx in txs[1..].iter() {
assert!(is_transaction_proposed(node0, tx));
}

info!("Tx1 mined by node1");
assert!(node0
.rpc_client()
.get_transaction(tx1.hash())
.unwrap()
.tx_status
.block_hash
.is_some());

let tip_number0 = node0.get_tip_block_number();
info!("Mine until node1 > node0");
while node1.get_tip_block_number() < tip_number0 + 1 {
let proposed_block = node1
.new_block_builder(None, None, None)
.set_proposals(txs.iter().map(|tx| tx.proposal_short_id()).collect())
.build();
node1.submit_block(&proposed_block);
}

info!("Connect node0 to node1");
node0.connect(node1);

waiting_for_sync(nodes);

for tx in txs.iter() {
assert!(is_transaction_proposed(node0, tx));
}

let conflict_tx = tx1
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
.previous_output(OutPoint::new(tx1.hash(), 0))
.build()
}])
.set_outputs(vec![CellOutputBuilder::default()
.capacity(capacity_bytes!(99).pack())
.build()])
.build();

let ret = node0
.rpc_client()
.send_transaction_result(conflict_tx.data().into());
assert!(ret.is_err());
let err_msg = ret.err().unwrap().to_string();
assert!(err_msg.contains("Resolve failed Dead"));
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.tx_pool.min_fee_rate = FeeRate::from_u64(0);
}
}
3 changes: 2 additions & 1 deletion tx-pool/src/block_assembler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,10 @@ impl BlockAssembler {
{
error!(
"resolve transactions when build block template, \
tip_number: {}, tip_hash: {}, error: {:?}",
tip_number: {}, tip_hash: {}, tx_hash: {}, error: {:?}",
tip_header.number(),
tip_header.hash(),
entry.transaction().hash(),
err
);
None
Expand Down
44 changes: 42 additions & 2 deletions tx-pool/src/component/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ impl TxLinksMap {
.map(|links| links.children.insert(child))
}

fn add_parent(&mut self, short_id: &ProposalShortId, parent: ProposalShortId) -> Option<bool> {
self.inner
.get_mut(short_id)
.map(|links| links.parents.insert(parent))
}

fn clear(&mut self) {
self.inner.clear();
}
Expand All @@ -193,10 +199,10 @@ impl TxLinksMap {
#[derive(Debug, Clone)]
pub(crate) struct SortedTxMap {
entries: HashMap<ProposalShortId, TxEntry>,
sorted_index: BTreeSet<AncestorsScoreSortKey>,
pub(crate) sorted_index: BTreeSet<AncestorsScoreSortKey>,
deps: HashMap<OutPoint, HashSet<ProposalShortId>>,
/// A map track transaction ancestors and descendants
links: TxLinksMap,
pub(crate) links: TxLinksMap,
max_ancestors_count: usize,
}

Expand All @@ -219,6 +225,11 @@ impl SortedTxMap {
self.entries.iter()
}

// Usually when a new transaction is added to the pool, it has no in-pool
// children (because any such children would be an orphan). So in add_entry(), we:
// - update a new entry's parents set to include all in-pool parents
// - update the new entry's parents to include the new tx as a child
// - update all ancestors of the transaction to include the new tx's size/fee
pub fn add_entry(&mut self, mut entry: TxEntry) -> Result<bool, Reject> {
let short_id = entry.proposal_short_id();

Expand Down Expand Up @@ -287,6 +298,35 @@ impl SortedTxMap {
Ok(true)
}

// update_descendants_from_detached is used to update
// the descendants for a single transaction that has been added to the
// pool but may have child transactions in the pool, eg during a
// chain reorg.
pub fn update_descendants_from_detached(
&mut self,
id: &ProposalShortId,
children: HashSet<ProposalShortId>,
) {
if let Some(entry) = self.entries.get(id).cloned() {
for child in &children {
self.links.add_parent(child, id.clone());
}
if let Some(links) = self.links.inner.get_mut(id) {
links.children.extend(children);
}

let descendants = self.calc_descendants(id);
for desc_id in &descendants {
if let Some(desc_entry) = self.entries.get_mut(desc_id) {
let deleted = self.sorted_index.remove(&desc_entry.as_sorted_key());
debug_assert!(deleted, "pool inconsistent");
desc_entry.add_entry_weight(&entry);
self.sorted_index.insert(desc_entry.as_sorted_key());
}
}
}
}

pub fn contains_key(&self, id: &ProposalShortId) -> bool {
self.entries.contains_key(id)
}
Expand Down
90 changes: 84 additions & 6 deletions tx-pool/src/component/proposed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl Edges {
self.outputs.insert(out_point, None);
}

pub(crate) fn insert_consumed_output(&mut self, out_point: OutPoint, id: ProposalShortId) {
self.outputs.insert(out_point, Some(id));
}

pub(crate) fn get_output_ref(&self, out_point: &OutPoint) -> Option<&Option<ProposalShortId>> {
self.outputs.get(out_point)
}
Expand All @@ -63,6 +67,10 @@ impl Edges {
self.inputs.get(out_point)
}

pub(crate) fn get_deps_ref(&self, out_point: &OutPoint) -> Option<&HashSet<ProposalShortId>> {
self.deps.get(out_point)
}

pub(crate) fn get_mut_output(
&mut self,
out_point: &OutPoint,
Expand Down Expand Up @@ -107,6 +115,9 @@ pub struct ProposedPool {

impl CellProvider for ProposedPool {
fn cell(&self, out_point: &OutPoint, _eager_load: bool) -> CellStatus {
if self.edges.get_input_ref(out_point).is_some() {
return CellStatus::Dead;
}
if let Some(x) = self.edges.get_output_ref(out_point) {
// output consumed
if x.is_some() {
Expand All @@ -119,15 +130,15 @@ impl CellProvider for ProposedPool {
return CellStatus::live_cell(cell_meta);
}
}
if self.edges.get_input_ref(out_point).is_some() {
return CellStatus::Dead;
}
CellStatus::Unknown
}
}

impl CellChecker for ProposedPool {
fn is_live(&self, out_point: &OutPoint) -> Option<bool> {
if self.edges.get_input_ref(out_point).is_some() {
return Some(false);
}
if let Some(x) = self.edges.get_output_ref(out_point) {
// output consumed
if x.is_some() {
Expand All @@ -136,9 +147,6 @@ impl CellChecker for ProposedPool {
return Some(true);
}
}
if self.edges.get_input_ref(out_point).is_some() {
return Some(false);
}
None
}
}
Expand Down Expand Up @@ -281,6 +289,76 @@ impl ProposedPool {
})
}

// In the event of a reorg, the assumption that a newly added tx has no
// in-pool children is false. In particular, the pool is in an
// inconsistent state while new transactions are being added, because there may
// be descendant transactions of a tx coming from a disconnected block that are
// unreachable from just looking at transactions in the pool (the linking
// transactions may also be in the disconnected block, waiting to be added).
// Because of this, there's not much benefit in trying to search for in-pool
// children in add_entry(). Instead, in the special case of transactions
// being added from a disconnected block, out-of-block descendants for all the
// in-block transactions by calling update_descendants_from_detached(). Note that
// until this is called, the pool state is not consistent, and in particular
// TxLinks may not be correct (and therefore functions like
// calc_ancestors() and calc_descendants() that rely
// on them to walk the pool are not generally safe to use).
pub(crate) fn add_entry_from_detached(&mut self, entry: TxEntry) -> Result<bool, Reject> {
let tx_short_id = entry.proposal_short_id();

if self.inner.contains_key(&tx_short_id) {
return Ok(false);
}

let inputs = entry.transaction().input_pts_iter();
let outputs = entry.transaction().output_pts();
let related_dep_out_points: Vec<_> = entry.related_dep_out_points().cloned().collect();
let header_deps = entry.transaction().header_deps();

self.inner.add_entry(entry).map(|inserted| {
if inserted {
let mut children = HashSet::new();
// if input reference a in-pool output, connect it
// otherwise, record input for conflict check
for i in inputs {
if let Some(id) = self.edges.get_mut_output(&i) {
*id = Some(tx_short_id.clone());
}
self.edges.insert_input(i.to_owned(), tx_short_id.clone());
}

// record dep-txid
for d in related_dep_out_points {
self.edges.insert_deps(d.to_owned(), tx_short_id.clone());
}

// record tx output
for o in outputs {
if let Some(ids) = self.edges.get_deps_ref(&o).cloned() {
children.extend(ids);
}
if let Some(id) = self.edges.get_input_ref(&o).cloned() {
self.edges.insert_consumed_output(o, id.clone());
children.insert(id);
} else {
self.edges.insert_output(o);
}
}

// record header_deps
if !header_deps.is_empty() {
self.edges
.header_deps
.insert(tx_short_id.clone(), header_deps.into_iter().collect());
}

self.inner
.update_descendants_from_detached(&tx_short_id, children);
}
inserted
})
}

pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec<ConflictEntry> {
let inputs = tx.input_pts_iter();
let mut conflicts = Vec::new();
Expand Down
Loading