Skip to content

Commit

Permalink
Remove failure dependency. (#579)
Browse files Browse the repository at this point in the history
* Remove failure dependency.

* Remove needless format!.

* cargo fmt --all
  • Loading branch information
tomusdrw authored Aug 28, 2020
1 parent 8e7ddc9 commit b165fac
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 52 deletions.
2 changes: 1 addition & 1 deletion core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ipc = [
arbitrary_precision = ["serde_json/arbitrary_precision", "jsonrpc-core/arbitrary_precision"]

[dependencies]
failure = "0.1"
derive_more = "0.99"
futures = { version = "0.3", features = [ "compat" ] }
jsonrpc-core = { version = "15.0", path = "../../core" }
jsonrpc-pubsub = { version = "15.0", path = "../../pubsub" }
Expand Down
59 changes: 36 additions & 23 deletions core-client/transports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#![deny(missing_docs)]

use failure::{format_err, Fail};
use jsonrpc_core::futures::channel::{mpsc, oneshot};
use jsonrpc_core::futures::{
self,
Expand All @@ -22,20 +21,34 @@ pub mod transports;
mod logger;

/// The errors returned by the client.
#[derive(Debug, Fail)]
#[derive(Debug, derive_more::Display)]
pub enum RpcError {
/// An error returned by the server.
#[fail(display = "Server returned rpc error {}", _0)]
#[display(fmt = "Server returned rpc error {}", _0)]
JsonRpcError(Error),
/// Failure to parse server response.
#[fail(display = "Failed to parse server response as {}: {}", _0, _1)]
ParseError(String, failure::Error),
#[display(fmt = "Failed to parse server response as {}: {}", _0, _1)]
ParseError(String, Box<dyn std::error::Error + Send>),
/// Request timed out.
#[fail(display = "Request timed out")]
#[display(fmt = "Request timed out")]
Timeout,
/// A general client error.
#[display(fmt = "Client error: {}", _0)]
Client(String),
/// Not rpc specific errors.
#[fail(display = "{}", _0)]
Other(failure::Error),
#[display(fmt = "{}", _0)]
Other(Box<dyn std::error::Error + Send>),
}

impl std::error::Error for RpcError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match *self {
Self::JsonRpcError(ref e) => Some(e),
Self::ParseError(_, ref e) => Some(&**e),
Self::Other(ref e) => Some(&**e),
_ => None,
}
}
}

impl From<Error> for RpcError {
Expand Down Expand Up @@ -162,7 +175,7 @@ impl<T: DeserializeOwned + Unpin + 'static> Stream for TypedSubscriptionStream<T
match result {
Some(Ok(value)) => Some(
serde_json::from_value::<T>(value)
.map_err(|error| RpcError::ParseError(self.returns.into(), error.into())),
.map_err(|error| RpcError::ParseError(self.returns.into(), Box::new(error))),
),
None => None,
Some(Err(err)) => Some(Err(err.into())),
Expand Down Expand Up @@ -192,9 +205,9 @@ impl RawClient {
};
let result = self.0.send(msg.into());
async move {
let () = result.map_err(|e| RpcError::Other(e.into()))?;
let () = result.map_err(|e| RpcError::Other(Box::new(e)))?;

receiver.await.map_err(|e| RpcError::Other(e.into()))?
receiver.await.map_err(|e| RpcError::Other(Box::new(e)))?
}
}

Expand All @@ -206,7 +219,7 @@ impl RawClient {
};
match self.0.send(msg.into()) {
Ok(()) => Ok(()),
Err(error) => Err(RpcError::Other(error.into())),
Err(error) => Err(RpcError::Other(Box::new(error))),
}
}

Expand All @@ -232,7 +245,7 @@ impl RawClient {
self.0
.send(msg.into())
.map(|()| receiver)
.map_err(|e| RpcError::Other(e.into()))
.map_err(|e| RpcError::Other(Box::new(e)))
}
}

Expand Down Expand Up @@ -266,9 +279,9 @@ impl TypedClient {
Value::Array(vec) => Ok(Params::Array(vec)),
Value::Null => Ok(Params::None),
Value::Object(map) => Ok(Params::Map(map)),
_ => Err(RpcError::Other(format_err!(
"RPC params should serialize to a JSON array, JSON object or null"
))),
_ => Err(RpcError::Client(
"RPC params should serialize to a JSON array, JSON object or null".into(),
)),
};
let result = params.map(|params| self.0.call_method(method, params));

