Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert eth_subscribe to async #398

Merged
merged 1 commit into from
Nov 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ secp256k1 = { version = "0.19", features = ["recovery"] }
serde = { version = "1.0.90", features = ["derive"] }
serde_json = "1.0.39"
tiny-keccak = { version = "2.0.1", features = ["keccak"] }
pin-project = "1.0"
# Optional deps
## HTTP
base64 = { version = "0.13", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() -> web3::Result {
})
.await;

sub.unsubscribe();
sub.unsubscribe().await?;

Ok(())
}
87 changes: 35 additions & 52 deletions src/api/eth_subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
//! `Eth` namespace, subscriptions

use crate::api::Namespace;
use crate::helpers::{self, CallFuture};
use crate::helpers;
use crate::types::{BlockHeader, Filter, Log, SyncState, H256};
use crate::{error, DuplexTransport};
use futures::{
task::{Context, Poll},
Future, FutureExt, Stream, StreamExt,
Stream,
};
use pin_project::{pin_project, pinned_drop};
use std::marker::PhantomData;
use std::pin::Pin;

Expand Down Expand Up @@ -43,10 +44,12 @@ impl From<String> for SubscriptionId {
/// Stream of notifications from a subscription
/// Given a type deserializable from rpc::Value and a subscription id, yields items of that type as
/// notifications are delivered.
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct SubscriptionStream<T: DuplexTransport, I> {
transport: T,
id: SubscriptionId,
#[pin]
rx: T::NotificationStream,
_marker: PhantomData<I>,
}
Expand All @@ -68,92 +71,72 @@ impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
}

/// Unsubscribe from the event represented by this stream
pub fn unsubscribe(self) -> CallFuture<bool, T::Out> {
pub async fn unsubscribe(self) -> error::Result<bool> {
let &SubscriptionId(ref id) = &self.id;
let id = helpers::serialize(&id);
CallFuture::new(self.transport.execute("eth_unsubscribe", vec![id]))
let response = self.transport.execute("eth_unsubscribe", vec![id]).await?;
helpers::decode(response)
}
}

impl<T, I> Stream for SubscriptionStream<T, I>
where
T: DuplexTransport,
T::NotificationStream: Unpin,
I: serde::de::DeserializeOwned + Unpin,
I: serde::de::DeserializeOwned,
{
type Item = error::Result<I>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let x = ready!(self.rx.poll_next_unpin(ctx));
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let x = ready!(this.rx.poll_next(ctx));
Poll::Ready(x.map(|result| serde_json::from_value(result).map_err(Into::into)))
}
}

impl<T: DuplexTransport, I> Drop for SubscriptionStream<T, I> {
fn drop(&mut self) {
let _ = self.transport.unsubscribe(self.id().clone());
}
}

/// A result of calling a subscription.
#[derive(Debug)]
pub struct SubscriptionResult<T: DuplexTransport, I> {
transport: T,
inner: CallFuture<String, T::Out>,
_marker: PhantomData<I>,
}

impl<T: DuplexTransport, I> SubscriptionResult<T, I> {
/// New `SubscriptionResult`.
pub fn new(transport: T, id_future: CallFuture<String, T::Out>) -> Self {
SubscriptionResult {
transport,
inner: id_future,
_marker: PhantomData,
}
}
}

impl<T, I> Future for SubscriptionResult<T, I>
#[pinned_drop]
impl<T, I> PinnedDrop for SubscriptionStream<T, I>
where
T: DuplexTransport,
I: serde::de::DeserializeOwned + Unpin,
{
type Output = error::Result<SubscriptionStream<T, I>>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let id = ready!(self.inner.poll_unpin(ctx))?;
Poll::Ready(SubscriptionStream::new(self.transport.clone(), SubscriptionId(id)))
fn drop(self: Pin<&mut Self>) {
let _ = self.transport.unsubscribe(self.id().clone());
}
}

impl<T: DuplexTransport> EthSubscribe<T> {
/// Create a new heads subscription
pub fn subscribe_new_heads(&self) -> SubscriptionResult<T, BlockHeader> {
pub async fn subscribe_new_heads(&self) -> error::Result<SubscriptionStream<T, BlockHeader>> {
let subscription = helpers::serialize(&&"newHeads");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a logs subscription
pub fn subscribe_logs(&self, filter: Filter) -> SubscriptionResult<T, Log> {
pub async fn subscribe_logs(&self, filter: Filter) -> error::Result<SubscriptionStream<T, Log>> {
let subscription = helpers::serialize(&&"logs");
let filter = helpers::serialize(&filter);
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription, filter]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self
.transport
.execute("eth_subscribe", vec![subscription, filter])
.await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a pending transactions subscription
pub fn subscribe_new_pending_transactions(&self) -> SubscriptionResult<T, H256> {
pub async fn subscribe_new_pending_transactions(&self) -> error::Result<SubscriptionStream<T, H256>> {
let subscription = helpers::serialize(&&"newPendingTransactions");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a sync status subscription
pub fn subscribe_syncing(&self) -> SubscriptionResult<T, SyncState> {
pub async fn subscribe_syncing(&self) -> error::Result<SubscriptionStream<T, SyncState>> {
let subscription = helpers::serialize(&&"syncing");
let id_future = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]));
SubscriptionResult::new(self.transport().clone(), id_future)
let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id: String = helpers::decode(response)?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}
}
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod web3;
pub use self::accounts::Accounts;
pub use self::eth::Eth;
pub use self::eth_filter::{BaseFilter, EthFilter};
pub use self::eth_subscribe::{EthSubscribe, SubscriptionId, SubscriptionResult, SubscriptionStream};
pub use self::eth_subscribe::{EthSubscribe, SubscriptionId, SubscriptionStream};
pub use self::net::Net;
pub use self::parity::Parity;
pub use self::parity_accounts::ParityAccounts;
Expand Down