Skip to content

Commit

Permalink
Remove Unpin requirement from Transport (#399)
Browse files Browse the repository at this point in the history
* Convert eth_subscribe to async

Here I was unable to convert SubscriptionStream to `impl Stream` because
it has additional methods and runs code on drop.
To get rid of the Unpin requirement anyway we use pin_project
(https://docs.rs/pin-project/1.0.1/pin_project/).

* Remove Unpin requirement from Transport

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
  • Loading branch information
e00E and tomusdrw authored Nov 2, 2020
1 parent 3595756 commit c8a1aad
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 88 deletions.
15 changes: 9 additions & 6 deletions src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Web3 helpers.

use std::marker::PhantomData;
use std::marker::Unpin;
use std::pin::Pin;

use crate::{error, rpc};
use futures::{
task::{Context, Poll},
Future, FutureExt,
Future,
};
use pin_project::pin_project;

/// Takes any type which is deserializable from rpc::Value and such a value and
/// yields the deserialized value
Expand All @@ -17,8 +17,10 @@ pub fn decode<T: serde::de::DeserializeOwned>(value: rpc::Value) -> error::Resul
}

/// Calls decode on the result of the wrapped future.
#[pin_project]
#[derive(Debug)]
pub struct CallFuture<T, F> {
#[pin]
inner: F,
_marker: PhantomData<T>,
}
Expand All @@ -35,13 +37,14 @@ impl<T, F> CallFuture<T, F> {

impl<T, F> Future for CallFuture<T, F>
where
T: serde::de::DeserializeOwned + Unpin,
F: Future<Output = error::Result<rpc::Value>> + Unpin,
T: serde::de::DeserializeOwned,
F: Future<Output = error::Result<rpc::Value>>,
{
type Output = error::Result<T>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let x = ready!(self.inner.poll_unpin(ctx));
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
let x = ready!(this.inner.poll(ctx));
Poll::Ready(x.and_then(decode))
}
}
Expand Down
7 changes: 2 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ pub type RequestId = usize;
// TODO [ToDr] The transport most likely don't need to be thread-safe.
// (though it has to be Send)
/// Transport implementation
pub trait Transport: std::fmt::Debug + Clone + Unpin {
pub trait Transport: std::fmt::Debug + Clone {
/// The type of future this transport returns when a call is made.
type Out: futures::Future<Output = error::Result<rpc::Value>> + Unpin;
type Out: futures::Future<Output = error::Result<rpc::Value>>;

/// Prepare serializable RPC call for given method with parameters.
fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call);
Expand Down Expand Up @@ -86,7 +86,6 @@ where
X: std::ops::Deref<Target = T>,
X: std::fmt::Debug,
X: Clone,
X: Unpin,
{
type Out = T::Out;

Expand All @@ -105,7 +104,6 @@ where
X: std::ops::Deref<Target = T>,
X: std::fmt::Debug,
X: Clone,
X: Unpin,
{
type Batch = T::Batch;

Expand All @@ -123,7 +121,6 @@ where
X: std::ops::Deref<Target = T>,
X: std::fmt::Debug,
X: Clone,
X: Unpin,
{
type NotificationStream = T::NotificationStream;

Expand Down
4 changes: 2 additions & 2 deletions src/signing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl std::error::Error for RecoveryError {}
///
/// If it's enough to pass a reference to `SecretKey` (lifetimes) than you can use `SecretKeyRef`
/// wrapper.
pub trait Key: std::marker::Unpin {
pub trait Key {
/// Sign given message and include chain-id replay protection.
///
/// When a chain ID is provided, the `Signature`'s V-value will have chain relay
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<'a> Deref for SecretKeyRef<'a> {
}
}

impl<T: Deref<Target = SecretKey> + std::marker::Unpin> Key for T {
impl<T: Deref<Target = SecretKey>> Key for T {
fn sign(&self, message: &[u8], chain_id: Option<u64>) -> Result<Signature, SigningError> {
let message = Message::from_slice(&message).map_err(|_| SigningError::InvalidMessage)?;
let (recovery_id, signature) = Secp256k1::signing_only()
Expand Down
71 changes: 15 additions & 56 deletions src/transports/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,27 @@ where
}

/// Sends all requests as a batch.
pub fn submit_batch(&self) -> BatchFuture<T::Batch> {
pub fn submit_batch(&self) -> impl Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> {
let batch = mem::replace(&mut *self.batch.lock(), vec![]);
let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();

let batch = self.transport.send_batch(batch);
let pending = self.pending.clone();

BatchFuture {
state: BatchState::SendingBatch(batch, ids),
pending,
async move {
let res = batch.await;
let mut pending = pending.lock();
for (idx, request_id) in ids.into_iter().enumerate() {
if let Some(rx) = pending.remove(&request_id) {
// Ignore sending error
let _ = match res {
Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
Err(ref err) => rx.send(Err(err.clone())),
_ => rx.send(Err(Error::Internal)),
};
}
}
res
}
}
}
Expand All @@ -72,58 +83,6 @@ where
}
}

enum BatchState<T> {
SendingBatch(T, Vec<RequestId>),
Done,
}

/// A result of submitting a batch request.
/// Returns the results of all requests within the batch.
pub struct BatchFuture<T> {
state: BatchState<T>,
pending: PendingRequests,
}

impl<T> Future for BatchFuture<T>
where
T: Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> + Unpin,
{
type Output = error::Result<Vec<error::Result<rpc::Value>>>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
loop {
match mem::replace(&mut self.state, BatchState::Done) {
BatchState::SendingBatch(mut batch, ids) => {
let res = match batch.poll_unpin(ctx) {
Poll::Pending => {
self.state = BatchState::SendingBatch(batch, ids);
return Poll::Pending;
}
Poll::Ready(v) => v,
};

let mut pending = self.pending.lock();
for (idx, request_id) in ids.into_iter().enumerate() {
if let Some(rx) = pending.remove(&request_id) {
// Ignore sending error
let _ = match res {
Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
Err(ref err) => rx.send(Err(err.clone())),
_ => rx.send(Err(Error::Internal)),
};
}
}

return Poll::Ready(res);
}
BatchState::Done => {
panic!("Poll after Ready.");
}
};
}
}
}

/// Result of calling a single method that will be part of the batch.
/// Converts `oneshot::Receiver` error into `Error::Internal`
pub struct SingleResult(oneshot::Receiver<error::Result<rpc::Value>>);
Expand Down
42 changes: 23 additions & 19 deletions src/transports/either.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
//! A strongly-typed transport alternative.

use crate::{api, error, rpc, BatchTransport, DuplexTransport, RequestId, Transport};
use futures::{
future::{BoxFuture, FutureExt},
stream::{BoxStream, StreamExt},
};

/// A wrapper over two possible transports.
///
Expand All @@ -20,10 +24,10 @@ impl<A, B, AOut, BOut> Transport for Either<A, B>
where
A: Transport<Out = AOut>,
B: Transport<Out = BOut>,
AOut: futures::Future<Output = error::Result<rpc::Value>> + Unpin + 'static,
BOut: futures::Future<Output = error::Result<rpc::Value>> + Unpin + 'static,
AOut: futures::Future<Output = error::Result<rpc::Value>> + 'static + Send,
BOut: futures::Future<Output = error::Result<rpc::Value>> + 'static + Send,
{
type Out = Box<dyn futures::Future<Output = error::Result<rpc::Value>> + Unpin>;
type Out = BoxFuture<'static, error::Result<rpc::Value>>;

fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
match *self {
Expand All @@ -34,8 +38,8 @@ where

fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
match *self {
Self::Left(ref a) => Box::new(a.send(id, request)),
Self::Right(ref b) => Box::new(b.send(id, request)),
Self::Left(ref a) => a.send(id, request).boxed(),
Self::Right(ref b) => b.send(id, request).boxed(),
}
}
}
Expand All @@ -44,20 +48,20 @@ impl<A, B, ABatch, BBatch> BatchTransport for Either<A, B>
where
A: BatchTransport<Batch = ABatch>,
B: BatchTransport<Batch = BBatch>,
A::Out: Unpin + 'static,
B::Out: Unpin + 'static,
ABatch: futures::Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> + Unpin + 'static,
BBatch: futures::Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> + Unpin + 'static,
A::Out: 'static + Send,
B::Out: 'static + Send,
ABatch: futures::Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> + 'static + Send,
BBatch: futures::Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> + 'static + Send,
{
type Batch = Box<dyn futures::Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> + Unpin>;
type Batch = BoxFuture<'static, error::Result<Vec<error::Result<rpc::Value>>>>;

fn send_batch<T>(&self, requests: T) -> Self::Batch
where
T: IntoIterator<Item = (RequestId, rpc::Call)>,
{
match *self {
Self::Left(ref a) => Box::new(a.send_batch(requests)),
Self::Right(ref b) => Box::new(b.send_batch(requests)),
Self::Left(ref a) => a.send_batch(requests).boxed(),
Self::Right(ref b) => b.send_batch(requests).boxed(),
}
}
}
Expand All @@ -66,17 +70,17 @@ impl<A, B, AStream, BStream> DuplexTransport for Either<A, B>
where
A: DuplexTransport<NotificationStream = AStream>,
B: DuplexTransport<NotificationStream = BStream>,
A::Out: Unpin + 'static,
B::Out: Unpin + 'static,
AStream: futures::Stream<Item = rpc::Value> + Unpin + 'static,
BStream: futures::Stream<Item = rpc::Value> + Unpin + 'static,
A::Out: 'static + Send,
B::Out: 'static + Send,
AStream: futures::Stream<Item = rpc::Value> + 'static + Send,
BStream: futures::Stream<Item = rpc::Value> + 'static + Send,
{
type NotificationStream = Box<dyn futures::Stream<Item = rpc::Value> + Unpin>;
type NotificationStream = BoxStream<'static, rpc::Value>;

fn subscribe(&self, id: api::SubscriptionId) -> error::Result<Self::NotificationStream> {
Ok(match *self {
Self::Left(ref a) => Box::new(a.subscribe(id)?),
Self::Right(ref b) => Box::new(b.subscribe(id)?),
Self::Left(ref a) => a.subscribe(id)?.boxed(),
Self::Right(ref b) => b.subscribe(id)?.boxed(),
})
}

Expand Down

0 comments on commit c8a1aad

Please sign in to comment.