Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransactionPool API uses async_trait #6528

Merged
merged 10 commits into from
Nov 19, 2024
Prev Previous commit
Next Next commit
more fixes
michalkucharczyk committed Nov 18, 2024
commit 767edd40dd1a82cea5e0a88b3ec2c05f47735f76
4 changes: 1 addition & 3 deletions substrate/bin/node/bench/src/construct.rs
Original file line number Diff line number Diff line change
@@ -264,9 +264,7 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
&self,
_at: Self::Hash,
) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
let iter: Box<dyn ReadyTransactions<Item = Arc<PoolTransaction>> + Send> =
Box::new(TransactionsIterator(self.0.clone().into_iter()));
iter
Box::new(TransactionsIterator(self.0.clone().into_iter()))
}

fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
Original file line number Diff line number Diff line change
@@ -108,11 +108,10 @@ impl TransactionPool for MiddlewarePool {
source: TransactionSource,
xt: TransactionFor<Self>,
) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
let pool = self.inner_pool.clone();
let sender = self.sender.clone();
let transaction = hex_string(&xt.encode());
let sender = self.sender.clone();

let watcher = match pool.submit_and_watch(at, source, xt).await {
let watcher = match self.inner_pool.submit_and_watch(at, source, xt).await {
Ok(watcher) => watcher,
Err(err) => {
let _ = sender.send(MiddlewarePoolEvent::PoolError {
8 changes: 4 additions & 4 deletions substrate/client/transaction-pool/api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -279,8 +279,8 @@ pub trait TransactionPool: Send + Sync {
// *** Block production / Networking
/// Get an iterator for ready transactions ordered by priority.
///
/// Guarantees to resolve only when transaction pool got updated at `at` block.
/// Guarantees to resolve immediately when `None` is passed.
/// Guaranteed to resolve only when transaction pool got updated at `at` block.
/// Guaranteed to resolve immediately when `None` is passed.
async fn ready_at(
&self,
at: <Self::Block as BlockT>::Hash,
@@ -316,8 +316,8 @@ pub trait TransactionPool: Send + Sync {

/// Asynchronously returns a set of ready transaction at given block within given timeout.
///
/// If the timeout is hit during method execution then the best effort (without executing full
/// maintain process) set of ready transactions for given block, is returned.
/// If the timeout is hit during method execution, then the best effort (without executing full
/// maintain process) set of ready transactions for given block is returned.
async fn ready_at_with_timeout(
&self,
at: <Self::Block as BlockT>::Hash,
Original file line number Diff line number Diff line change
@@ -110,8 +110,6 @@ where
}
}

type PolledIterator<PoolApi> = Pin<Box<dyn Future<Output = ReadyIteratorFor<PoolApi>> + Send>>;

/// The fork-aware transaction pool.
///
/// It keeps track of every fork and provides the set of transactions that is valid for every fork.
@@ -377,14 +375,13 @@ where
///
/// Pruning is just rebuilding the underlying transactions graph, no validations are executed,
/// so this process shall be fast.
pub fn ready_at_light(&self, at: Block::Hash) -> PolledIterator<ChainApi> {
pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor<ChainApi> {
let start = Instant::now();
let api = self.api.clone();
log::trace!(target: LOG_TARGET, "fatp::ready_at_light {:?}", at);

let Ok(block_number) = self.api.resolve_block_number(at) else {
let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
return Box::pin(async { empty })
return Box::new(std::iter::empty())
};

let best_result = {
@@ -403,57 +400,53 @@ where
)
};

Box::pin(async move {
if let Ok((Some(best_tree_route), Some(best_view))) = best_result {
let tmp_view: View<ChainApi> = View::new_from_other(
&best_view,
&HashAndNumber { hash: at, number: block_number },
);

let mut all_extrinsics = vec![];
if let Ok((Some(best_tree_route), Some(best_view))) = best_result {
let tmp_view: View<ChainApi> =
View::new_from_other(&best_view, &HashAndNumber { hash: at, number: block_number });

for h in best_tree_route.enacted() {
let extrinsics = api
.block_body(h.hash)
.await
.unwrap_or_else(|e| {
log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e);
None
})
.unwrap_or_default()
.into_iter()
.map(|t| api.hash_and_length(&t).0);
all_extrinsics.extend(extrinsics);
}
let mut all_extrinsics = vec![];

let before_count = tmp_view.pool.validated_pool().status().ready;
let tags = tmp_view
.pool
.validated_pool()
.extrinsics_tags(&all_extrinsics)
for h in best_tree_route.enacted() {
let extrinsics = api
.block_body(h.hash)
.await
.unwrap_or_else(|e| {
log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e);
None
})
.unwrap_or_default()
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>();
let _ = tmp_view.pool.validated_pool().prune_tags(tags);

let after_count = tmp_view.pool.validated_pool().status().ready;
log::debug!(target: LOG_TARGET,
"fatp::ready_at_light {} from {} before: {} to be removed: {} after: {} took:{:?}",
at,
best_view.at.hash,
before_count,
all_extrinsics.len(),
after_count,
start.elapsed()
);
Box::new(tmp_view.pool.validated_pool().ready())
} else {
let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
log::debug!(target: LOG_TARGET, "fatp::ready_at_light {} -> empty, took:{:?}", at, start.elapsed());
empty
.map(|t| api.hash_and_length(&t).0);
all_extrinsics.extend(extrinsics);
}
})

let before_count = tmp_view.pool.validated_pool().status().ready;
let tags = tmp_view
.pool
.validated_pool()
.extrinsics_tags(&all_extrinsics)
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>();
let _ = tmp_view.pool.validated_pool().prune_tags(tags);

let after_count = tmp_view.pool.validated_pool().status().ready;
log::debug!(target: LOG_TARGET,
"fatp::ready_at_light {} from {} before: {} to be removed: {} after: {} took:{:?}",
at,
best_view.at.hash,
before_count,
all_extrinsics.len(),
after_count,
start.elapsed()
);
Box::new(tmp_view.pool.validated_pool().ready())
} else {
let empty: ReadyIteratorFor<ChainApi> = Box::new(std::iter::empty());
log::debug!(target: LOG_TARGET, "fatp::ready_at_light {} -> empty, took:{:?}", at, start.elapsed());
empty
}
}

/// Waits for the set of ready transactions for a given block up to a specified timeout.
@@ -496,15 +489,18 @@ where

let fall_back_ready = self.ready_at_light(at);
let (maybe_ready, fall_back_ready) =
futures::future::join(maybe_ready.boxed(), fall_back_ready.boxed()).await;
futures::future::join(maybe_ready, fall_back_ready).await;
maybe_ready.unwrap_or(fall_back_ready)
}

fn ready_at_internal(&self, at: Block::Hash) -> (bool, PolledIterator<ChainApi>) {
fn ready_at_internal(
&self,
at: Block::Hash,
) -> (bool, Pin<Box<dyn Future<Output = ReadyIteratorFor<ChainApi>> + Send>>) {
let mut ready_poll = self.ready_poll.lock();

if let Some((view, inactive)) = self.view_store.get_view_at(at, true) {
log::debug!(target: LOG_TARGET, "fatp::ready_at {at:?} (inactive:{inactive:?})");
log::debug!(target: LOG_TARGET, "fatp::ready_at_internal {at:?} (inactive:{inactive:?})");
let iterator: ReadyIteratorFor<ChainApi> = Box::new(view.pool.validated_pool().ready());
return (true, async move { iterator }.boxed());
}
@@ -519,7 +515,7 @@ where
})
.boxed();
log::debug!(target: LOG_TARGET,
"fatp::ready_at {at:?} pending keys: {:?}",
"fatp::ready_at_internal {at:?} pending keys: {:?}",
ready_poll.pollers.keys()
);
(false, pending)