Expand All @@ -277,7 +290,7 @@ impl TypedClient {

log::debug!("response: {:?}", value);

serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns, error.into()))
serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns, Box::new(error)))
}
}

Expand All @@ -289,9 +302,9 @@ impl TypedClient {
Value::Array(vec) => Params::Array(vec),
Value::Null => Params::None,
_ => {
return Err(RpcError::Other(format_err!(
"RPC params should serialize to a JSON array, or null"
)))
return Err(RpcError::Client(
"RPC params should serialize to a JSON array, or null".into(),
))
}
};

Expand All @@ -314,9 +327,9 @@ impl TypedClient {
Value::Array(vec) => Params::Array(vec),
Value::Null => Params::None,
_ => {
return Err(RpcError::Other(format_err!(
"RPC params should serialize to a JSON array, or null"
)))
return Err(RpcError::Client(
"RPC params should serialize to a JSON array, or null".into(),
))
}
};

Expand Down
11 changes: 4 additions & 7 deletions core-client/transports/src/transports/duplex.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Duplex transport
use failure::format_err;
use futures::channel::{mpsc, oneshot};
use futures::{
task::{Context, Poll},
Expand Down Expand Up @@ -195,7 +194,7 @@ where
// It's a regular Req-Res call, so just answer.
Some(PendingRequest::Call(tx)) => {
tx.send(result)
.map_err(|_| RpcError::Other(format_err!("oneshot channel closed")))?;
.map_err(|_| RpcError::Client("oneshot channel closed".into()))?;
continue;
}
// It was a subscription request,
Expand All @@ -219,11 +218,9 @@ where
);
}
} else {
let err = RpcError::Other(format_err!(
let err = RpcError::Client(format!(
"Subscription {:?} ({:?}) rejected: {:?}",
id,
method,
result,
id, method, result,
));

if subscription.channel.unbounded_send(result).is_err() {
Expand Down Expand Up @@ -276,7 +273,7 @@ where
// Writes queued messages to sink.
log::debug!("handle outgoing");
loop {
let err = || Err(RpcError::Other(failure::format_err!("closing")));
let err = || Err(RpcError::Client("closing".into()));
match self.sink.as_mut().poll_ready(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(_)) => return err().into(),
Expand Down
11 changes: 5 additions & 6 deletions core-client/transports/src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use super::RequestBuilder;
use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
use failure::format_err;
use futures::{Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use hyper::{http, rt, Client, Request, Uri};

Expand Down Expand Up @@ -39,13 +38,13 @@ fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
let max_parallel = 8;
let url: Uri = match url.parse() {
Ok(url) => url,
Err(e) => return ready(Err(RpcError::Other(e.into()))),
Err(e) => return ready(Err(RpcError::Other(Box::new(e)))),
};

#[cfg(feature = "tls")]
let connector = match hyper_tls::HttpsConnector::new(4) {
Ok(connector) => connector,
Err(e) => return ready(Err(RpcError::Other(e.into()))),
Err(e) => return ready(Err(RpcError::Other(Box::new(e)))),
};
#[cfg(feature = "tls")]
let client = Client::builder().build::<_, hyper::Body>(connector);
Expand Down Expand Up @@ -97,16 +96,16 @@ fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
let future = match result {
Ok(ref res) if !res.status().is_success() => {
log::trace!("http result status {}", res.status());
A(future::err(RpcError::Other(format_err!(
A(future::err(RpcError::Client(format!(
"Unexpected response status code: {}",
res.status()
))))
}
Ok(res) => B(res
.into_body()
.map_err(|e| RpcError::ParseError(e.to_string(), e.into()))
.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
.concat2()),
Err(err) => A(future::err(RpcError::Other(err.into()))),
Err(err) => A(future::err(RpcError::Other(Box::new(err)))),
};
future.then(|result| {
if let Some(sender) = sender {
Expand Down
8 changes: 4 additions & 4 deletions core-client/transports/src/transports/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where

fn send_response(&mut self) -> Result<(), RpcError> {
if let Buffered::Response(r) = std::mem::replace(&mut self.buffered, Buffered::None) {
self.queue.0.start_send(r).map_err(|e| RpcError::Other(e.into()))?;
self.queue.0.start_send(r).map_err(|e| RpcError::Other(Box::new(e)))?;
}
Ok(())
}
Expand All @@ -97,7 +97,7 @@ where
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.poll_buffered(cx))?;
futures::ready!(self.queue.0.poll_ready(cx))
.map_err(|e| RpcError::Other(e.into()))
.map_err(|e| RpcError::Other(Box::new(e)))
.into()
}

Expand All @@ -110,13 +110,13 @@ where
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.poll_buffered(cx))?;
futures::ready!(self.queue.0.poll_flush_unpin(cx))
.map_err(|e| RpcError::Other(e.into()))
.map_err(|e| RpcError::Other(Box::new(e)))
.into()
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.queue.0.poll_close_unpin(cx))
.map_err(|e| RpcError::Other(e.into()))
.map_err(|e| RpcError::Other(Box::new(e)))
.into()
}
}
Expand Down
2 changes: 1 addition & 1 deletion core-client/transports/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub fn parse_response(
response: &str,
) -> Result<(Id, Result<Value, RpcError>, Option<String>, Option<SubscriptionId>), RpcError> {
jsonrpc_core::serde_from_str::<ClientResponse>(response)
.map_err(|e| RpcError::ParseError(e.to_string(), e.into()))
.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
.map(|response| {
let id = response.id().unwrap_or(Id::Null);
let sid = response.subscription_id();
Expand Down
21 changes: 11 additions & 10 deletions core-client/transports/src/transports/ws.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! JSON-RPC websocket client implementation.
use crate::{RpcChannel, RpcError};
use failure::Error;
use futures01::prelude::*;
use log::info;
use std::collections::VecDeque;
Expand All @@ -11,11 +10,11 @@ use websocket::{ClientBuilder, OwnedMessage};
/// Uses an unbuffered channel to queue outgoing rpc messages.
///
/// Returns `Err` if the `url` is invalid.
pub fn try_connect<T>(url: &str) -> Result<impl Future<Item = T, Error = RpcError>, Error>
pub fn try_connect<T>(url: &str) -> Result<impl Future<Item = T, Error = RpcError>, RpcError>
where
T: From<RpcChannel>,
{
let client_builder = ClientBuilder::new(url)?;
let client_builder = ClientBuilder::new(url).map_err(|e| RpcError::Other(Box::new(e)))?;
Ok(do_connect(client_builder))
}

Expand Down Expand Up @@ -54,7 +53,7 @@ where
tokio::spawn(rpc_client);
sender.into()
})
.map_err(|error| RpcError::Other(error.into()))
.map_err(|error| RpcError::Other(Box::new(error)))
}

struct WebsocketClient<TSink, TStream> {
Expand All @@ -67,7 +66,7 @@ impl<TSink, TStream, TError> WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
TError: std::error::Error + Send + 'static,
{
pub fn new(sink: TSink, stream: TStream) -> Self {
Self {
Expand All @@ -82,7 +81,7 @@ impl<TSink, TStream, TError> Sink for WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
TError: std::error::Error + Send + 'static,
{
type SinkItem = String;
type SinkError = RpcError;
Expand All @@ -101,20 +100,22 @@ where
self.queue.push_front(request);
break;
}
Err(error) => return Err(RpcError::Other(error.into())),
Err(error) => return Err(RpcError::Other(Box::new(error))),
},
None => break,
}
}
self.sink.poll_complete().map_err(|error| RpcError::Other(error.into()))
self.sink
.poll_complete()
.map_err(|error| RpcError::Other(Box::new(error)))
}
}

impl<TSink, TStream, TError> Stream for WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
TError: std::error::Error + Send + 'static,
{
type Item = String;
type Error = RpcError;
Expand All @@ -134,7 +135,7 @@ where
return Ok(Async::Ready(None));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(error) => return Err(RpcError::Other(error.into())),
Err(error) => return Err(RpcError::Other(Box::new(error))),
}
}
}
Expand Down

0 comments on commit b165fac

Please sign in to comment.