Skip to content

Commit

Permalink
add fix_entries_by_invalidators with topological_sort to fix order is…
Browse files Browse the repository at this point in the history
…sues
  • Loading branch information
chenyukang committed Feb 19, 2024
1 parent 61d97fc commit 0702a20
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 20 deletions.
2 changes: 1 addition & 1 deletion test/src/specs/tx_pool/dead_cell_deps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ impl Spec for SendTxCellRefCellConsume {
let details = node0.get_pool_tx_detail_info(tx_b2.hash());
let invalidated_tx_count: u64 = details.invalidated_tx_count.into();
let ancestors_count: u64 = details.ancestors_count.into();
assert_eq!(invalidated_tx_count, 3); // child's invalidated_tx_count >= parent tx's invalidated_tx_count
assert_eq!(invalidated_tx_count, 0); // child's invalidated_tx_count is not inherit from parent
assert_eq!(ancestors_count, 1);
}

Expand Down
78 changes: 72 additions & 6 deletions tx-pool/src/component/commit_txs_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::component::{entry::TxEntry, sort_key::AncestorsScoreSortKey};
use ckb_types::{core::Cycle, packed::ProposalShortId};
use ckb_util::LinkedHashMap;
use multi_index_map::MultiIndexMap;
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};

// A template data struct used to store modified entries when package txs
#[derive(MultiIndexMap, Clone)]
Expand Down Expand Up @@ -176,11 +176,8 @@ impl<'a> CommitTxsScanner<'a> {

self.update_modified_entries(&ancestors);
}
// sort by invalid_tx_count, from small to large
// for A, B, if A.invalidated_tx_count < B.invalidated_tx_count
// then B is the tx consumed a cell dep, and A is the tx that cell dep
// so the result in order will be [A, B]
self.entries.sort_by_key(|entry| entry.invalidated_tx_count);

self.fix_entries_by_invalidators();
(self.entries, size, cycles)
}

Expand All @@ -190,6 +187,75 @@ impl<'a> CommitTxsScanner<'a> {
.or_else(|| self.pool_map.get_proposed(short_id))
}

// If there is any transaction may invalidate other transactions
// we may need to fix the order so that manimize the number of failed transactions
// caused by order, for instance, B cell dep A, C consume A, the better order is
// [A, B, C], while the order [A, C, B] will make B failed
fn fix_entries_by_invalidators(&mut self) {
if self
.entries
.iter()
.all(|entry| entry.invalidated_tx_count == 0)
{
return;
}

let mut edges = vec![];
for i in 0..self.entries.len() {
for j in (i + 1)..self.entries.len() {
let a = &self.entries[i].proposal_short_id();
let b = &self.entries[j].proposal_short_id();
if self.pool_map.can_invalidate(a, b) {
edges.push((j, i));
} else if self.pool_map.is_parent_child(a, b) {
edges.push((i, j));
}
}
}

if let Some(order) = Self::topological_sort(self.entries.len(), edges) {
let mut new_entries = Vec::with_capacity(self.entries.len());
for i in order {
new_entries.push(self.entries[i].clone());
}
self.entries = new_entries;
}
}

fn topological_sort(n: usize, edges: Vec<(usize, usize)>) -> Option<Vec<usize>> {
let mut graph = vec![vec![]; n];
let mut in_degree = vec![0; n];

for (src, dst) in edges {
graph[src].push(dst);
in_degree[dst] += 1;
}

let mut queue = VecDeque::new();
for (i, &degree) in in_degree.iter().enumerate() {
if degree == 0 {
queue.push_back(i);
}
}

let mut result = Vec::new();
while let Some(node) = queue.pop_front() {
result.push(node);
for &neighbour in &graph[node] {
in_degree[neighbour] -= 1;
if in_degree[neighbour] == 0 {
queue.push_back(neighbour);
}
}
}

if result.len() == n {
Some(result)
} else {
None // graph contains a cycle
}
}

// Skip entries in `proposed` that are already in a block or are present
// in `modified_entries` (which implies that the mapTx ancestor state is
// stale due to ancestor inclusion in the block)
Expand Down
21 changes: 21 additions & 0 deletions tx-pool/src/component/links.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ use std::collections::{HashMap, HashSet};
pub struct TxLinks {
pub parents: HashSet<ProposalShortId>,
pub children: HashSet<ProposalShortId>,
pub invalidators: HashSet<ProposalShortId>,
}

