Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc-testing-utils) : TraceApiExt with trace_call_many Stream Support #5255

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions crates/rpc/rpc-testing-util/src/trace.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! Helpers for testing trace calls.
use futures::{Stream, StreamExt};
use jsonrpsee::core::Error as RpcError;

use reth_primitives::{BlockId, Bytes, TxHash};
use reth_rpc_api::clients::TraceApiClient;
use reth_rpc_types::trace::parity::{LocalizedTransactionTrace, TraceResults, TraceType};
use reth_rpc_types::{
trace::parity::{LocalizedTransactionTrace, TraceResults, TraceType},
CallRequest,
};
use std::{
collections::HashSet,
pin::Pin,
Expand All @@ -19,6 +21,13 @@ pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (R

pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>;

/// A type representing the result of calling `trace_call_many` method.

pub type CallManyTraceResult = Result<
(Vec<TraceResults>, Vec<(CallRequest, HashSet<TraceType>)>),
(RpcError, Vec<(CallRequest, HashSet<TraceType>)>),
>;

/// An extension trait for the Trace API.
#[async_trait::async_trait]
pub trait TraceApiExt {
Expand Down Expand Up @@ -59,7 +68,40 @@ pub trait TraceApiExt {
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> RawTransactionTraceStream<'_>;
/// Creates a stream of results for multiple dependent transaction calls on top of the same
/// block.

fn trace_call_many_stream<I>(
&self,
calls: I,
block_id: Option<BlockId>,
) -> CallManyTraceStream<'_>
where
I: IntoIterator<Item = (CallRequest, HashSet<TraceType>)>;
}
/// A stream that provides asynchronous iteration over results from the `trace_call_many` function.
///
/// The stream yields items of type `CallManyTraceResult`.
#[must_use = "streams do nothing unless polled"]
pub struct CallManyTraceStream<'a> {
stream: Pin<Box<dyn Stream<Item = CallManyTraceResult> + 'a>>,
}

impl<'a> Stream for CallManyTraceStream<'a> {
type Item = CallManyTraceResult;
/// Polls for the next item from the stream.

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next(cx)
}
}

impl<'a> std::fmt::Debug for CallManyTraceStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CallManyTraceStream").finish()
}
}

/// A stream that traces the provided raw transaction data.

#[must_use = "streams do nothing unless polled"]
Expand Down Expand Up @@ -171,6 +213,24 @@ impl<T: TraceApiClient + Sync> TraceApiExt for T {
});
RawTransactionTraceStream { stream: Box::pin(stream) }
}

fn trace_call_many_stream<I>(
&self,
calls: I,
block_id: Option<BlockId>,
) -> CallManyTraceStream<'_>
where
I: IntoIterator<Item = (CallRequest, HashSet<TraceType>)>,
{
let call_set = calls.into_iter().collect::<Vec<_>>();
let stream = futures::stream::once(async move {
match self.trace_call_many(call_set.clone(), block_id).await {
Ok(results) => Ok((results, call_set)),
Err(err) => Err((err, call_set)),
}
});
CallManyTraceStream { stream: Box::pin(stream) }
}
}

/// A stream that yields the traces for the requested blocks.
Expand Down
Loading