diff --git a/crates/json-rpc/src/request.rs b/crates/json-rpc/src/request.rs index 0bc74ffbdb5d..e83b0044855b 100644 --- a/crates/json-rpc/src/request.rs +++ b/crates/json-rpc/src/request.rs @@ -112,11 +112,12 @@ where impl Request<&Params> where - Params: Clone, + Params: ToOwned, + Params::Owned: RpcParam, { /// Clone the request, including the request parameters. - pub fn into_owned_params(self) -> Request { - Request { meta: self.meta, params: self.params.clone() } + pub fn into_owned_params(self) -> Request { + Request { meta: self.meta, params: self.params.to_owned() } } } diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index b484b4fe3624..783fda537508 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -28,20 +28,21 @@ extern crate tracing; mod builder; pub use builder::{Identity, ProviderBuilder, ProviderLayer, Stack}; +mod chain; + pub mod ext; pub mod fillers; -pub mod layers; - -mod chain; mod heart; pub use heart::{PendingTransaction, PendingTransactionBuilder, PendingTransactionConfig}; +pub mod layers; + mod provider; pub use provider::{ - EthCall, FilterPollerBuilder, Provider, RootProvider, RpcWithBlock, SendableTx, TraceCallList, - WalletProvider, + EthCall, FilterPollerBuilder, Provider, ProviderCall, RootProvider, RpcWithBlock, SendableTx, + TraceCallList, WalletProvider, }; pub mod utils; diff --git a/crates/provider/src/provider/call.rs b/crates/provider/src/provider/call.rs index 134b4fb54550..2973f7af2543 100644 --- a/crates/provider/src/provider/call.rs +++ b/crates/provider/src/provider/call.rs @@ -1,185 +1,253 @@ -use alloy_eips::BlockId; -use alloy_network::Network; -use alloy_primitives::Bytes; -use alloy_rpc_client::{RpcCall, WeakClient}; -use alloy_rpc_types::state::StateOverride; -use alloy_transport::{Transport, TransportErrorKind, TransportResult}; +use alloy_json_rpc::{RpcParam, RpcReturn}; +use alloy_rpc_client::{RpcCall, Waiter}; +use alloy_transport::{Transport, TransportResult}; use futures::FutureExt; -use serde::ser::SerializeSeq; -use std::{future::Future, task::Poll}; - -type RunningFut<'req, 'state, T, N> = RpcCall, Bytes>; - -#[derive(Clone, Debug)] -struct EthCallParams<'req, 'state, N: Network> { - data: &'req N::TransactionRequest, - block: BlockId, - overrides: Option<&'state StateOverride>, -} - -impl serde::Serialize for EthCallParams<'_, '_, N> { - fn serialize(&self, serializer: S) -> Result { - let len = if self.overrides.is_some() { 3 } else { 2 }; - let mut seq = serializer.serialize_seq(Some(len))?; - seq.serialize_element(&self.data)?; - seq.serialize_element(&self.block)?; - if let Some(overrides) = self.overrides { - seq.serialize_element(overrides)?; - } - seq.end() - } -} - -/// The [`EthCallFut`] future is the future type for an `eth_call` RPC request. -#[derive(Clone, Debug)] -#[doc(hidden)] // Not public API. -pub struct EthCallFut<'req, 'state, T, N>(EthCallFutInner<'req, 'state, T, N>) -where - T: Transport + Clone, - N: Network; - -#[derive(Clone, Debug)] -enum EthCallFutInner<'req, 'state, T, N: Network> +use pin_project::pin_project; +use serde_json::value::RawValue; +use std::{ + future::Future, + pin::Pin, + task::{self, Poll}, +}; +use tokio::sync::oneshot; + +/// The primary future type for the [`Provider`]. +/// +/// This future abstracts over several potential data sources. It allows +/// providers to: +/// - produce data via an [`RpcCall`] +/// - produce data by waiting on a batched RPC [`Waiter`] +/// - proudce data via an arbitrary boxed future +/// - produce data in any synchronous way +/// +/// [`Provider`]: crate::Provider +#[pin_project(project = ProviderCallProj)] +pub enum ProviderCall Output> where - T: Transport + Clone, - N: Network, + Conn: Transport + Clone, + Params: RpcParam, + Map: FnOnce(Resp) -> Output, { - Preparing { - client: WeakClient, - data: &'req N::TransactionRequest, - overrides: Option<&'state StateOverride>, - block: Option, - }, - Running(RunningFut<'req, 'state, T, N>), - Polling, + /// An underlying call to an RPC server. + RpcCall(RpcCall), + /// A waiter for a batched call to a remote RPC server. + Waiter(Waiter), + /// A boxed future. + BoxedFuture(Pin> + Send>>), + /// The output, produces synchronously. + Ready(Option), } -impl<'req, 'state, T, N> EthCallFutInner<'req, 'state, T, N> +impl ProviderCall where - T: Transport + Clone, - N: Network, + Conn: Transport + Clone, + Params: RpcParam, + Map: FnOnce(Resp) -> Output, { - /// Returns `true` if the future is in the preparing state. - const fn is_preparing(&self) -> bool { - matches!(self, Self::Preparing { .. }) + /// Instantiate a new [`ProviderCall`] from the output. + pub const fn ready(output: Output) -> Self { + Self::Ready(Some(output)) } - /// Returns `true` if the future is in the running state. - const fn is_running(&self) -> bool { - matches!(self, Self::Running(..)) + /// True if this is an RPC call. + pub const fn is_rpc_call(&self) -> bool { + matches!(self, Self::RpcCall(_)) } - fn poll_preparing(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - let Self::Preparing { client, data, overrides, block } = - std::mem::replace(self, Self::Polling) - else { - unreachable!("bad state") - }; + /// Fallible cast to [`RpcCall`] + pub const fn as_rpc_call(&self) -> Option<&RpcCall> { + match self { + Self::RpcCall(call) => Some(call), + _ => None, + } + } - let client = match client.upgrade().ok_or_else(TransportErrorKind::backend_gone) { - Ok(client) => client, - Err(e) => return Poll::Ready(Err(e)), - }; + /// Fallible cast to mutable [`RpcCall`] + pub fn as_mut_rpc_call(&mut self) -> Option<&mut RpcCall> { + match self { + Self::RpcCall(call) => Some(call), + _ => None, + } + } - let params = EthCallParams { data, block: block.unwrap_or_default(), overrides }; + /// True if this is a waiter. + pub const fn is_waiter(&self) -> bool { + matches!(self, Self::Waiter(_)) + } - let fut = client.request("eth_call", params); + /// Fallible cast to [`Waiter`] + pub const fn as_waiter(&self) -> Option<&Waiter> { + match self { + Self::Waiter(waiter) => Some(waiter), + _ => None, + } + } - *self = Self::Running(fut); - self.poll_running(cx) + /// Fallible cast to mutable [`Waiter`] + pub fn as_mut_waiter(&mut self) -> Option<&mut Waiter> { + match self { + Self::Waiter(waiter) => Some(waiter), + _ => None, + } } - fn poll_running(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - let Self::Running(ref mut call) = self else { unreachable!("bad state") }; + /// True if this is a boxed future. + pub const fn is_boxed_future(&self) -> bool { + matches!(self, Self::BoxedFuture(_)) + } - call.poll_unpin(cx) + /// Fallible cast to a boxed future. + pub const fn as_boxed_future( + &self, + ) -> Option<&Pin> + Send>>> { + match self { + Self::BoxedFuture(fut) => Some(fut), + _ => None, + } + } + + /// True if this is a ready value. + pub const fn is_ready(&self) -> bool { + matches!(self, Self::Ready(_)) + } + + /// Fallible cast to a ready value. + /// + /// # Panics + /// + /// Panics if the future is already complete + pub const fn as_ready(&self) -> Option<&Output> { + match self { + Self::Ready(Some(output)) => Some(output), + Self::Ready(None) => panic!("tried to access ready value after taking"), + _ => None, + } + } + + /// Set a function to map the response into a different type. This is + /// useful for transforming the response into a more usable type, e.g. + /// changing `U64` to `u64`. + /// + /// This function fails if the inner future is not an [`RpcCall`] or + /// [`Waiter`]. + pub fn map_resp( + self, + map: NewMap, + ) -> Result, Self> + where + NewMap: FnOnce(Resp) -> NewOutput, + { + match self { + Self::RpcCall(call) => Ok(ProviderCall::RpcCall(call.map_resp(map))), + Self::Waiter(waiter) => Ok(ProviderCall::Waiter(waiter.map_resp(map))), + _ => Err(self), + } } } -impl<'req, 'state, T, N> Future for EthCallFut<'req, 'state, T, N> +impl ProviderCall where - T: Transport + Clone, - N: Network, + Conn: Transport + Clone, + Params: RpcParam, + Params: ToOwned, + Params::Owned: RpcParam, + Map: FnOnce(Resp) -> Output, { - type Output = TransportResult; - - fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - let this = &mut self.get_mut().0; - if this.is_preparing() { - this.poll_preparing(cx) - } else if this.is_running() { - this.poll_running(cx) - } else { - panic!("unexpected state") + /// Convert this call into one with owned params, by cloning the params. + /// + /// # Panics + /// + /// Panics if called after the request has been polled. + pub fn into_owned_params(self) -> ProviderCall { + match self { + Self::RpcCall(call) => ProviderCall::RpcCall(call.into_owned_params()), + _ => panic!(), } } } -/// A builder for an `"eth_call"` request. This type is returned by the -/// [`Provider::call`] method. -/// -/// [`Provider::call`]: crate::Provider::call -#[must_use = "EthCall must be awaited to execute the call"] -#[derive(Debug, Clone)] -pub struct EthCall<'req, 'state, T, N> +impl std::fmt::Debug + for ProviderCall where - T: Transport + Clone, - N: Network, + Conn: Transport + Clone, + Params: RpcParam, + Map: FnOnce(Resp) -> Output, { - client: WeakClient, - - data: &'req N::TransactionRequest, - overrides: Option<&'state StateOverride>, - block: Option, + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::RpcCall(call) => f.debug_tuple("RpcCall").field(call).finish(), + Self::Waiter { .. } => f.debug_struct("Waiter").finish_non_exhaustive(), + Self::BoxedFuture(_) => f.debug_struct("BoxedFuture").finish_non_exhaustive(), + Self::Ready(_) => f.debug_struct("Ready").finish_non_exhaustive(), + } + } } -impl<'req, T, N> EthCall<'req, 'static, T, N> +impl From> + for ProviderCall where - T: Transport + Clone, - N: Network, + Conn: Transport + Clone, + Params: RpcParam, + Map: FnOnce(Resp) -> Output, { - /// Create a new CallBuilder. - pub const fn new(client: WeakClient, data: &'req N::TransactionRequest) -> Self { - Self { client, data, overrides: None, block: None } + fn from(call: RpcCall) -> Self { + Self::RpcCall(call) } } -impl<'req, 'state, T, N> EthCall<'req, 'state, T, N> +impl From> + for ProviderCall Resp> where - T: Transport + Clone, - N: Network, + Conn: Transport + Clone, + Params: RpcParam, { - /// Set the state overrides for this call. - pub const fn overrides(mut self, overrides: &'state StateOverride) -> Self { - self.overrides = Some(overrides); - self + fn from(waiter: Waiter) -> Self { + Self::Waiter(waiter) } +} - /// Set the block to use for this call. - pub const fn block(mut self, block: BlockId) -> Self { - self.block = Some(block); - self +impl + From> + Send>>> + for ProviderCall +where + Conn: Transport + Clone, + Params: RpcParam, + Map: FnOnce(Resp) -> Output, +{ + fn from(fut: Pin> + Send>>) -> Self { + Self::BoxedFuture(fut) } } -impl<'req, 'state, T, N> std::future::IntoFuture for EthCall<'req, 'state, T, N> +impl From>>> + for ProviderCall where - T: Transport + Clone, - N: Network, + Conn: Transport + Clone, + Params: RpcParam, { - type Output = TransportResult; - - type IntoFuture = EthCallFut<'req, 'state, T, N>; + fn from(rx: oneshot::Receiver>>) -> Self { + Waiter::from(rx).into() + } +} - fn into_future(self) -> Self::IntoFuture { - EthCallFut(EthCallFutInner::Preparing { - client: self.client, - data: self.data, - overrides: self.overrides, - block: self.block, - }) +impl Future for ProviderCall +where + Conn: Transport + Clone, + Params: RpcParam, + Resp: RpcReturn, + Output: 'static, + Map: FnOnce(Resp) -> Output, +{ + type Output = TransportResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + match self.as_mut().project() { + ProviderCallProj::RpcCall(call) => call.poll_unpin(cx), + ProviderCallProj::Waiter(waiter) => waiter.poll_unpin(cx), + ProviderCallProj::BoxedFuture(fut) => fut.poll_unpin(cx), + ProviderCallProj::Ready(output) => { + Poll::Ready(Ok(output.take().expect("output taken twice"))) + } + } } } diff --git a/crates/provider/src/provider/eth_call.rs b/crates/provider/src/provider/eth_call.rs new file mode 100644 index 000000000000..134b4fb54550 --- /dev/null +++ b/crates/provider/src/provider/eth_call.rs @@ -0,0 +1,185 @@ +use alloy_eips::BlockId; +use alloy_network::Network; +use alloy_primitives::Bytes; +use alloy_rpc_client::{RpcCall, WeakClient}; +use alloy_rpc_types::state::StateOverride; +use alloy_transport::{Transport, TransportErrorKind, TransportResult}; +use futures::FutureExt; +use serde::ser::SerializeSeq; +use std::{future::Future, task::Poll}; + +type RunningFut<'req, 'state, T, N> = RpcCall, Bytes>; + +#[derive(Clone, Debug)] +struct EthCallParams<'req, 'state, N: Network> { + data: &'req N::TransactionRequest, + block: BlockId, + overrides: Option<&'state StateOverride>, +} + +impl serde::Serialize for EthCallParams<'_, '_, N> { + fn serialize(&self, serializer: S) -> Result { + let len = if self.overrides.is_some() { 3 } else { 2 }; + let mut seq = serializer.serialize_seq(Some(len))?; + seq.serialize_element(&self.data)?; + seq.serialize_element(&self.block)?; + if let Some(overrides) = self.overrides { + seq.serialize_element(overrides)?; + } + seq.end() + } +} + +/// The [`EthCallFut`] future is the future type for an `eth_call` RPC request. +#[derive(Clone, Debug)] +#[doc(hidden)] // Not public API. +pub struct EthCallFut<'req, 'state, T, N>(EthCallFutInner<'req, 'state, T, N>) +where + T: Transport + Clone, + N: Network; + +#[derive(Clone, Debug)] +enum EthCallFutInner<'req, 'state, T, N: Network> +where + T: Transport + Clone, + N: Network, +{ + Preparing { + client: WeakClient, + data: &'req N::TransactionRequest, + overrides: Option<&'state StateOverride>, + block: Option, + }, + Running(RunningFut<'req, 'state, T, N>), + Polling, +} + +impl<'req, 'state, T, N> EthCallFutInner<'req, 'state, T, N> +where + T: Transport + Clone, + N: Network, +{ + /// Returns `true` if the future is in the preparing state. + const fn is_preparing(&self) -> bool { + matches!(self, Self::Preparing { .. }) + } + + /// Returns `true` if the future is in the running state. + const fn is_running(&self) -> bool { + matches!(self, Self::Running(..)) + } + + fn poll_preparing(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + let Self::Preparing { client, data, overrides, block } = + std::mem::replace(self, Self::Polling) + else { + unreachable!("bad state") + }; + + let client = match client.upgrade().ok_or_else(TransportErrorKind::backend_gone) { + Ok(client) => client, + Err(e) => return Poll::Ready(Err(e)), + }; + + let params = EthCallParams { data, block: block.unwrap_or_default(), overrides }; + + let fut = client.request("eth_call", params); + + *self = Self::Running(fut); + self.poll_running(cx) + } + + fn poll_running(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + let Self::Running(ref mut call) = self else { unreachable!("bad state") }; + + call.poll_unpin(cx) + } +} + +impl<'req, 'state, T, N> Future for EthCallFut<'req, 'state, T, N> +where + T: Transport + Clone, + N: Network, +{ + type Output = TransportResult; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = &mut self.get_mut().0; + if this.is_preparing() { + this.poll_preparing(cx) + } else if this.is_running() { + this.poll_running(cx) + } else { + panic!("unexpected state") + } + } +} + +/// A builder for an `"eth_call"` request. This type is returned by the +/// [`Provider::call`] method. +/// +/// [`Provider::call`]: crate::Provider::call +#[must_use = "EthCall must be awaited to execute the call"] +#[derive(Debug, Clone)] +pub struct EthCall<'req, 'state, T, N> +where + T: Transport + Clone, + N: Network, +{ + client: WeakClient, + + data: &'req N::TransactionRequest, + overrides: Option<&'state StateOverride>, + block: Option, +} + +impl<'req, T, N> EthCall<'req, 'static, T, N> +where + T: Transport + Clone, + N: Network, +{ + /// Create a new CallBuilder. + pub const fn new(client: WeakClient, data: &'req N::TransactionRequest) -> Self { + Self { client, data, overrides: None, block: None } + } +} + +impl<'req, 'state, T, N> EthCall<'req, 'state, T, N> +where + T: Transport + Clone, + N: Network, +{ + /// Set the state overrides for this call. + pub const fn overrides(mut self, overrides: &'state StateOverride) -> Self { + self.overrides = Some(overrides); + self + } + + /// Set the block to use for this call. + pub const fn block(mut self, block: BlockId) -> Self { + self.block = Some(block); + self + } +} + +impl<'req, 'state, T, N> std::future::IntoFuture for EthCall<'req, 'state, T, N> +where + T: Transport + Clone, + N: Network, +{ + type Output = TransportResult; + + type IntoFuture = EthCallFut<'req, 'state, T, N>; + + fn into_future(self) -> Self::IntoFuture { + EthCallFut(EthCallFutInner::Preparing { + client: self.client, + data: self.data, + overrides: self.overrides, + block: self.block, + }) + } +} diff --git a/crates/provider/src/provider/mod.rs b/crates/provider/src/provider/mod.rs index 1d431f829122..b54652ad885f 100644 --- a/crates/provider/src/provider/mod.rs +++ b/crates/provider/src/provider/mod.rs @@ -1,5 +1,8 @@ mod call; -pub use call::EthCall; +pub use call::ProviderCall; + +mod eth_call; +pub use eth_call::EthCall; mod root; pub use root::RootProvider; diff --git a/crates/rpc-client/src/batch.rs b/crates/rpc-client/src/batch.rs index ab45295405e3..4bba23416969 100644 --- a/crates/rpc-client/src/batch.rs +++ b/crates/rpc-client/src/batch.rs @@ -4,7 +4,8 @@ use alloy_json_rpc::{ RpcReturn, SerializedRequest, }; use alloy_transport::{Transport, TransportError, TransportErrorKind, TransportResult}; -use futures::channel::oneshot; +use futures::FutureExt; +use pin_project::pin_project; use serde_json::value::RawValue; use std::{ borrow::Cow, @@ -12,8 +13,12 @@ use std::{ future::{Future, IntoFuture}, marker::PhantomData, pin::Pin, - task::{self, ready, Poll}, + task::{ + self, ready, + Poll::{self, Ready}, + }, }; +use tokio::sync::oneshot; pub(crate) type Channel = oneshot::Sender>>; pub(crate) type ChannelMap = HashMap; @@ -35,29 +40,48 @@ pub struct BatchRequest<'a, T> { /// Awaits a single response for a request that has been included in a batch. #[must_use = "A Waiter does nothing unless the corresponding BatchRequest is sent via `send_batch` and `.await`, AND the Waiter is awaited."] +#[pin_project] #[derive(Debug)] -pub struct Waiter { +pub struct Waiter Output> { + #[pin] rx: oneshot::Receiver>>, - _resp: PhantomData Resp>, + map: Option, + _resp: PhantomData (Output, Resp)>, +} + +impl Waiter { + /// Maps the response to a new type. + pub fn map_resp(self, map: NewMap) -> Waiter + where + NewMap: FnOnce(Resp) -> NewOutput, + { + Waiter { rx: self.rx, map: Some(map), _resp: PhantomData } + } } impl From>>> for Waiter { fn from(rx: oneshot::Receiver>>) -> Self { - Self { rx, _resp: PhantomData } + Self { rx, map: Some(std::convert::identity), _resp: PhantomData } } } -impl std::future::Future for Waiter +impl std::future::Future for Waiter where Resp: RpcReturn, + Map: FnOnce(Resp) -> Output, { - type Output = TransportResult; + type Output = TransportResult; - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - Pin::new(&mut self.rx).poll(cx).map(|resp| match resp { - Ok(resp) => try_deserialize_ok(resp), - Err(e) => Err(TransportErrorKind::custom(e)), - }) + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let this = self.get_mut(); + + match ready!(this.rx.poll_unpin(cx)) { + Ok(resp) => { + let resp: Result = try_deserialize_ok(resp); + Ready(resp.map(this.map.take().expect("polled after completion"))) + } + Err(e) => Poll::Ready(Err(TransportErrorKind::custom(e))), + } } } diff --git a/crates/rpc-client/src/call.rs b/crates/rpc-client/src/call.rs index a1efca47f50a..05d830eb7abc 100644 --- a/crates/rpc-client/src/call.rs +++ b/crates/rpc-client/src/call.rs @@ -4,13 +4,14 @@ use alloy_json_rpc::{ }; use alloy_transport::{RpcFut, Transport, TransportError, TransportResult}; use core::panic; +use futures::FutureExt; use serde_json::value::RawValue; use std::{ fmt, future::Future, marker::PhantomData, pin::Pin, - task::{self, Poll::Ready}, + task::{self, ready, Poll::Ready}, }; use tower::Service; @@ -139,11 +140,11 @@ pub struct RpcCall Output> where Conn: Transport + Clone, Params: RpcParam, - Map: Fn(Resp) -> Output, + Map: FnOnce(Resp) -> Output, { #[pin] state: CallState, - map: Map, + map: Option, _pd: core::marker::PhantomData (Resp, Output)>, } @@ -151,7 +152,7 @@ impl core::fmt::Debug for RpcCall Output, + Map: FnOnce(Resp) -> Output, { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("RpcCall").field("state", &self.state).finish() @@ -167,7 +168,7 @@ where pub fn new(req: Request, connection: Conn) -> Self { Self { state: CallState::Prepared { request: Some(req), connection }, - map: std::convert::identity, + map: Some(std::convert::identity), _pd: PhantomData, } } @@ -177,7 +178,7 @@ impl RpcCall where Conn: Transport + Clone, Params: RpcParam, - Map: Fn(Resp) -> Output, + Map: FnOnce(Resp) -> Output, { /// Set a function to map the response into a different type. pub fn map_resp( @@ -185,9 +186,9 @@ where map: NewMap, ) -> RpcCall where - NewMap: Fn(Resp) -> NewOutput, + NewMap: FnOnce(Resp) -> NewOutput, { - RpcCall { state: self.state, map, _pd: PhantomData } + RpcCall { state: self.state, map: Some(map), _pd: PhantomData } } /// Returns `true` if the request is a subscription. @@ -254,15 +255,16 @@ where impl RpcCall where Conn: Transport + Clone, - Params: RpcParam + Clone, - Map: Fn(Resp) -> Output, + Params: RpcParam + ToOwned, + Params::Owned: RpcParam, + Map: FnOnce(Resp) -> Output, { /// Convert this call into one with owned params, by cloning the params. /// /// # Panics /// - /// Panics if called after the request has been sent. - pub fn into_owned_params(self) -> RpcCall { + /// Panics if called after the request has been polled. + pub fn into_owned_params(self) -> RpcCall { let CallState::Prepared { request, connection } = self.state else { panic!("Cannot get params after request has been sent"); }; @@ -282,7 +284,7 @@ where Params: RpcParam + 'a, Resp: RpcReturn, Output: 'static, - Map: Fn(Resp) -> Output + Send + 'a, + Map: FnOnce(Resp) -> Output + Send + 'a, { /// Convert this future into a boxed, pinned future, erasing its type. pub fn boxed(self) -> RpcFut<'a, Output> { @@ -296,13 +298,16 @@ where Params: RpcParam, Resp: RpcReturn, Output: 'static, - Map: Fn(Resp) -> Output, + Map: FnOnce(Resp) -> Output, { type Output = TransportResult; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { trace!(?self.state, "polling RpcCall"); - let this = self.project(); - this.state.poll(cx).map(try_deserialize_ok).map(|r| r.map(this.map)) + + let this = self.get_mut(); + let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx))); + + Ready(resp.map(this.map.take().expect("polled after completion"))) } }