diff --git a/test/src/main.rs b/test/src/main.rs index eb13237448..9319c462dc 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -402,6 +402,7 @@ fn all_specs() -> Vec> { Box::new(GetRawTxPool), Box::new(PoolReconcile), Box::new(PoolResurrect), + Box::new(PoolResolveConflictAfterReorg), Box::new(InvalidHeaderDep), #[cfg(not(target_os = "windows"))] Box::new(PoolPersisted), diff --git a/test/src/specs/tx_pool/pool_reconcile.rs b/test/src/specs/tx_pool/pool_reconcile.rs index 9935fdcd0b..d45a505c86 100644 --- a/test/src/specs/tx_pool/pool_reconcile.rs +++ b/test/src/specs/tx_pool/pool_reconcile.rs @@ -1,6 +1,16 @@ use crate::node::waiting_for_sync; +use crate::node::{connect_all, disconnect_all}; +use crate::util::check::is_transaction_proposed; +use crate::util::mining::out_ibd_mode; use crate::{Node, Spec}; +use ckb_jsonrpc_types::ProposalShortId; use ckb_logger::info; +use ckb_types::core::{capacity_bytes, Capacity, FeeRate}; +use ckb_types::packed::CellOutputBuilder; +use ckb_types::{ + packed::{self, CellInput, OutPoint}, + prelude::*, +}; pub struct PoolReconcile; @@ -47,3 +57,123 @@ impl Spec for PoolReconcile { .is_none()); } } + +pub struct PoolResolveConflictAfterReorg; + +impl Spec for PoolResolveConflictAfterReorg { + crate::setup!(num_nodes: 2); + + fn run(&self, nodes: &mut Vec) { + out_ibd_mode(nodes); + connect_all(nodes); + + let node0 = &nodes[0]; + let node1 = &nodes[1]; + + node0.mine_until_out_bootstrap_period(); + waiting_for_sync(nodes); + + info!("Use generated block's cellbase as tx input"); + let tx1 = node0.new_transaction_spend_tip_cellbase(); + // build txs chain + let mut txs = vec![tx1.clone()]; + while txs.len() < 3 { + let parent = txs.last().unwrap(); + let child = parent + .as_advanced_builder() + .set_inputs(vec![{ + CellInput::new_builder() + .previous_output(OutPoint::new(parent.hash(), 0)) + .build() + }]) + .set_outputs(vec![parent.output(0).unwrap()]) + .build(); + txs.push(child); + } + assert_eq!(txs.len(), 3); + + info!("Disconnect nodes"); + disconnect_all(nodes); + + info!("submit tx1 chain to node0"); + let ret = node0 + .rpc_client() + .send_transaction_result(tx1.data().into()); + assert!(ret.is_ok()); + + let target: ProposalShortId = packed::ProposalShortId::from_tx_hash(&tx1.hash()).into(); + let last = + node0.mine_with_blocking(|template| !template.proposals.iter().any(|id| id == &target)); + node0.mine_with_blocking(|template| template.number.value() != (last + 1)); + for tx in txs[1..].iter() { + let ret = node0.rpc_client().send_transaction_result(tx.data().into()); + assert!(ret.is_ok()); + } + let block = node0 + .new_block_builder_with_blocking(|template| { + !(template + .transactions + .iter() + .any(|tx| tx.hash == tx1.hash().unpack())) + }) + .set_proposals(txs.iter().map(|tx| tx.proposal_short_id()).collect()) + .build(); + node0.submit_block(&block); + + node0.mine_with_blocking(|template| template.number.value() != (block.number() + 1)); + for tx in txs[1..].iter() { + assert!(is_transaction_proposed(node0, tx)); + } + + info!("Tx1 mined by node1"); + assert!(node0 + .rpc_client() + .get_transaction(tx1.hash()) + .unwrap() + .tx_status + .block_hash + .is_some()); + + let tip_number0 = node0.get_tip_block_number(); + info!("Mine until node1 > node0"); + while node1.get_tip_block_number() < tip_number0 + 1 { + let proposed_block = node1 + .new_block_builder(None, None, None) + .set_proposals(txs.iter().map(|tx| tx.proposal_short_id()).collect()) + .build(); + node1.submit_block(&proposed_block); + } + + info!("Connect node0 to node1"); + node0.connect(node1); + + waiting_for_sync(nodes); + + for tx in txs.iter() { + assert!(is_transaction_proposed(node0, tx)); + } + + let conflict_tx = tx1 + .as_advanced_builder() + .set_inputs(vec![{ + CellInput::new_builder() + .previous_output(OutPoint::new(tx1.hash(), 0)) + .build() + }]) + .set_outputs(vec![CellOutputBuilder::default() + .capacity(capacity_bytes!(99).pack()) + .build()]) + .build(); + + let ret = node0 + .rpc_client() + .send_transaction_result(conflict_tx.data().into()); + assert!(ret.is_err()); + let err_msg = ret.err().unwrap().to_string(); + assert!(err_msg.contains("Resolve failed Dead")); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_fee_rate = FeeRate::from_u64(0); + } +} diff --git a/tx-pool/src/block_assembler/mod.rs b/tx-pool/src/block_assembler/mod.rs index 27e7e0a5cc..5fb74b0dd7 100644 --- a/tx-pool/src/block_assembler/mod.rs +++ b/tx-pool/src/block_assembler/mod.rs @@ -572,9 +572,10 @@ impl BlockAssembler { { error!( "resolve transactions when build block template, \ - tip_number: {}, tip_hash: {}, error: {:?}", + tip_number: {}, tip_hash: {}, tx_hash: {}, error: {:?}", tip_header.number(), tip_header.hash(), + entry.transaction().hash(), err ); None diff --git a/tx-pool/src/component/container.rs b/tx-pool/src/component/container.rs index 1ecdcf27bb..dbc73cd7ab 100644 --- a/tx-pool/src/component/container.rs +++ b/tx-pool/src/component/container.rs @@ -185,6 +185,12 @@ impl TxLinksMap { .map(|links| links.children.insert(child)) } + fn add_parent(&mut self, short_id: &ProposalShortId, parent: ProposalShortId) -> Option { + self.inner + .get_mut(short_id) + .map(|links| links.parents.insert(parent)) + } + fn clear(&mut self) { self.inner.clear(); } @@ -193,10 +199,10 @@ impl TxLinksMap { #[derive(Debug, Clone)] pub(crate) struct SortedTxMap { entries: HashMap, - sorted_index: BTreeSet, + pub(crate) sorted_index: BTreeSet, deps: HashMap>, /// A map track transaction ancestors and descendants - links: TxLinksMap, + pub(crate) links: TxLinksMap, max_ancestors_count: usize, } @@ -219,6 +225,11 @@ impl SortedTxMap { self.entries.iter() } + // Usually when a new transaction is added to the pool, it has no in-pool + // children (because any such children would be an orphan). So in add_entry(), we: + // - update a new entry's parents set to include all in-pool parents + // - update the new entry's parents to include the new tx as a child + // - update all ancestors of the transaction to include the new tx's size/fee pub fn add_entry(&mut self, mut entry: TxEntry) -> Result { let short_id = entry.proposal_short_id(); @@ -287,6 +298,35 @@ impl SortedTxMap { Ok(true) } + // update_descendants_from_detached is used to update + // the descendants for a single transaction that has been added to the + // pool but may have child transactions in the pool, eg during a + // chain reorg. + pub fn update_descendants_from_detached( + &mut self, + id: &ProposalShortId, + children: HashSet, + ) { + if let Some(entry) = self.entries.get(id).cloned() { + for child in &children { + self.links.add_parent(child, id.clone()); + } + if let Some(links) = self.links.inner.get_mut(id) { + links.children.extend(children); + } + + let descendants = self.calc_descendants(id); + for desc_id in &descendants { + if let Some(desc_entry) = self.entries.get_mut(desc_id) { + let deleted = self.sorted_index.remove(&desc_entry.as_sorted_key()); + debug_assert!(deleted, "pool inconsistent"); + desc_entry.add_entry_weight(&entry); + self.sorted_index.insert(desc_entry.as_sorted_key()); + } + } + } + } + pub fn contains_key(&self, id: &ProposalShortId) -> bool { self.entries.contains_key(id) } diff --git a/tx-pool/src/component/proposed.rs b/tx-pool/src/component/proposed.rs index 966a2dec7d..e72150aea5 100644 --- a/tx-pool/src/component/proposed.rs +++ b/tx-pool/src/component/proposed.rs @@ -55,6 +55,10 @@ impl Edges { self.outputs.insert(out_point, None); } + pub(crate) fn insert_consumed_output(&mut self, out_point: OutPoint, id: ProposalShortId) { + self.outputs.insert(out_point, Some(id)); + } + pub(crate) fn get_output_ref(&self, out_point: &OutPoint) -> Option<&Option> { self.outputs.get(out_point) } @@ -63,6 +67,10 @@ impl Edges { self.inputs.get(out_point) } + pub(crate) fn get_deps_ref(&self, out_point: &OutPoint) -> Option<&HashSet> { + self.deps.get(out_point) + } + pub(crate) fn get_mut_output( &mut self, out_point: &OutPoint, @@ -107,6 +115,9 @@ pub struct ProposedPool { impl CellProvider for ProposedPool { fn cell(&self, out_point: &OutPoint, _eager_load: bool) -> CellStatus { + if self.edges.get_input_ref(out_point).is_some() { + return CellStatus::Dead; + } if let Some(x) = self.edges.get_output_ref(out_point) { // output consumed if x.is_some() { @@ -119,15 +130,15 @@ impl CellProvider for ProposedPool { return CellStatus::live_cell(cell_meta); } } - if self.edges.get_input_ref(out_point).is_some() { - return CellStatus::Dead; - } CellStatus::Unknown } } impl CellChecker for ProposedPool { fn is_live(&self, out_point: &OutPoint) -> Option { + if self.edges.get_input_ref(out_point).is_some() { + return Some(false); + } if let Some(x) = self.edges.get_output_ref(out_point) { // output consumed if x.is_some() { @@ -136,9 +147,6 @@ impl CellChecker for ProposedPool { return Some(true); } } - if self.edges.get_input_ref(out_point).is_some() { - return Some(false); - } None } } @@ -281,6 +289,76 @@ impl ProposedPool { }) } + // In the event of a reorg, the assumption that a newly added tx has no + // in-pool children is false. In particular, the pool is in an + // inconsistent state while new transactions are being added, because there may + // be descendant transactions of a tx coming from a disconnected block that are + // unreachable from just looking at transactions in the pool (the linking + // transactions may also be in the disconnected block, waiting to be added). + // Because of this, there's not much benefit in trying to search for in-pool + // children in add_entry(). Instead, in the special case of transactions + // being added from a disconnected block, out-of-block descendants for all the + // in-block transactions by calling update_descendants_from_detached(). Note that + // until this is called, the pool state is not consistent, and in particular + // TxLinks may not be correct (and therefore functions like + // calc_ancestors() and calc_descendants() that rely + // on them to walk the pool are not generally safe to use). + pub(crate) fn add_entry_from_detached(&mut self, entry: TxEntry) -> Result { + let tx_short_id = entry.proposal_short_id(); + + if self.inner.contains_key(&tx_short_id) { + return Ok(false); + } + + let inputs = entry.transaction().input_pts_iter(); + let outputs = entry.transaction().output_pts(); + let related_dep_out_points: Vec<_> = entry.related_dep_out_points().cloned().collect(); + let header_deps = entry.transaction().header_deps(); + + self.inner.add_entry(entry).map(|inserted| { + if inserted { + let mut children = HashSet::new(); + // if input reference a in-pool output, connect it + // otherwise, record input for conflict check + for i in inputs { + if let Some(id) = self.edges.get_mut_output(&i) { + *id = Some(tx_short_id.clone()); + } + self.edges.insert_input(i.to_owned(), tx_short_id.clone()); + } + + // record dep-txid + for d in related_dep_out_points { + self.edges.insert_deps(d.to_owned(), tx_short_id.clone()); + } + + // record tx output + for o in outputs { + if let Some(ids) = self.edges.get_deps_ref(&o).cloned() { + children.extend(ids); + } + if let Some(id) = self.edges.get_input_ref(&o).cloned() { + self.edges.insert_consumed_output(o, id.clone()); + children.insert(id); + } else { + self.edges.insert_output(o); + } + } + + // record header_deps + if !header_deps.is_empty() { + self.edges + .header_deps + .insert(tx_short_id.clone(), header_deps.into_iter().collect()); + } + + self.inner + .update_descendants_from_detached(&tx_short_id, children); + } + inserted + }) + } + pub(crate) fn resolve_conflict(&mut self, tx: &TransactionView) -> Vec { let inputs = tx.input_pts_iter(); let mut conflicts = Vec::new(); diff --git a/tx-pool/src/component/tests/proposed.rs b/tx-pool/src/component/tests/proposed.rs index cbd3fa491d..3c72432238 100644 --- a/tx-pool/src/component/tests/proposed.rs +++ b/tx-pool/src/component/tests/proposed.rs @@ -74,6 +74,115 @@ fn test_add_entry() { assert_eq!(pool.edges.inputs_len(), 1); } +#[test] +fn test_add_entry_from_detached() { + let tx1 = build_tx(vec![(&Byte32::zero(), 1), (&Byte32::zero(), 2)], 1); + let tx1_hash = tx1.hash(); + let tx2 = build_tx(vec![(&tx1_hash, 0)], 1); + let tx2_hash = tx2.hash(); + let tx3 = build_tx_with_dep(vec![(&Byte32::zero(), 0)], vec![(&tx2_hash, 0)], 1); + + let entry1 = TxEntry::new(dummy_resolve(tx1.clone(), |_| None), 1, MOCK_FEE, 1); + let entry2 = TxEntry::new(dummy_resolve(tx2, |_| None), 1, MOCK_FEE, 1); + let entry3 = TxEntry::new(dummy_resolve(tx3, |_| None), 1, MOCK_FEE, 1); + + let id1 = entry1.proposal_short_id(); + let id2 = entry2.proposal_short_id(); + let id3 = entry3.proposal_short_id(); + + let mut pool = ProposedPool::new(DEFAULT_MAX_ANCESTORS_COUNT); + pool.add_entry(entry1.clone()).unwrap(); + pool.add_entry(entry2.clone()).unwrap(); + pool.add_entry(entry3).unwrap(); + + assert_eq!(pool.size(), 3); + assert_eq!(pool.edges.outputs_len(), 3); + assert_eq!(pool.edges.inputs_len(), 4); + + assert_eq!(pool.inner().sorted_index.len(), 3); + + let expected = vec![(id1.clone(), 1), (id2.clone(), 2), (id3.clone(), 3)]; + for (idx, key) in pool.inner().sorted_index.iter().enumerate() { + assert_eq!(key.id, expected[idx].0); + assert_eq!(key.ancestors_size, expected[idx].1); + } + + // check link + { + assert!(pool.inner().links.get_parents(&id1).unwrap().is_empty()); + assert_eq!( + pool.inner().links.get_children(&id1).unwrap(), + &HashSet::from_iter(vec![id2.clone()].into_iter()) + ); + + assert_eq!( + pool.inner().links.get_parents(&id2).unwrap(), + &HashSet::from_iter(vec![id1.clone()].into_iter()) + ); + assert_eq!( + pool.inner() + .links + .get_children(&entry2.proposal_short_id()) + .unwrap(), + &HashSet::from_iter(vec![id3.clone()].into_iter()) + ); + + assert_eq!( + pool.inner().links.get_parents(&id3).unwrap(), + &HashSet::from_iter(vec![id2.clone()].into_iter()) + ); + assert!(pool.inner().links.get_children(&id3).unwrap().is_empty()); + } + + pool.remove_committed_tx(&tx1); + assert_eq!(pool.edges.outputs_len(), 2); + assert_eq!(pool.edges.inputs_len(), 2); + assert_eq!(pool.inner().sorted_index.len(), 2); + + let removed_expected = vec![(id2.clone(), 1), (id3.clone(), 2)]; + for (idx, key) in pool.inner().sorted_index.iter().enumerate() { + assert_eq!(key.id, removed_expected[idx].0); + assert_eq!(key.ancestors_size, removed_expected[idx].1); + } + assert!(pool + .inner() + .links + .get_parents(&entry2.proposal_short_id()) + .unwrap() + .is_empty()); + + assert!(pool.add_entry_from_detached(entry1).unwrap()); + for (idx, key) in pool.inner().sorted_index.iter().enumerate() { + assert_eq!(key.id, expected[idx].0); + assert_eq!(key.ancestors_size, expected[idx].1); + } + { + assert!(pool.inner().links.get_parents(&id1).unwrap().is_empty()); + assert_eq!( + pool.inner().links.get_children(&id1).unwrap(), + &HashSet::from_iter(vec![id2.clone()].into_iter()) + ); + + assert_eq!( + pool.inner().links.get_parents(&id2).unwrap(), + &HashSet::from_iter(vec![id1].into_iter()) + ); + assert_eq!( + pool.inner() + .links + .get_children(&entry2.proposal_short_id()) + .unwrap(), + &HashSet::from_iter(vec![id3.clone()].into_iter()) + ); + + assert_eq!( + pool.inner().links.get_parents(&id3).unwrap(), + &HashSet::from_iter(vec![id2].into_iter()) + ); + assert!(pool.inner().links.get_children(&id3).unwrap().is_empty()); + } +} + #[test] fn test_add_roots() { let tx1 = build_tx(vec![(&Byte32::zero(), 1), (&Byte32::zero(), 2)], 1); diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index c0aabb3ee6..9693598c4f 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -169,6 +169,12 @@ impl TxPool { self.proposed.add_entry(entry) } + /// Add detached transactions back to the proposed. + pub fn add_proposed_from_detached(&mut self, entry: TxEntry) -> Result { + trace!("add_proposed_from_detached {}", entry.transaction().hash()); + self.proposed.add_entry_from_detached(entry) + } + /// Returns true if the tx-pool contains a tx with specified id. pub fn contains_proposal_id(&self, id: &ProposalShortId) -> bool { self.pending.contains_key(id) || self.gap.contains_key(id) || self.proposed.contains_key(id) diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 3fc296fd5e..8f83132890 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -752,7 +752,9 @@ impl TxPoolService { verify_rtx(snapshot, &rtx, &tx_env, &verify_cache, max_cycles) { let entry = TxEntry::new(rtx, verified.cycles, fee, tx_size); - if let Err(e) = _submit_entry(tx_pool, status, entry, &self.callbacks) { + if let Err(e) = + _submit_entry_from_detached(tx_pool, status, entry, &self.callbacks) + { error!("readd_detached_tx submit_entry {} error {}", tx_hash, e); } else { debug!("readd_detached_tx submit_entry {}", tx_hash); @@ -867,6 +869,42 @@ fn _submit_entry( Ok(()) } +fn _submit_entry_from_detached( + tx_pool: &mut TxPool, + status: TxStatus, + entry: TxEntry, + callbacks: &Callbacks, +) -> Result<(), Reject> { + let tx_hash = entry.transaction().hash(); + match status { + TxStatus::Fresh => { + if tx_pool.add_pending(entry.clone()) { + debug!("_submit_entry_from_detached pending {}", tx_hash); + callbacks.call_pending(tx_pool, &entry); + } else { + return Err(Reject::Duplicated(tx_hash)); + } + } + TxStatus::Gap => { + if tx_pool.add_gap(entry.clone()) { + debug!("_submit_entry_from_detached gap {}", tx_hash); + callbacks.call_pending(tx_pool, &entry); + } else { + return Err(Reject::Duplicated(tx_hash)); + } + } + TxStatus::Proposed => { + if tx_pool.add_proposed_from_detached(entry.clone())? { + debug!("_submit_entry_from_detached proposed {}", tx_hash); + callbacks.call_proposed(tx_pool, &entry, true); + } else { + return Err(Reject::Duplicated(tx_hash)); + } + } + } + Ok(()) +} + fn _update_tx_pool_for_reorg( tx_pool: &mut TxPool, attached: &LinkedHashSet,