Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Unpin requirement from Transport #399

Merged
merged 3 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ secp256k1 = { version = "0.19", features = ["recovery"] }
serde = { version = "1.0.90", features = ["derive"] }
serde_json = "1.0.39"
tiny-keccak = { version = "2.0.1", features = ["keccak"] }
pin-project = "1.0"
# Optional deps
## HTTP
base64 = { version = "0.13", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> web3::Result {
})
.await;

sub.unsubscribe();
sub.unsubscribe().await?;

Ok(())
}
87 changes: 35 additions & 52 deletions src/api/eth_subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
//! `Eth` namespace, subscriptions

use crate::api::Namespace;
use crate::helpers::{self, CallFuture};
use crate::helpers;
use crate::types::{BlockHeader, Filter, Log, SyncState, H256};
use crate::{error, DuplexTransport};
use futures::{
task::{Context, Poll},
Future, FutureExt, Stream, StreamExt,
Stream,
};
use pin_project::{pin_project, pinned_drop};
use std::marker::PhantomData;
use std::pin::Pin;

Expand Down Expand Up @@ -43,10 +44,12 @@ impl From<String> for SubscriptionId {
/// Stream of notifications from a subscription
/// Given a type deserializable from rpc::Value and a subscription id, yields items of that type as
/// notifications are delivered.
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct SubscriptionStream<T: DuplexTransport, I> {
transport: T,
id: SubscriptionId,
#[pin]
rx: T::NotificationStream,
_marker: PhantomData<I>,
}
Expand All @@ -68,92 +71,72 @@ impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
}

/// Unsubscribe from the event represented by this stream
pub fn unsubscribe(self) -> CallFuture<bool, T::Out> {
pub async fn unsubscribe(self) -> error::Result<bool> {
let &SubscriptionId(ref id) = &self.id;
let id = helpers::serialize(&id);
CallFuture::new(self.transport.execute("eth_unsubscribe", vec![id]))
let response = self.transport.execute("eth_unsubscribe", vec![id]).await?;
helpers::decode(response)
}
}

impl<T, I> Stream for SubscriptionStream<T, I>
where
T: DuplexTransport,
T::NotificationStream: Unpin,
I: serde::de::DeserializeOwned + Unpin,
I: serde::de::DeserializeOwned,
{
type Item = error::Result<I>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let x = ready!(self.rx.poll_next_unpin(ctx));
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let x = ready!(this.rx.poll_next(ctx));
Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
}
}

impl<T: DuplexTransport, I> Drop for SubscriptionStream<T, I> {
fn drop(&mut self) {
let _ = self.transport.unsubscribe(self.id().clone());
}
}

/// A result of calling a subscription.
#[derive(Debug)]
pub struct SubscriptionResult<T: DuplexTransport, I> {
transport: T,
inner: CallFuture<String, T::Out>,
_marker: PhantomData<I>,
}

impl<T: DuplexTransport, I> SubscriptionResult<T, I> {
/// New `SubscriptionResult`.
pub fn new(transport: T, id_future: CallFuture<String, T::Out>) -> Self {
SubscriptionResult {
transport,
inner: id_future,
_marker: PhantomData,
}
}
}

impl<T, I> Future for SubscriptionResult<T, I>
#[pinned_drop]
impl<T, I> PinnedDrop for SubscriptionStream<T, I>
where
T: DuplexTransport,
I: serde::de::DeserializeOwned + Unpin,
{
type Output = error::Result<SubscriptionStream<T, I>>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let id = ready!(self.inner.poll_unpin(ctx))?;
Poll::Ready(SubscriptionStream::new(self.transport.clone(), SubscriptionId(id)))
fn drop(self: Pin<&mut Self>) {
let _ = self.transport.unsubscribe(self.id().clone());
}
}