#[derive(Clone, Copy, Eq, PartialEq)]
pub enum Relation {
Parents,
Children,
#[allow(dead_code)]
Invalidators,
}

impl TxLinks {
fn get_direct_ids(&self, relation: Relation) -> &HashSet<ProposalShortId> {
match relation {
Relation::Parents => &self.parents,
Relation::Children => &self.children,
Relation::Invalidators => &self.invalidators,
}
}
}
Expand Down Expand Up @@ -91,6 +95,13 @@ impl TxLinksMap {
self.inner.get(short_id).map(|link| &link.parents)
}

pub fn get_invalidators(
&self,
short_id: &ProposalShortId,
) -> Option<&HashSet<ProposalShortId>> {
self.inner.get(short_id).map(|link| &link.invalidators)
}

pub fn remove(&mut self, short_id: &ProposalShortId) -> Option<TxLinks> {
self.inner.remove(short_id)
}
Expand Down Expand Up @@ -125,6 +136,16 @@ impl TxLinksMap {
.map(|links| links.children.insert(child))
}

pub fn add_invalidator(
&mut self,
short_id: &ProposalShortId,
invalidator: ProposalShortId,
) -> Option<bool> {
self.inner
.get_mut(short_id)
.map(|links| links.invalidators.insert(invalidator))
}

pub fn add_parent(
&mut self,
short_id: &ProposalShortId,
Expand Down
36 changes: 25 additions & 11 deletions tx-pool/src/component/pool_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ impl PoolMap {
self.score_sorted_iter_by(vec![Status::Proposed])
}

pub(crate) fn can_invalidate(
&self,
invalidator: &ProposalShortId,
invalidatee: &ProposalShortId,
) -> bool {
self.links
.get_invalidators(invalidatee)
.map_or(false, |invalidators| invalidators.contains(invalidator))
}

pub(crate) fn is_parent_child(
&self,
parent: &ProposalShortId,
child: &ProposalShortId,
) -> bool {
self.links
.get_children(parent)
.map_or(false, |children| children.contains(child))
}

pub(crate) fn get(&self, id: &ProposalShortId) -> Option<&TxEntry> {
self.get_by_id(id).map(|entry| &entry.inner)
}
Expand Down Expand Up @@ -373,7 +393,6 @@ impl PoolMap {
fn update_ancestors_index_key(&mut self, child: &TxEntry, op: EntryOp) {
let ancestors: HashSet<ProposalShortId> =
self.links.calc_ancestors(&child.proposal_short_id());
let mut invalidate_count = 0;
for anc_id in &ancestors {
// update parent score
self.entries.modify_by_id(anc_id, |e| {
Expand All @@ -382,17 +401,8 @@ impl PoolMap {
EntryOp::Add => e.inner.add_descendant_weight(child),
};
e.evict_key = e.inner.as_evict_key();
invalidate_count += e.inner.invalidated_tx_count;
});
}
// update invalidate count from ancestors
if invalidate_count > 0 {
self.entries
.modify_by_id(&child.proposal_short_id(), |e| {
e.inner.invalidated_tx_count += invalidate_count;
})
.expect("unconsistent pool");
}
}

fn update_descendants_index_key(&mut self, parent: &TxEntry, op: EntryOp) {
Expand Down Expand Up @@ -516,13 +526,17 @@ impl PoolMap {
let invalidated = self
.links
.calc_relation_ids(invalidated, Relation::Children);
for invalidate_short_id in &invalidated {
self.links
.add_invalidator(invalidate_short_id, short_id.clone());
}

entry.invalidated_tx_count = invalidated.len();
self.links.add_link(
short_id,
TxLinks {
parents,
children: Default::default(),
..Default::default()
},
);

Expand Down
4 changes: 2 additions & 2 deletions tx-pool/src/component/tests/proposed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ fn test_add_entry_from_detached() {
assert!(pool.links.get_parents(&id1).unwrap().is_empty());
assert_eq!(
pool.links.get_children(&id1).unwrap(),
&HashSet::from_iter(vec![].into_iter())
&HashSet::from_iter(vec![id2.clone()].into_iter())
);

assert_eq!(
pool.links.get_parents(&id2).unwrap(),
&HashSet::from_iter(vec![].into_iter())
&HashSet::from_iter(vec![id1].into_iter())
);
assert_eq!(
pool.links
Expand Down

0 comments on commit 0702a20

Please sign in to comment.