Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

feat: implement IPC transport support #260

Merged
merged 16 commits into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion ethers-providers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ serde_json = { version = "1.0.64", default-features = false }
thiserror = { version = "1.0.24", default-features = false }
url = { version = "2.2.1", default-features = false }
auto_impl = { version = "0.4.1", default-features = false }
ahash = { version = "0.7", default-features = false, features = ["std"], optional = true }
Copy link

@tarcieri tarcieri Apr 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you really need higher performance than SipHasher (I would suggest measuring), and don't care about hashDoS (the ahash crate is not designed to be hashDoS resistant, despite what the author might claim), I would suggest using the twox-hash crate which implements the XXHash algorithm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'm aware of this, but figured this usage wouldn't require that property, but maybe I'm overlooking some concern. Thanks for the link, I'll look into that, but I'll just change to whatever @gakonst prefers since I don't have an opinion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've switched it to siphash as a default, but happy to switch to whatever is preferred :)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I think using what the standard library provides is a reasonable starting point, and if you need better performance, it would be good to measure whether a 3rd-party hashing crate actually provides an improvement.


# required for implementing stream on the filters
futures-core = { version = "0.3.12", default-features = false }
Expand All @@ -49,4 +50,4 @@ tokio = { version = "1.4", default-features = false, features = ["rt", "macros"]
default = ["ws", "ipc"]
celo = ["ethers-core/celo"]
ws = ["tokio", "tokio-tungstenite"]
ipc = ["tokio", "tokio/io-util", "tokio-util"]
ipc = ["tokio", "tokio/io-util", "tokio-util", "ahash"]
68 changes: 38 additions & 30 deletions ethers-providers/src/transports/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
// use crate::{helpers, BatchTransport, DuplexTransport, Error, U256, Result, Transport};
use crate::{
provider::ProviderError,
transports::common::{JsonRpcError, Notification, Request, Response},
JsonRpcClient, PubsubClient,
};
use ethers_core::types::U256;

use ahash::AHashMap;
use async_trait::async_trait;
use futures_util::stream::StreamExt;
use futures_channel::mpsc;
use futures_util::stream::StreamExt;
use oneshot::error::RecvError;
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::BTreeMap,
path::Path,
sync::{atomic::AtomicU64, Arc},
};
Expand Down Expand Up @@ -69,21 +68,21 @@ impl JsonRpcClient for Ipc {
) -> Result<R> {
let next_id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

// send the message
// Create the request and initialize the response channel
let (sender, receiver) = oneshot::channel();
let payload = TransportMessage::Request {
id: next_id,
request: serde_json::to_string(&Request::new(next_id, method, params))?,
sender,
};

// send the data
// Send the request to the IPC server to be handled.
self.send(payload)?;

// wait for the response
// Wait for the response from the IPC server.
let res = receiver.await?;

// parse it
// Parse JSON response.
Ok(serde_json::from_value(res)?)
}
}
Expand Down Expand Up @@ -127,65 +126,74 @@ async fn run_server(
messages_rx: mpsc::UnboundedReceiver<TransportMessage>,
) -> Result<()> {
let (socket_reader, mut socket_writer) = unix_stream.into_split();
let mut pending_response_txs = BTreeMap::default();
let mut subscription_txs = BTreeMap::default();
let mut pending_response_txs = AHashMap::default();
let mut subscription_txs = AHashMap::default();

let mut socket_reader = ReaderStream::new(socket_reader);
let mut messages_rx = messages_rx.fuse();
let mut read_buffer = vec![];
let mut closed = false;

while !closed || !pending_response_txs.is_empty() {
while !pending_response_txs.is_empty() {
tokio::select! {
message = messages_rx.next() => match message {
None => closed = true,
Some(TransportMessage::Subscribe{ id, sink }) => {
if subscription_txs.insert(id, sink).is_some() {
// log::warn!("Replacing a subscription with id {:?}", id);
println!("Replacing a subscription with id {:?}", id);
}
},
Some(TransportMessage::Unsubscribe{ id }) => {
if subscription_txs.remove(&id).is_none() {
// log::warn!("Unsubscribing not subscribed id {:?}", id);
if subscription_txs.remove(&id).is_none() {
println!("Unsubscribing not subscribed id {:?}", id);
}
},
Some(TransportMessage::Request{ id, request, sender }) => {
if pending_response_txs.insert(id, sender).is_some() {
// log::warn!("Replacing a pending request with id {:?}", id);
println!("Replacing a pending request with id {:?}", id);
}

if socket_writer.write(&request.as_bytes()).await.is_err() {
if let Err(err) = socket_writer.write(&request.as_bytes()).await {
pending_response_txs.remove(&id);
// log::error!("IPC write error: {:?}", err);
println!("IPC write error: {:?}", err);
}
}
},
None => break,
},
bytes = socket_reader.next() => match bytes {
Some(Ok(bytes)) => {
// Extend buffer of previously unread with the new read bytes
read_buffer.extend_from_slice(&bytes);

let read_len = {
// Deserialize as many full elements from the stream as exists
let mut de: serde_json::StreamDeserializer<_, serde_json::Value> =
serde_json::Deserializer::from_slice(&read_buffer).into_iter();

// Iterate through these elements, and handle responses/notifications
while let Some(Ok(value)) = de.next() {
if let Ok(notification) = serde_json::from_value::<Notification<serde_json::Value>>(value.clone()) {
let _ = notify(&mut subscription_txs, notification);
// Send notify response if okay.
if let Err(e) = notify(&mut subscription_txs, notification) {
println!("Failed to send IPC notification: {}", e)
}
} else if let Ok(response) = serde_json::from_value::<Response<serde_json::Value>>(value) {
let _ = respond(&mut pending_response_txs, response);
if let Err(e) = respond(&mut pending_response_txs, response) {
println!("Failed to send IPC response: {}", e)
}
}

// log::warn!("JSON is not a response or notification");
println!("JSON is not a response or notification");
}

// Get the offset of bytes to handle partial buffer reads
de.byte_offset()
};

// Reset buffer to just include the partial value bytes.
read_buffer.copy_within(read_len.., 0);
read_buffer.truncate(read_buffer.len() - read_len);
},
Some(Err(err)) => {
// log::error!("IPC read error: {:?}", err);
println!("IPC read error: {:?}", err);
return Err(err.into());
},
None => break,
Expand All @@ -196,8 +204,10 @@ async fn run_server(
Ok(())
}

/// Sends notification through the channel based on the ID of the subscription.
/// This handles streaming responses.
fn notify(
subscription_txs: &mut BTreeMap<U256, mpsc::UnboundedSender<serde_json::Value>>,
subscription_txs: &mut AHashMap<U256, mpsc::UnboundedSender<serde_json::Value>>,
notification: Notification<serde_json::Value>,
) -> std::result::Result<(), IpcError> {
let id = notification.params.subscription;
Expand All @@ -209,13 +219,15 @@ fn notify(
Ok(())
}

/// Sends JSON response through the channel based on the ID in that response.
/// This handles RPC calls with only one response, and the channel entry is dropped after sending.
fn respond(
pending_response_txs: &mut BTreeMap<u64, oneshot::Sender<serde_json::Value>>,
pending_response_txs: &mut AHashMap<u64, oneshot::Sender<serde_json::Value>>,
output: Response<serde_json::Value>,
) -> std::result::Result<(), IpcError> {
let id = output.id;

// Assuming results are always okay,
// Converts output into result, to send data if valid response.
let value = output.data.into_result()?;

let response_tx = pending_response_txs.remove(&id).ok_or_else(|| {
Expand Down Expand Up @@ -244,10 +256,6 @@ pub enum IpcError {
/// Thrown if the response could not be parsed
JsonRpcError(#[from] JsonRpcError),

/// Thrown if the websocket didn't respond to our message
#[error("IPC connection did not respond with text data")]
NoResponse,

#[error("{0}")]
ChannelError(String),

Expand Down