impl<T: DuplexTransport> EthSubscribe<T> {
/// Create a new heads subscription
pub fn subscribe_new_heads(&self) -> SubscriptionResult<T, BlockHeader> {
pub async fn subscribe_new_heads(&self) -> error::Result<SubscriptionStream<T, BlockHeader>> {
let subscription = helpers::serialize(&&"newHeads");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a logs subscription
pub fn subscribe_logs(&self, filter: Filter) -> SubscriptionResult<T, Log> {
pub async fn subscribe_logs(&self, filter: Filter) -> error::Result<SubscriptionStream<T, Log>> {
let subscription = helpers::serialize(&&"logs");
let filter = helpers::serialize(&filter);
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription, filter]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self
.transport
.execute("eth_subscribe", vec![subscription, filter])
.await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a pending transactions subscription
pub fn subscribe_new_pending_transactions(&self) -> SubscriptionResult<T, H256> {
pub async fn subscribe_new_pending_transactions(&self) -> error::Result<SubscriptionStream<T, H256>> {
let subscription = helpers::serialize(&&"newPendingTransactions");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a sync status subscription
pub fn subscribe_syncing(&self) -> SubscriptionResult<T, SyncState> {
pub async fn subscribe_syncing(&self) -> error::Result<SubscriptionStream<T, SyncState>> {
let subscription = helpers::serialize(&&"syncing");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}
}
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod web3;
pub use self::accounts::Accounts;
pub use self::eth::Eth;
pub use self::eth_filter::{BaseFilter, EthFilter};
pub use self::eth_subscribe::{EthSubscribe, SubscriptionId, SubscriptionResult, SubscriptionStream};
pub use self::eth_subscribe::{EthSubscribe, SubscriptionId, SubscriptionStream};
pub use self::net::Net;
pub use self::parity::Parity;
pub use self::parity_accounts::ParityAccounts;
Expand Down
15 changes: 9 additions & 6 deletions src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Web3 helpers.

use std::marker::PhantomData;
use std::marker::Unpin;
use std::pin::Pin;

use crate::{error, rpc};
use futures::{
task::{Context, Poll},
Future, FutureExt,
Future,
};
use pin_project::pin_project;

/// Takes any type which is deserializable from rpc::Value and such a value and
/// yields the deserialized value
Expand All @@ -17,8 +17,10 @@ pub fn decode<T: serde::de::DeserializeOwned>(value: rpc::Value) -> error::Resul
}

/// Calls decode on the result of the wrapped future.
#[pin_project]
#[derive(Debug)]
pub struct CallFuture<T, F> {
#[pin]
inner: F,
_marker: PhantomData<T>,
}
Expand All @@ -35,13 +37,14 @@ impl<T, F> CallFuture<T, F> {

impl<T, F> Future for CallFuture<T, F>
where
T: serde::de::DeserializeOwned + Unpin,
F: Future<Output = error::Result<rpc::Value>> + Unpin,
T: serde::de::DeserializeOwned,
F: Future<Output = error::Result<rpc::Value>>,
{
type Output = error::Result<T>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let x = ready!(self.inner.poll_unpin(ctx));
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
let x = ready!(this.inner.poll(ctx));
Poll::Ready(x.and_then(decode))
}
}
Expand Down
7 changes: 2 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ pub type RequestId = usize;
// TODO [ToDr] The transport most likely don't need to be thread-safe.
// (though it has to be Send)
/// Transport implementation
pub trait Transport: std::fmt::Debug + Clone + Unpin {
pub trait Transport: std::fmt::Debug + Clone {
/// The type of future this transport returns when a call is made.
type Out: futures::Future<Output = error::Result<rpc::Value>> + Unpin;
type Out: futures::Future<Output = error::Result<rpc::Value>>;

/// Prepare serializable RPC call for given method with parameters.
fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call);
Expand Down Expand Up @@ -86,7 +86,6 @@ where
X: std::ops::Deref<Target = T>,
X: std::fmt::Debug,
X: Clone,
X: Unpin,
{
type Out = T::Out;

Expand All @@ -105,7 +104,6 @@ where
X: std::ops::Deref<Target = T>,
X: std::fmt::Debug,
X: Clone,
X: Unpin,
{
type Batch = T::Batch;

Expand All @@ -123,7 +121,6 @@ where
X: std::ops::Deref<Target = T>,
X: std::fmt::Debug,
X: Clone,
X: Unpin,
{
type NotificationStream = T::NotificationStream;

Expand Down
4 changes: 2 additions & 2 deletions src/signing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl std::error::Error for RecoveryError {}
///
/// If it's enough to pass a reference to `SecretKey` (lifetimes) than you can use `SecretKeyRef`
/// wrapper.
pub trait Key: std::marker::Unpin {
pub trait Key {
/// Sign given message and include chain-id replay protection.
///
/// When a chain ID is provided, the `Signature`'s V-value will have chain relay
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<'a> Deref for SecretKeyRef<'a> {
}
}

impl<T: Deref<Target = SecretKey> + std::marker::Unpin> Key for T {
impl<T: Deref<Target = SecretKey>> Key for T {
fn sign(&self, message: &[u8], chain_id: Option<u64>) -> Result<Signature, SigningError> {
let message = Message::from_slice(&message).map_err(|_| SigningError::InvalidMessage)?;
let (recovery_id, signature) = Secp256k1::signing_only()
Expand Down
71 changes: 15 additions & 56 deletions src/transports/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,27 @@ where
}

/// Sends all requests as a batch.
pub fn submit_batch(&self) -> BatchFuture<T::Batch> {
pub fn submit_batch(&self) -> impl Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> {
let batch = mem::replace(&mut *self.batch.lock(), vec![]);
let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();

let batch = self.transport.send_batch(batch);
let pending = self.pending.clone();

BatchFuture {
state: BatchState::SendingBatch(batch, ids),
pending,
async move {
let res = batch.await;
let mut pending = pending.lock();
for (idx, request_id) in ids.into_iter().enumerate() {
if let Some(rx) = pending.remove(&request_id) {
// Ignore sending error
let _ = match res {
Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
Err(ref err) => rx.send(Err(err.clone())),
_ => rx.send(Err(Error::Internal)),
};
}
}
res
}
}
}
Expand All @@ -72,58 +83,6 @@ where
}
}

enum BatchState<T> {
SendingBatch(T, Vec<RequestId>),
Done,
}

/// A result of submitting a batch request.
/// Returns the results of all requests within the batch.
pub struct BatchFuture<T> {
state: BatchState<T>,
pending: PendingRequests,
}

impl<T> Future for BatchFuture<T>
where
T: Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> + Unpin,
{
type Output = error::Result<Vec<error::Result<rpc::Value>>>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
loop {
match mem::replace(&mut self.state, BatchState::Done) {
BatchState::SendingBatch(mut batch, ids) => {
let res = match batch.poll_unpin(ctx) {
Poll::Pending => {
self.state = BatchState::SendingBatch(batch, ids);
return Poll::Pending;
}
Poll::Ready(v) => v,
};

let mut pending = self.pending.lock();
for (idx, request_id) in ids.into_iter().enumerate() {
if let Some(rx) = pending.remove(&request_id) {
// Ignore sending error
let _ = match res {
Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
Err(ref err) => rx.send(Err(err.clone())),
_ => rx.send(Err(Error::Internal)),
};
}
}

return Poll::Ready(res);
}
BatchState::Done => {
panic!("Poll after Ready.");
}
};
}
}
}

/// Result of calling a single method that will be part of the batch.
/// Converts `oneshot::Receiver` error into `Error::Internal`
pub struct SingleResult(oneshot::Receiver<error::Result<rpc::Value>>);
Expand Down
Loading