From 94eefa5ac0f09fe185c1a6068ad9a056c9670981 Mon Sep 17 00:00:00 2001 From: Nick Sanders Date: Thu, 27 Sep 2018 12:27:18 -0700 Subject: [PATCH] Update hyper to version 0.12. (#303) * Update hyper to version 0.12. * Replace `tokio_core` with `tokio`. * `server_utils::reactor::Remote` has been renamed to `Executor`. The `Shared` variant now contains a `tokio::runtime::TaskExecutor`. This may need to be changed to a trait object (of `tokio::executor::Executor`) or be otherwise abstracted to conceal the type in the public API. * Bump crate versions to 0.9 * Update hyper to version 0.12. * Replace `tokio_core` with `tokio`. * `server_utils::reactor::Remote` has been renamed to `Executor`. The `Shared` variant now contains a `tokio::runtime::TaskExecutor`. This may need to be changed to a trait object (of `tokio::executor::Executor`) or be otherwise abstracted to conceal the type in the public API. * Bump crate versions to 0.9 * Fix compilation of hyper-0.2 * Address grumbles. * Re-export `AllowCors` from `http`. --- core/Cargo.toml | 2 +- http/Cargo.toml | 2 +- http/examples/http_meta.rs | 12 +- http/examples/http_middleware.rs | 4 +- http/src/handler.rs | 157 ++++++++--------- http/src/lib.rs | 112 ++++++------ http/src/response.rs | 77 +++++---- http/src/tests.rs | 91 +++++----- http/src/utils.rs | 51 +++--- ipc/Cargo.toml | 2 +- ipc/src/lib.rs | 2 +- ipc/src/server.rs | 94 ++++++----- minihttp/src/lib.rs | 2 +- server-utils/Cargo.toml | 15 +- server-utils/src/cors.rs | 281 ++++++++++++------------------- server-utils/src/lib.rs | 5 +- server-utils/src/reactor.rs | 108 +++++++----- server-utils/src/stream_codec.rs | 10 +- tcp/src/lib.rs | 2 +- tcp/src/server.rs | 60 ++++--- tcp/src/tests.rs | 197 ++++++++++++---------- ws/src/lib.rs | 2 +- ws/src/metadata.rs | 7 +- ws/src/server.rs | 22 +-- ws/src/server_builder.rs | 14 +- ws/src/session.rs | 16 +- 26 files changed, 679 insertions(+), 668 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 61ab522cd..5abfea54f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -19,7 +19,7 @@ categories = [ [dependencies] log = "0.4" -futures = "0.1.6" +futures = "~0.1.6" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" diff --git a/http/Cargo.toml b/http/Cargo.toml index e7f4cc3fe..2377ec54a 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["jsonrpc", "json-rpc", "json", "rpc", "server"] documentation = "https://paritytech.github.io/jsonrpc/jsonrpc_http_server/index.html" [dependencies] -hyper = "0.11" +hyper = "0.12" solana-jsonrpc-core = { version = "0.1", path = "../core" } solana-jsonrpc-server-utils = { version = "0.1", path = "../server-utils" } log = "0.4" diff --git a/http/examples/http_meta.rs b/http/examples/http_meta.rs index 5827e923c..c5d7a6a3c 100644 --- a/http/examples/http_meta.rs +++ b/http/examples/http_meta.rs @@ -1,9 +1,8 @@ extern crate solana_jsonrpc_http_server as jsonrpc_http_server; extern crate unicase; -use jsonrpc_http_server::{ServerBuilder, hyper, RestApi, AccessControlAllowHeaders}; +use jsonrpc_http_server::{ServerBuilder, hyper, RestApi, cors::AccessControlAllowHeaders}; use jsonrpc_http_server::jsonrpc_core::*; -use self::hyper::header; #[derive(Default, Clone)] struct Meta { @@ -27,15 +26,14 @@ fn main() { let server = ServerBuilder::new(io) .cors_allow_headers(AccessControlAllowHeaders::Only( vec![ - "Authorization", + "Authorization".to_owned(), ]) ) .rest_api(RestApi::Unsecure) // You can also implement `MetaExtractor` trait and pass a struct here. - .meta_extractor(|req: &hyper::Request| { - let auth = req.headers() - .get::>(); - let auth = auth.map(|h| h.token.clone()); + .meta_extractor(|req: &hyper::Request| { + let auth = req.headers().get(hyper::header::AUTHORIZATION) + .map(|h| h.to_str().unwrap_or("").to_owned()); Meta { auth } }) diff --git a/http/examples/http_middleware.rs b/http/examples/http_middleware.rs index 815b5b761..ed4671c17 100644 --- a/http/examples/http_middleware.rs +++ b/http/examples/http_middleware.rs @@ -14,8 +14,8 @@ fn main() { let server = ServerBuilder::new(io) .cors(DomainsValidation::AllowOnly(vec![AccessControlAllowOrigin::Null])) - .request_middleware(|request: hyper::server::Request| { - if request.path() == "/status" { + .request_middleware(|request: hyper::Request| { + if request.uri() == "/status" { Response::ok("Server running OK.").into() } else { request.into() diff --git a/http/src/handler.rs b/http/src/handler.rs index 26b90e15b..5b2af3b5e 100644 --- a/http/src/handler.rs +++ b/http/src/handler.rs @@ -3,16 +3,14 @@ use Rpc; use std::{fmt, mem, str}; use std::sync::Arc; -use hyper::{self, mime, server, Method}; -use hyper::header::{self, Headers}; -use unicase::Ascii; +use hyper::{self, service::Service, Body, Method}; +use hyper::header::{self, HeaderMap, HeaderValue}; -use jsonrpc::{self as core, FutureResult, Metadata, Middleware, NoopMiddleware}; +use jsonrpc::{self as core, FutureResult, Metadata, Middleware, NoopMiddleware, FutureRpcResult}; use jsonrpc::futures::{Future, Poll, Async, Stream, future}; use jsonrpc::serde_json; use response::Response; use server_utils::cors; -use server_utils::cors::AllowHeaders; use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts, RestApi}; @@ -22,7 +20,7 @@ pub struct ServerHandler = NoopMiddleware> { allowed_hosts: AllowedHosts, cors_domains: CorsDomains, cors_max_age: Option, - allowed_headers: cors::AccessControlAllowHeadersUnicase, + cors_allowed_headers: cors::AccessControlAllowHeaders, middleware: Arc, rest_api: RestApi, health_api: Option<(String, String)>, @@ -35,7 +33,7 @@ impl> ServerHandler { jsonrpc_handler: Rpc, cors_domains: CorsDomains, cors_max_age: Option, - allowed_headers: cors::AccessControlAllowHeadersUnicase, + cors_allowed_headers: cors::AccessControlAllowHeaders, allowed_hosts: AllowedHosts, middleware: Arc, rest_api: RestApi, @@ -47,7 +45,7 @@ impl> ServerHandler { allowed_hosts, cors_domains, cors_max_age, - allowed_headers, + cors_allowed_headers, middleware, rest_api, health_api, @@ -56,13 +54,13 @@ impl> ServerHandler { } } -impl> server::Service for ServerHandler { - type Request = server::Request; - type Response = server::Response; +impl> Service for ServerHandler { + type ReqBody = Body; + type ResBody = Body; type Error = hyper::Error; type Future = Handler; - fn call(&self, request: Self::Request) -> Self::Future { + fn call(&mut self, request: hyper::Request) -> Self::Future { let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts); let action = self.middleware.on_request(request); @@ -87,17 +85,17 @@ impl> server::Service for ServerHandler { Handler::Rpc(RpcHandler { jsonrpc_handler: self.jsonrpc_handler.clone(), state: RpcHandlerState::ReadingHeaders { - request: request, + request, cors_domains: self.cors_domains.clone(), + cors_headers: self.cors_allowed_headers.clone(), continue_on_invalid_cors: should_continue_on_invalid_cors, }, is_options: false, - cors_allow_origin: cors::AllowOrigin::NotRequired, + cors_max_age: self.cors_max_age, + cors_allow_origin: cors::AllowCors::NotRequired, + cors_allow_headers: cors::AllowCors::NotRequired, rest_api: self.rest_api, health_api: self.health_api.clone(), - cors_max_age: self.cors_max_age, - allowed_headers: self.allowed_headers.clone(), - cors_allow_headers: cors::AllowHeaders::NotRequired, max_request_body_size: self.max_request_body_size, }) } @@ -108,11 +106,11 @@ impl> server::Service for ServerHandler { pub enum Handler> { Rpc(RpcHandler), Error(Option), - Middleware(Box + Send>), + Middleware(Box, Error = hyper::Error> + Send>), } impl> Future for Handler { - type Item = server::Response; + type Item = hyper::Response; type Error = hyper::Error; fn poll(&mut self) -> Poll { @@ -146,7 +144,7 @@ impl RpcPollState where } type FutureResponse = future::Map< - future::Either, ()>, core::FutureRpcResult>, + future::Either, ()>, FutureRpcResult>, fn(Option) -> Response, >; @@ -155,8 +153,9 @@ enum RpcHandlerState where F: Future, Error = ()>, { ReadingHeaders { - request: server::Request, + request: hyper::Request, cors_domains: CorsDomains, + cors_headers: cors::AccessControlAllowHeaders, continue_on_invalid_cors: bool, }, ReadingBody { @@ -202,28 +201,25 @@ pub struct RpcHandler> { jsonrpc_handler: Rpc, state: RpcHandlerState, is_options: bool, - cors_allow_origin: cors::AllowOrigin, + cors_allow_origin: cors::AllowCors, + cors_allow_headers: cors::AllowCors>, cors_max_age: Option, - cors_allow_headers: cors::AllowHeaders, - allowed_headers: cors::AccessControlAllowHeadersUnicase, rest_api: RestApi, health_api: Option<(String, String)>, max_request_body_size: usize, } impl> Future for RpcHandler { - type Item = server::Response; + type Item = hyper::Response; type Error = hyper::Error; fn poll(&mut self) -> Poll { - let allowed_headers = self.allowed_headers.clone(); - let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) { - RpcHandlerState::ReadingHeaders { request, cors_domains, continue_on_invalid_cors, } => { + RpcHandlerState::ReadingHeaders { request, cors_domains, cors_headers, continue_on_invalid_cors, } => { // Read cors header self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains); - self.cors_allow_headers = utils::cors_allow_headers(&request, &allowed_headers); - self.is_options = *request.method() == Method::Options; + self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers); + self.is_options = *request.method() == Method::OPTIONS; // Read other headers RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors)) }, @@ -279,16 +275,16 @@ impl> Future for RpcHandler { let (new_state, is_ready) = new_state.decompose(); match new_state { RpcHandlerState::Writing(res) => { - let mut response: server::Response = res.into(); - let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowOrigin::Invalid); - let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowHeaders::Invalid); + let mut response: hyper::Response = res.into(); + let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowCors::Invalid); + let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowCors::Invalid); Self::set_response_headers( response.headers_mut(), self.is_options, - cors_allow_origin.into(), self.cors_max_age, - cors_allow_headers, + cors_allow_origin.into(), + cors_allow_headers.into(), ); Ok(Async::Ready(response)) }, @@ -305,7 +301,7 @@ impl> Future for RpcHandler { } // Intermediate and internal error type to better distinguish -// error cases occuring during request body processing. +// error cases occurring during request body processing. enum BodyError { Hyper(hyper::Error), Utf8(str::Utf8Error), @@ -321,13 +317,13 @@ impl From for BodyError { impl> RpcHandler { fn read_headers( &self, - request: server::Request, + request: hyper::Request, continue_on_invalid_cors: bool, ) -> RpcHandlerState { - if self.cors_allow_origin == cors::AllowOrigin::Invalid && !continue_on_invalid_cors { + if self.cors_allow_origin == cors::AllowCors::Invalid && !continue_on_invalid_cors { return RpcHandlerState::Writing(Response::invalid_allow_origin()); } - if self.cors_allow_headers == cors::AllowHeaders::Invalid && !continue_on_invalid_cors { + if self.cors_allow_headers == cors::AllowCors::Invalid && !continue_on_invalid_cors { return RpcHandlerState::Writing(Response::invalid_allow_headers()); } @@ -338,31 +334,31 @@ impl> RpcHandler { match *request.method() { // Validate the ContentType header // to prevent Cross-Origin XHRs with text/plain - Method::Post if Self::is_json(request.headers().get::()) => { + Method::POST if Self::is_json(request.headers().get("content-type")) => { let uri = if self.rest_api != RestApi::Disabled { Some(request.uri().clone()) } else { None }; RpcHandlerState::ReadingBody { metadata, request: Default::default(), uri, - body: request.body(), + body: request.into_body(), } }, - Method::Post if self.rest_api == RestApi::Unsecure && request.uri().path().split('/').count() > 2 => { + Method::POST if self.rest_api == RestApi::Unsecure && request.uri().path().split('/').count() > 2 => { RpcHandlerState::ProcessRest { metadata, uri: request.uri().clone(), } }, // Just return error for unsupported content type - Method::Post => { + Method::POST => { RpcHandlerState::Writing(Response::unsupported_content_type()) }, // Don't validate content type on options - Method::Options => { + Method::OPTIONS => { RpcHandlerState::Writing(Response::empty()) }, // Respond to health API request if there is one configured. - Method::Get if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => { + Method::GET if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => { RpcHandlerState::ProcessHealth { metadata, method: self.health_api.as_ref() @@ -497,51 +493,58 @@ impl> RpcHandler { } fn set_response_headers( - headers: &mut Headers, + headers: &mut HeaderMap, is_options: bool, - cors_allow_origin: Option, cors_max_age: Option, - cors_allow_headers: AllowHeaders, + cors_allow_origin: Option, + cors_allow_headers: Option>, ) { + let as_header = |m: Method| m.as_str().parse().expect("`Method` will always parse; qed"); + let concat = |headers: &[HeaderValue]| { + let separator = b", "; + let val = headers + .iter() + .flat_map(|h| h.as_bytes().iter().chain(separator.iter())) + .cloned() + .collect::>(); + let max_len = if val.is_empty() { 0 } else { val.len() - 2 }; + HeaderValue::from_bytes(&val[..max_len]).expect("Concatenation of valid headers with `, ` is still valid; qed") + }; + + let allowed = concat(&[as_header(Method::OPTIONS), as_header(Method::POST)]); + if is_options { - headers.set(header::Allow(vec![ - Method::Options, - Method::Post, - ])); - headers.set(header::Accept(vec![ - header::qitem(mime::APPLICATION_JSON) - ])); + headers.append(header::ALLOW, allowed.clone()); + headers.append(header::ACCEPT, HeaderValue::from_static("application/json")); } - if let Some(cors_domain) = cors_allow_origin { - headers.set(header::AccessControlAllowMethods(vec![ - Method::Options, - Method::Post - ])); + if let Some(cors_allow_origin) = cors_allow_origin { + headers.append(header::VARY, HeaderValue::from_static("origin")); + headers.append(header::ACCESS_CONTROL_ALLOW_METHODS, allowed); + headers.append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_allow_origin); - if let AllowHeaders::Ok(cors_allow_headers) = cors_allow_headers { - if !cors_allow_headers.is_empty() { - headers.set(cors_allow_headers); - } + if let Some(cma) = cors_max_age { + headers.append( + header::ACCESS_CONTROL_MAX_AGE, + HeaderValue::from_str(&cma.to_string()).expect("`u32` will always parse; qed") + ); } - if let Some(cors_max_age) = cors_max_age { - headers.set(header::AccessControlMaxAge(cors_max_age)); + if let Some(cors_allow_headers) = cors_allow_headers { + if !cors_allow_headers.is_empty() { + headers.append(header::ACCESS_CONTROL_ALLOW_HEADERS, concat(&cors_allow_headers)); + } } - headers.set(cors_domain); - headers.set(header::Vary::Items(vec![ - Ascii::new("origin".to_owned()) - ])); } } - fn is_json(content_type: Option<&header::ContentType>) -> bool { - const APPLICATION_JSON_UTF_8: &str = "application/json; charset=utf-8"; - - match content_type { - Some(&header::ContentType(ref mime)) - if *mime == mime::APPLICATION_JSON || *mime == APPLICATION_JSON_UTF_8 => true, - _ => false + /// Returns true if the `content_type` header indicates a valid JSON + /// message. + fn is_json(content_type: Option<&header::HeaderValue>) -> bool { + match content_type.and_then(|val| val.to_str().ok()) { + Some("application/json") => true, + Some("application/json; charset=utf-8") => true, + _ => false, } } } diff --git a/http/src/lib.rs b/http/src/lib.rs index 597a1f0f4..72d3d8440 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -44,19 +44,18 @@ use std::sync::{mpsc, Arc}; use std::net::SocketAddr; use std::thread; -use hyper::server; +use hyper::{server, Body}; use jsonrpc_core as jsonrpc; use jsonrpc::MetaIoHandler; -use jsonrpc::futures::{self, Future, Stream}; +use jsonrpc::futures::{self, Future, Stream, future}; use jsonrpc::futures::sync::oneshot; -use server_utils::reactor::{Remote, UninitializedRemote}; +use server_utils::reactor::{Executor, UninitializedExecutor}; pub use server_utils::hosts::{Host, DomainsValidation}; -pub use server_utils::cors::{AccessControlAllowOrigin, Origin, AccessControlAllowHeaders}; -pub use server_utils::cors; -pub use server_utils::tokio_core; +pub use server_utils::cors::{self, AccessControlAllowOrigin, Origin, AllowCors}; +pub use server_utils::tokio; pub use handler::ServerHandler; -pub use utils::{is_host_allowed, cors_allow_origin, AllowOrigin}; +pub use utils::{is_host_allowed, cors_allow_origin, cors_allow_headers}; pub use response::Response; /// Action undertaken by a middleware. @@ -67,14 +66,14 @@ pub enum RequestMiddlewareAction { /// This allows for side effects to take place. should_continue_on_invalid_cors: bool, /// The request object returned - request: server::Request, + request: hyper::Request, }, /// Intercept the request and respond differently. Respond { /// Should standard hosts validation be performed? should_validate_hosts: bool, /// a future for server response - response: Box + Send>, + response: Box, Error=hyper::Error> + Send>, } } @@ -87,8 +86,8 @@ impl From for RequestMiddlewareAction { } } -impl From for RequestMiddlewareAction { - fn from(response: server::Response) -> Self { +impl From> for RequestMiddlewareAction { + fn from(response: hyper::Response) -> Self { RequestMiddlewareAction::Respond { should_validate_hosts: true, response: Box::new(futures::future::ok(response)), @@ -96,8 +95,8 @@ impl From for RequestMiddlewareAction { } } -impl From for RequestMiddlewareAction { - fn from(request: server::Request) -> Self { +impl From> for RequestMiddlewareAction { + fn from(request: hyper::Request) -> Self { RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors: false, request, @@ -108,13 +107,13 @@ impl From for RequestMiddlewareAction { /// Allows to intercept request and handle it differently. pub trait RequestMiddleware: Send + Sync + 'static { /// Takes a request and decides how to proceed with it. - fn on_request(&self, request: server::Request) -> RequestMiddlewareAction; + fn on_request(&self, request: hyper::Request) -> RequestMiddlewareAction; } impl RequestMiddleware for F where - F: Fn(server::Request) -> RequestMiddlewareAction + Sync + Send + 'static, + F: Fn(hyper::Request) -> RequestMiddlewareAction + Sync + Send + 'static, { - fn on_request(&self, request: server::Request) -> RequestMiddlewareAction { + fn on_request(&self, request: hyper::Request) -> RequestMiddlewareAction { (*self)(request) } } @@ -122,7 +121,7 @@ impl RequestMiddleware for F where #[derive(Default)] struct NoopRequestMiddleware; impl RequestMiddleware for NoopRequestMiddleware { - fn on_request(&self, request: server::Request) -> RequestMiddlewareAction { + fn on_request(&self, request: hyper::Request) -> RequestMiddlewareAction { RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors: false, request, @@ -133,14 +132,14 @@ impl RequestMiddleware for NoopRequestMiddleware { /// Extracts metadata from the HTTP request. pub trait MetaExtractor: Sync + Send + 'static { /// Read the metadata from the request - fn read_metadata(&self, _: &server::Request) -> M; + fn read_metadata(&self, _: &hyper::Request) -> M; } impl MetaExtractor for F where M: jsonrpc::Metadata, - F: Fn(&server::Request) -> M + Sync + Send + 'static, + F: Fn(&hyper::Request) -> M + Sync + Send + 'static, { - fn read_metadata(&self, req: &server::Request) -> M { + fn read_metadata(&self, req: &hyper::Request) -> M { (*self)(req) } } @@ -148,7 +147,7 @@ impl MetaExtractor for F where #[derive(Default)] struct NoopExtractor; impl MetaExtractor for NoopExtractor { - fn read_metadata(&self, _: &server::Request) -> M { + fn read_metadata(&self, _: &hyper::Request) -> M { M::default() } } @@ -194,12 +193,12 @@ pub enum RestApi { /// Convenient JSON-RPC HTTP Server builder. pub struct ServerBuilder = jsonrpc::NoopMiddleware> { handler: Arc>, - remote: UninitializedRemote, + executor: UninitializedExecutor, meta_extractor: Arc>, request_middleware: Arc, cors_domains: CorsDomains, cors_max_age: Option, - allowed_headers: cors::AccessControlAllowHeadersUnicase, + allowed_headers: cors::AccessControlAllowHeaders, allowed_hosts: AllowedHosts, rest_api: RestApi, health_api: Option<(String, String)>, @@ -233,12 +232,12 @@ impl> ServerBuilder { { ServerBuilder { handler: Arc::new(handler.into()), - remote: UninitializedRemote::Unspawned, + executor: UninitializedExecutor::Unspawned, meta_extractor: Arc::new(extractor), request_middleware: Arc::new(NoopRequestMiddleware::default()), cors_domains: None, cors_max_age: None, - allowed_headers: cors::AccessControlAllowHeadersUnicase::Any, + allowed_headers: cors::AccessControlAllowHeaders::Any, allowed_hosts: None, rest_api: RestApi::Disabled, health_api: None, @@ -248,11 +247,11 @@ impl> ServerBuilder { } } - /// Utilize existing event loop remote to poll RPC results. + /// Utilize existing event loop executor to poll RPC results. /// /// Applies only to 1 of the threads. Other threads will spawn their own Event Loops. - pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self { - self.remote = UninitializedRemote::Shared(remote); + pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self { + self.executor = UninitializedExecutor::Shared(executor); self } @@ -377,11 +376,11 @@ impl> ServerBuilder { let (local_addr_tx, local_addr_rx) = mpsc::channel(); let (close, shutdown_signal) = oneshot::channel(); - let eloop = self.remote.init_with_name("http.worker0")?; + let eloop = self.executor.init_with_name("http.worker0")?; let req_max_size = self.max_request_body_size; serve( (shutdown_signal, local_addr_tx), - eloop.remote(), + eloop.executor(), addr.to_owned(), cors_domains.clone(), cors_max_age, @@ -398,10 +397,10 @@ impl> ServerBuilder { let handles = (0..self.threads - 1).map(|i| { let (local_addr_tx, local_addr_rx) = mpsc::channel(); let (close, shutdown_signal) = oneshot::channel(); - let eloop = UninitializedRemote::Unspawned.init_with_name(format!("http.worker{}", i + 1))?; + let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?; serve( (shutdown_signal, local_addr_tx), - eloop.remote(), + eloop.executor(), addr.to_owned(), cors_domains.clone(), cors_max_age, @@ -426,11 +425,11 @@ impl> ServerBuilder { Ok((eloop, close)) }).collect::)>>()?; handles.push((eloop, close)); - let (remotes, close) = handles.into_iter().unzip(); + let (executors, close) = handles.into_iter().unzip(); Ok(Server { address: local_addr?, - remote: Some(remotes), + executor: Some(executors), close: Some(close), }) } @@ -444,11 +443,11 @@ fn recv_address(local_addr_rx: mpsc::Receiver>) -> io::Re fn serve>( signals: (oneshot::Receiver<()>, mpsc::Sender>), - remote: tokio_core::reactor::Remote, + executor: tokio::runtime::TaskExecutor, addr: SocketAddr, cors_domains: CorsDomains, cors_max_age: Option, - allowed_headers: cors::AccessControlAllowHeadersUnicase, + allowed_headers: cors::AccessControlAllowHeaders, request_middleware: Arc, allowed_hosts: AllowedHosts, jsonrpc_handler: Rpc, @@ -459,8 +458,9 @@ fn serve>( max_request_body_size: usize, ) { let (shutdown_signal, local_addr_tx) = signals; - remote.spawn(move |handle| { - let handle1 = handle.clone(); + executor.spawn(future::lazy(move || { + let handle = tokio::reactor::Handle::current(); + let bind = move || { let listener = match addr { SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?, @@ -470,7 +470,7 @@ fn serve>( listener.reuse_address(true)?; listener.bind(&addr)?; let listener = listener.listen(1024)?; - let listener = tokio_core::net::TcpListener::from_listener(listener, &addr, &handle1)?; + let listener = tokio::net::TcpListener::from_std(listener, &handle)?; // Add current host to allowed headers. // NOTE: we need to use `l.local_addr()` instead of `addr` // it might be different! @@ -498,19 +498,15 @@ fn serve>( } }; - let handle = handle.clone(); bind_result.and_then(move |(listener, local_addr)| { let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr); - let http = { - let mut http = server::Http::new(); - http.keep_alive(keep_alive); - http.sleep_on_errors(true); - http - }; + let mut http = server::conn::Http::new(); + http.keep_alive(keep_alive); + listener.incoming() - .for_each(move |(socket, addr)| { - http.bind_connection(&handle, socket, addr, ServerHandler::new( + .for_each(move |socket| { + let service = ServerHandler::new( jsonrpc_handler.clone(), cors_domains.clone(), cors_max_age, @@ -520,7 +516,9 @@ fn serve>( rest_api, health_api.clone(), max_request_body_size, - )); + ); + tokio::spawn(http.serve_connection(socket, service) + .map_err(|e| error!("Error serving connection: {:?}", e))); Ok(()) }) .map_err(|e| { @@ -532,7 +530,7 @@ fn serve>( .map(|_| ()) .map_err(|_| ()) }) - }); + })); } #[cfg(unix)] @@ -554,7 +552,7 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> { /// jsonrpc http server instance pub struct Server { address: SocketAddr, - remote: Option>, + executor: Option>, close: Option>>, } @@ -571,23 +569,23 @@ impl Server { let _ = close.send(()); } - for remote in self.remote.take().expect(PROOF) { - remote.close(); + for executor in self.executor.take().expect(PROOF) { + executor.close(); } } /// Will block, waiting for the server to finish. pub fn wait(mut self) { - for remote in self.remote.take().expect(PROOF) { - remote.wait(); + for executor in self.executor.take().expect(PROOF) { + executor.wait(); } } } impl Drop for Server { fn drop(&mut self) { - self.remote.take().map(|remotes| { - for remote in remotes { remote.close(); } + self.executor.take().map(|executors| { + for executor in executors { executor.close(); } }); } } diff --git a/http/src/response.rs b/http/src/response.rs index a3511fb88..24748aad9 100644 --- a/http/src/response.rs +++ b/http/src/response.rs @@ -1,8 +1,6 @@ //! Basic Request/Response structures used internally. -use hyper::server; - -pub use hyper::{header, Method, StatusCode}; +pub use hyper::{self, Method, Body, StatusCode, header::HeaderValue}; /// Simple server response structure #[derive(Debug)] @@ -10,7 +8,7 @@ pub struct Response { /// Response code pub code: StatusCode, /// Response content type - pub content_type: header::ContentType, + pub content_type: HeaderValue, /// Response body pub content: String, } @@ -24,8 +22,8 @@ impl Response { /// Create a response with given body and 200 OK status code. pub fn ok>(response: T) -> Self { Response { - code: StatusCode::Ok, - content_type: header::ContentType::json(), + code: StatusCode::OK, + content_type: HeaderValue::from_static("application/json; charset=utf-8"), content: response.into(), } } @@ -33,8 +31,8 @@ impl Response { /// Create a response for plaintext internal error. pub fn internal_error>(msg: T) -> Self { Response { - code: StatusCode::InternalServerError, - content_type: header::ContentType::plaintext(), + code: StatusCode::INTERNAL_SERVER_ERROR, + content_type: plain_text(), content: format!("Internal Server Error: {}", msg.into()), } } @@ -42,17 +40,17 @@ impl Response { /// Create a json response for service unavailable. pub fn service_unavailable>(msg: T) -> Self { Response { - code: StatusCode::ServiceUnavailable, - content_type: header::ContentType::json(), - content: msg.into(), + code: StatusCode::SERVICE_UNAVAILABLE, + content_type: HeaderValue::from_static("application/json; charset=utf-8"), + content: format!("Service Unavailable: {}", msg.into()), } } /// Create a response for not allowed hosts. pub fn host_not_allowed() -> Self { Response { - code: StatusCode::Forbidden, - content_type: header::ContentType::plaintext(), + code: StatusCode::FORBIDDEN, + content_type: plain_text(), content: "Provided Host header is not whitelisted.\n".to_owned(), } } @@ -60,8 +58,8 @@ impl Response { /// Create a response for unsupported content type. pub fn unsupported_content_type() -> Self { Response { - code: StatusCode::UnsupportedMediaType, - content_type: header::ContentType::plaintext(), + code: StatusCode::UNSUPPORTED_MEDIA_TYPE, + content_type: plain_text(), content: "Supplied content type is not allowed. Content-Type: application/json is required\n".to_owned(), } } @@ -69,8 +67,8 @@ impl Response { /// Create a response for disallowed method used. pub fn method_not_allowed() -> Self { Response { - code: StatusCode::MethodNotAllowed, - content_type: header::ContentType::plaintext(), + code: StatusCode::METHOD_NOT_ALLOWED, + content_type: plain_text(), content: "Used HTTP Method is not allowed. POST or OPTIONS is required\n".to_owned(), } } @@ -78,8 +76,8 @@ impl Response { /// CORS invalid pub fn invalid_allow_origin() -> Self { Response { - code: StatusCode::Forbidden, - content_type: header::ContentType::plaintext(), + code: StatusCode::FORBIDDEN, + content_type: plain_text(), content: "Origin of the request is not whitelisted. CORS headers would not be sent and any side-effects were cancelled as well.\n".to_owned(), } } @@ -87,17 +85,17 @@ impl Response { /// CORS header invalid pub fn invalid_allow_headers() -> Self { Response { - code: StatusCode::Forbidden, - content_type: header::ContentType::plaintext(), - content: "Header field is not allowed.\n".to_owned(), + code: StatusCode::FORBIDDEN, + content_type: plain_text(), + content: "Requested headers are not allowed for CORS. CORS headers would not be sent and any side-effects were cancelled as well.\n".to_owned(), } } /// Create a response for bad request pub fn bad_request>(msg: S) -> Self { Response { - code: StatusCode::BadRequest, - content_type: header::ContentType::plaintext(), + code: StatusCode::BAD_REQUEST, + content_type: plain_text(), content: msg.into() } } @@ -105,18 +103,33 @@ impl Response { /// Create a response for too large (413) pub fn too_large>(msg: S) -> Self { Response { - code: StatusCode::PayloadTooLarge, - content_type: header::ContentType::plaintext(), + code: StatusCode::PAYLOAD_TOO_LARGE, + content_type: plain_text(), content: msg.into() } } } -impl Into for Response { - fn into(self) -> server::Response { - server::Response::new() - .with_status(self.code) - .with_header(self.content_type) - .with_body(self.content) +fn plain_text() -> HeaderValue { + HeaderValue::from_static("text/plain; charset=utf-8") +} + +// TODO: Consider switching to a `TryFrom` conversion once it stabilizes. +impl From for hyper::Response { + /// Converts from a jsonrpc `Response` to a `hyper::Response` + /// + /// ## Panics + /// + /// Panics if the response cannot be converted due to failure to parse + /// body content. + /// + fn from(res: Response) -> hyper::Response { + hyper::Response::builder() + .status(res.code) + .header("content-type", res.content_type) + .body(res.content.into()) + // Parsing `StatusCode` and `HeaderValue` is infalliable but + // parsing body content is not. + .expect("Unable to parse response body for type conversion") } } diff --git a/http/src/tests.rs b/http/src/tests.rs index 67e663819..6655adf56 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -109,7 +109,7 @@ fn request(server: Server, request: &str) -> Response { let mut lines = response.lines(); let status = lines.next().unwrap().to_owned(); - let headers = read_block(&mut lines); + let headers = read_block(&mut lines); let body = read_block(&mut lines); Response { @@ -137,7 +137,7 @@ fn should_return_method_not_allowed_for_get() { // then assert_eq!(response.status, "HTTP/1.1 405 Method Not Allowed".to_owned()); - assert_eq!(response.body, "3D\nUsed HTTP Method is not allowed. POST or OPTIONS is required\n".to_owned()); + assert_eq!(response.body, "Used HTTP Method is not allowed. POST or OPTIONS is required\n".to_owned()); } #[test] @@ -158,7 +158,7 @@ fn should_handle_health_endpoint() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - assert_eq!(response.body, "7\n\"world\"\n0\n"); + assert_eq!(response.body, "\"world\"\n"); } #[test] @@ -179,7 +179,7 @@ fn should_handle_health_endpoint_failure() { // then assert_eq!(response.status, "HTTP/1.1 503 Service Unavailable".to_owned()); - assert_eq!(response.body, "25\n{\"code\":-34,\"message\":\"Server error\"}\n0\n"); + assert_eq!(response.body, "Service Unavailable: {\"code\":-34,\"message\":\"Server error\"}\n"); } #[test] @@ -200,7 +200,7 @@ fn should_return_unsupported_media_type_if_not_json() { // then assert_eq!(response.status, "HTTP/1.1 415 Unsupported Media Type".to_owned()); - assert_eq!(response.body, "51\nSupplied content type is not allowed. Content-Type: application/json is required\n".to_owned()); + assert_eq!(response.body, "Supplied content type is not allowed. Content-Type: application/json is required\n".to_owned()); } #[test] @@ -272,7 +272,7 @@ fn should_return_empty_response_for_notification() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - assert_eq!(response.body, "0\n".to_owned()); + assert_eq!(response.body, "".to_owned()); } @@ -323,7 +323,7 @@ fn should_add_cors_allow_origins() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); assert_eq!(response.body, method_not_found()); - assert!(response.headers.contains("Access-Control-Allow-Origin: http://parity.io"), "Headers missing in {}", response.headers); + assert!(response.headers.contains("access-control-allow-origin: http://parity.io"), "Headers missing in {}", response.headers); } #[test] @@ -349,8 +349,8 @@ fn should_add_cors_max_age_headers() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); assert_eq!(response.body, method_not_found()); - assert!(response.headers.contains("Access-Control-Allow-Origin: http://parity.io"), "Headers missing in {}", response.headers); - assert!(response.headers.contains("Access-Control-Max-Age: 1000"), "Headers missing in {}", response.headers); + assert!(response.headers.contains("access-control-allow-origin: http://parity.io"), "Headers missing in {}", response.headers); + assert!(response.headers.contains("access-control-max-age: 1000"), "Headers missing in {}", response.headers); } #[test] @@ -424,9 +424,9 @@ fn should_return_proper_headers_on_options() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - assert!(response.headers.contains("Allow: OPTIONS, POST"), "Headers missing in {}", response.headers); - assert!(response.headers.contains("Accept: application/json"), "Headers missing in {}", response.headers); - assert_eq!(response.body, "0\n"); + assert!(response.headers.contains("allow: OPTIONS, POST"), "Headers missing in {}", response.headers); + assert!(response.headers.contains("accept: application/json"), "Headers missing in {}", response.headers); + assert_eq!(response.body, ""); } #[test] @@ -452,7 +452,7 @@ fn should_add_cors_allow_origin_for_null_origin() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); assert_eq!(response.body, method_not_found()); - assert!(response.headers.contains("Access-Control-Allow-Origin: null"), "Headers missing in {}", response.headers); + assert!(response.headers.contains("access-control-allow-origin: null"), "Headers missing in {}", response.headers); } #[test] @@ -478,7 +478,7 @@ fn should_add_cors_allow_origin_for_null_origin_when_all() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); assert_eq!(response.body, method_not_found()); - assert!(response.headers.contains("Access-Control-Allow-Origin: null"), "Headers missing in {}", response.headers); + assert!(response.headers.contains("access-control-allow-origin: null"), "Headers missing in {}", response.headers); } #[test] @@ -577,8 +577,8 @@ fn should_allow_if_host_is_valid() { fn should_respond_configured_allowed_hosts_to_options() { // given let allowed = vec![ - "X-Allowed", - "X-AlsoAllowed", + "X-Allowed".to_owned(), + "X-AlsoAllowed".to_owned(), ]; let custom = cors::AccessControlAllowHeaders::Only(allowed.clone()); let server = serve_allow_headers(custom); @@ -599,7 +599,7 @@ fn should_respond_configured_allowed_hosts_to_options() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - let expected = format!("Access-Control-Allow-Headers: {}", &allowed.join(", ")); + let expected = format!("access-control-allow-headers: {}", &allowed.join(", ")); assert!(response.headers.contains(&expected), "Headers missing in {}", response.headers); } @@ -623,7 +623,7 @@ fn should_not_contain_default_cors_allow_headers() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - assert!(!response.headers.contains("Access-Control-Allow-Headers:"), + assert!(!response.headers.contains("access-control-allow-headers:"), "Header should not be in {}", response.headers); } @@ -648,7 +648,7 @@ fn should_respond_valid_to_default_allowed_headers() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - let expected = "Access-Control-Allow-Headers: Accept, Content-Type, Origin"; + let expected = "access-control-allow-headers: Accept, Content-Type, Origin"; assert!(response.headers.contains(expected), "Headers missing in {}", response.headers); } @@ -656,8 +656,8 @@ fn should_respond_valid_to_default_allowed_headers() { fn should_by_default_respond_valid_to_any_request_headers() { // given let allowed = vec![ - "X-Abc", - "X-123", + "X-Abc".to_owned(), + "X-123".to_owned(), ]; let custom = cors::AccessControlAllowHeaders::Only(allowed.clone()); let server = serve_allow_headers(custom); @@ -678,7 +678,7 @@ fn should_by_default_respond_valid_to_any_request_headers() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - let expected = format!("Access-Control-Allow-Headers: {}", &allowed.join(", ")); + let expected = format!("access-control-allow-headers: {}", &allowed.join(", ")); assert!(response.headers.contains(&expected), "Headers missing in {}", response.headers); } @@ -686,8 +686,8 @@ fn should_by_default_respond_valid_to_any_request_headers() { fn should_respond_valid_to_configured_allow_headers() { // given let allowed = vec![ - "X-Allowed", - "X-AlsoAllowed", + "X-Allowed".to_owned(), + "X-AlsoAllowed".to_owned(), ]; let custom = cors::AccessControlAllowHeaders::Only(allowed.clone()); let server = serve_allow_headers(custom); @@ -708,7 +708,7 @@ fn should_respond_valid_to_configured_allow_headers() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - let expected = format!("Access-Control-Allow-Headers: {}", &allowed.join(", ")); + let expected = format!("access-control-allow-headers: {}", &allowed.join(", ")); assert!(response.headers.contains(&expected), "Headers missing in {}", response.headers); } @@ -717,7 +717,7 @@ fn should_respond_invalid_if_non_allowed_header_used() { // given let custom = cors::AccessControlAllowHeaders::Only( vec![ - "X-Allowed", + "X-Allowed".to_owned(), ]); let server = serve_allow_headers(custom); @@ -745,7 +745,7 @@ fn should_respond_valid_if_allowed_header_used() { // given let custom = cors::AccessControlAllowHeaders::Only( vec![ - "X-Allowed", + "X-Allowed".to_owned(), ]); let server = serve_allow_headers(custom); let addr = server.address().clone(); @@ -775,7 +775,7 @@ fn should_respond_valid_if_case_insensitive_allowed_header_used() { // given let custom = cors::AccessControlAllowHeaders::Only( vec![ - "X-Allowed", + "X-Allowed".to_owned(), ]); let server = serve_allow_headers(custom); let addr = server.address().clone(); @@ -804,8 +804,8 @@ fn should_respond_valid_if_case_insensitive_allowed_header_used() { fn should_respond_valid_on_case_mismatches_in_allowed_headers() { // given let allowed = vec![ - "X-Allowed", - "X-AlsoAllowed", + "X-Allowed".to_owned(), + "X-AlsoAllowed".to_owned(), ]; let custom = cors::AccessControlAllowHeaders::Only(allowed.clone()); let server = serve_allow_headers(custom); @@ -827,7 +827,7 @@ fn should_respond_valid_on_case_mismatches_in_allowed_headers() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); let contained = response.headers.contains( - "Access-Control-Allow-Headers: x-ALLoweD, x-alSOaLloWeD" + "access-control-allow-headers: x-ALLoweD, x-alSOaLloWeD" ); assert!(contained, "Headers missing in {}", response.headers); } @@ -855,16 +855,16 @@ fn should_respond_valid_to_any_requested_header() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - let expected = format!("Access-Control-Allow-Headers: {}", headers); + let expected = format!("access-control-allow-headers: {}", headers); assert!(response.headers.contains(&expected), "Headers missing in {}", response.headers); } #[test] -fn should_respond_invalid_to_wildcard_if_only_certain_headers_allowed() { +fn should_forbid_invalid_request_headers() { // given let custom = cors::AccessControlAllowHeaders::Only( vec![ - "X-Allowed", + "X-Allowed".to_owned(), ]); let server = serve_allow_headers(custom); @@ -883,6 +883,9 @@ fn should_respond_invalid_to_wildcard_if_only_certain_headers_allowed() { ); // then + // According to the spec wildcard is nly supported for `Allow-Origin`, + // some ppl believe it should be supported by other `Allow-*` headers, + // but I didn't see any mention of allowing wildcard for `Request-Headers`. assert_eq!(response.status, "HTTP/1.1 403 Forbidden".to_owned()); assert_eq!(response.body, cors_invalid_allow_headers()); } @@ -908,7 +911,7 @@ fn should_respond_valid_to_wildcard_if_any_header_allowed() { // then assert_eq!(response.status, "HTTP/1.1 200 OK".to_owned()); - assert!(response.headers.contains("Access-Control-Allow-Headers: *"), + assert!(response.headers.contains("access-control-allow-headers: *"), "Headers missing in {}", response.headers); } @@ -1132,34 +1135,34 @@ fn should_return_error_in_case_of_unsecure_rest_and_no_method() { // then assert_eq!(response.status, "HTTP/1.1 415 Unsupported Media Type".to_owned()); - assert_eq!(&response.body, "51\nSupplied content type is not allowed. Content-Type: application/json is required\n"); + assert_eq!(&response.body, "Supplied content type is not allowed. Content-Type: application/json is required\n"); } fn invalid_host() -> String { - "29\nProvided Host header is not whitelisted.\n".into() + "Provided Host header is not whitelisted.\n".into() } fn cors_invalid_allow_origin() -> String { - "76\nOrigin of the request is not whitelisted. CORS headers would not be sent and any side-effects were cancelled as well.\n".into() + "Origin of the request is not whitelisted. CORS headers would not be sent and any side-effects were cancelled as well.\n".into() } fn cors_invalid_allow_headers() -> String { - "1D\nHeader field is not allowed.\n".into() + "Requested headers are not allowed for CORS. CORS headers would not be sent and any side-effects were cancelled as well.\n".into() } fn method_not_found() -> String { - "4E\n{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32601,\"message\":\"Method not found\"},\"id\":1}\n".into() + "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32601,\"message\":\"Method not found\"},\"id\":1}\n".into() } fn invalid_request() -> String { - "50\n{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32600,\"message\":\"Invalid request\"},\"id\":null}\n".into() + "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32600,\"message\":\"Invalid request\"},\"id\":null}\n".into() } fn world() -> String { - "2A\n{\"jsonrpc\":\"2.0\",\"result\":\"world\",\"id\":1}\n".into() + "{\"jsonrpc\":\"2.0\",\"result\":\"world\",\"id\":1}\n".into() } fn world_5() -> String { - "2D\n{\"jsonrpc\":\"2.0\",\"result\":\"world: 5\",\"id\":1}\n".into() + "{\"jsonrpc\":\"2.0\",\"result\":\"world: 5\",\"id\":1}\n".into() } fn world_batch() -> String { - "2C\n[{\"jsonrpc\":\"2.0\",\"result\":\"world\",\"id\":1}]\n".into() + "[{\"jsonrpc\":\"2.0\",\"result\":\"world\",\"id\":1}]\n".into() } diff --git a/http/src/utils.rs b/http/src/utils.rs index fb04ae871..eb15423a8 100644 --- a/http/src/utils.rs +++ b/http/src/utils.rs @@ -1,45 +1,54 @@ -use hyper::{header, server}; +use hyper::{self, header}; use server_utils::{cors, hosts}; -pub use server_utils::cors::{AllowOrigin, AllowHeaders, AccessControlAllowHeaders}; /// Extracts string value of a single header in request. -fn read_header<'a>(req: &'a server::Request, header: &str) -> Option<&'a str> { - match req.headers().get_raw(header) { - Some(ref v) if v.len() == 1 => { - ::std::str::from_utf8(&v[0]).ok() - }, - _ => None - } +fn read_header<'a>(req: &'a hyper::Request, header_name: &str) -> Option<&'a str> { + req.headers().get(header_name).and_then(|v| v.to_str().ok()) } /// Returns `true` if Host header in request matches a list of allowed hosts. pub fn is_host_allowed( - request: &server::Request, + request: &hyper::Request, allowed_hosts: &Option>, ) -> bool { hosts::is_host_valid(read_header(request, "host"), allowed_hosts) } -/// Returns a CORS header that should be returned with that request. +/// Returns a CORS AllowOrigin header that should be returned with that request. pub fn cors_allow_origin( - request: &server::Request, + request: &hyper::Request, cors_domains: &Option> -) -> AllowOrigin { +) -> cors::AllowCors { cors::get_cors_allow_origin(read_header(request, "origin"), read_header(request, "host"), cors_domains).map(|origin| { use self::cors::AccessControlAllowOrigin::*; match origin { - Value(val) => header::AccessControlAllowOrigin::Value((*val).to_owned()), - Null => header::AccessControlAllowOrigin::Null, - Any => header::AccessControlAllowOrigin::Any, + Value(ref val) => header::HeaderValue::from_str(val).unwrap_or(header::HeaderValue::from_static("null")), + Null => header::HeaderValue::from_static("null"), + Any => header::HeaderValue::from_static("*"), } }) } -/// Returns the CORS header that should be returned with that request. +/// Returns the CORS AllowHeaders header that should be returned with that request. pub fn cors_allow_headers( - request: &server::Request, - cors_allow_headers: &cors::AccessControlAllowHeadersUnicase -) -> AllowHeaders { - cors::get_cors_allow_headers(request.headers(), cors_allow_headers.into()) + request: &hyper::Request, + cors_allow_headers: &cors::AccessControlAllowHeaders +) -> cors::AllowCors> { + let headers = request.headers().keys() + .map(|name| name.as_str()); + let requested_headers = request.headers() + .get_all("access-control-request-headers") + .iter() + .filter_map(|val| val.to_str().ok()) + .flat_map(|val| val.split(", ")) + .flat_map(|val| val.split(",")); + + cors::get_cors_allow_headers( + headers, + requested_headers, + cors_allow_headers.into(), + |name| header::HeaderValue::from_str(name) + .unwrap_or_else(|_| header::HeaderValue::from_static("unknown")) + ) } diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index ce9b38cc0..36a35226a 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -13,7 +13,7 @@ log = "0.4" tokio-service = "0.1" solana-jsonrpc-core = { version = "0.1", path = "../core" } solana-jsonrpc-server-utils = { version = "0.1", path = "../server-utils" } -parity-tokio-ipc = { git = "https://github.com/nikvolf/parity-tokio-ipc", branch = "stable" } +parity-tokio-ipc = { git = "https://github.com/nikvolf/parity-tokio-ipc", rev = "306ea3e" } parking_lot = "0.6" [dev-dependencies] diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs index 789f801dd..02e79a58c 100644 --- a/ipc/src/lib.rs +++ b/ipc/src/lib.rs @@ -24,5 +24,5 @@ use jsonrpc_core as jsonrpc; pub use meta::{MetaExtractor, NoopExtractor, RequestContext}; pub use server::{Server, ServerBuilder, CloseHandle,SecurityAttributes}; -pub use self::server_utils::tokio_core; +pub use self::server_utils::tokio; pub use self::server_utils::session::{SessionStats, SessionId}; diff --git a/ipc/src/server.rs b/ipc/src/server.rs index c36398e2b..45bdfe640 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -7,11 +7,11 @@ use tokio_service::{self, Service as TokioService}; use jsonrpc::futures::{future, Future, Stream, Sink}; use jsonrpc::futures::sync::{mpsc, oneshot}; use jsonrpc::{FutureResult, Metadata, MetaIoHandler, Middleware, NoopMiddleware}; - -use server_utils::tokio_core::reactor::Remote; -use server_utils::tokio_io::AsyncRead; -use server_utils::{reactor, session, codecs}; - +use server_utils::{ + tokio_codec::Framed, + tokio::{self, runtime::TaskExecutor, reactor::Handle}, + reactor, session, codecs, +}; use parking_lot::Mutex; use meta::{MetaExtractor, NoopExtractor, RequestContext}; @@ -51,7 +51,7 @@ pub struct ServerBuilder = NoopMiddleware> { handler: Arc>, meta_extractor: Arc>, session_stats: Option>, - remote: reactor::UninitializedRemote, + executor: reactor::UninitializedExecutor, incoming_separator: codecs::Separator, outgoing_separator: codecs::Separator, security_attributes: SecurityAttributes, @@ -76,16 +76,16 @@ impl> ServerBuilder { handler: Arc::new(io_handler.into()), meta_extractor: Arc::new(extractor), session_stats: None, - remote: reactor::UninitializedRemote::Unspawned, + executor: reactor::UninitializedExecutor::Unspawned, incoming_separator: codecs::Separator::Empty, outgoing_separator: codecs::Separator::default(), security_attributes: SecurityAttributes::empty(), } } - /// Sets shared different event loop remote. - pub fn event_loop_remote(mut self, remote: Remote) -> Self { - self.remote = reactor::UninitializedRemote::Shared(remote); + /// Sets shared different event loop executor. + pub fn event_loop_executor(mut self, executor: TaskExecutor) -> Self { + self.executor = reactor::UninitializedExecutor::Shared(executor); self } @@ -118,7 +118,7 @@ impl> ServerBuilder { /// Creates a new server from the given endpoint. pub fn start(self, path: &str) -> std::io::Result { - let remote = self.remote.initialize()?; + let executor = self.executor.initialize()?; let rpc_handler = self.handler; let endpoint_addr = path.to_owned(); let meta_extractor = self.meta_extractor; @@ -130,8 +130,7 @@ impl> ServerBuilder { let (wait_signal, wait_receiver) = oneshot::channel(); let security_attributes = self.security_attributes; - remote.remote().spawn(move |handle| { - + executor.spawn(future::lazy(move || { let mut endpoint = Endpoint::new(endpoint_addr); endpoint.set_security_attributes(security_attributes); @@ -142,8 +141,8 @@ impl> ServerBuilder { } } - let endpoint_handle = handle.clone(); - let connections = match endpoint.incoming(endpoint_handle) { + let endpoint_handle = Handle::current(); + let connections = match endpoint.incoming(&endpoint_handle) { Ok(connections) => connections, Err(e) => { start_signal.send(Err(e)).expect("Cannot fail since receiver never dropped before receiving"); @@ -151,7 +150,6 @@ impl> ServerBuilder { } }; - let remote = handle.remote().clone(); let mut id = 0u64; let server = connections.for_each(move |(io_stream, remote_id)| { @@ -168,11 +166,12 @@ impl> ServerBuilder { sender, }); let service = Service::new(rpc_handler.clone(), meta); - let (writer, reader) = io_stream.framed( + let (writer, reader) = Framed::new( + io_stream, codecs::StreamCodec::new( incoming_separator.clone(), outgoing_separator.clone(), - ) + ), ).split(); let responses = reader.and_then(move |req| { service.call(req).then(move |response| match response { @@ -203,7 +202,7 @@ impl> ServerBuilder { Ok(()) }); - remote.spawn(|_| writer); + tokio::spawn(writer); Ok(()) }); @@ -218,18 +217,18 @@ impl> ServerBuilder { }) .map_err(|_| ()) ) - }); + })); let handle = InnerHandles { - remote: Some(remote), - stop: Some(stop_signal), - path: path.to_owned(), + executor: Some(executor), + stop: Some(stop_signal), + path: path.to_owned(), }; match start_receiver.wait().expect("Message should always be sent") { Ok(()) => Ok(Server { - handles: Arc::new(Mutex::new(handle)), - wait_handle: Some(wait_receiver), + handles: Arc::new(Mutex::new(handle)), + wait_handle: Some(wait_receiver), }), Err(e) => Err(e) } @@ -267,7 +266,7 @@ impl Server { #[derive(Debug)] struct InnerHandles { - remote: Option, + executor: Option, stop: Option>, path: String, } @@ -275,7 +274,7 @@ struct InnerHandles { impl InnerHandles { pub fn close(&mut self) { let _ = self.stop.take().map(|stop| stop.send(())); - self.remote.take().map(|remote| remote.close()); + self.executor.take().map(|executor| executor.close()); let _ = ::std::fs::remove_file(&self.path); // ignore error, file could have been gone somewhere } } @@ -306,15 +305,18 @@ mod tests { use std::thread; use std::sync::Arc; use std::time; + use std::time::{Instant, Duration}; use super::{ServerBuilder, Server}; use jsonrpc::{MetaIoHandler, Value}; use jsonrpc::futures::{Future, future, Stream, Sink}; use jsonrpc::futures::sync::{mpsc, oneshot}; use self::tokio_uds::UnixStream; use parking_lot::Mutex; - use server_utils::tokio_io::AsyncRead; + use server_utils::{ + tokio_codec::Decoder, + tokio::{self, timer::Delay} + }; use server_utils::codecs; - use tokio_core::reactor::{Timeout, Core}; use meta::{MetaExtractor, RequestContext, NoopExtractor}; use super::SecurityAttributes; @@ -335,7 +337,8 @@ mod tests { fn dummy_request_str(path: &str, data: &str) -> String { let stream_future = UnixStream::connect(path); let reply = stream_future.and_then(|stream| { - let stream= stream.framed(codecs::StreamCodec::stream_incoming()); + let stream = codecs::StreamCodec::stream_incoming() + .framed(stream); let reply = stream .send(data.to_owned()) .and_then(move |stream| { @@ -574,19 +577,26 @@ mod tests { tx.send(true).expect("failed to report that the server has stopped"); }); - let mut core = Core::new().expect("failed to create a core"); - let timeout = Timeout::new(time::Duration::from_millis(500), &core.handle()) - .expect("failed to setup a timer") + let delay = Delay::new(Instant::now() + Duration::from_millis(500)) .map(|_| false) - .map_err(|_| ()); - - let result_fut = rx.map_err(|_| ()) - .select(timeout) - .map(|(result, _next_fut)| result) - .map_err(|_| ()); - let result = core.run(result_fut); - assert_eq!(result, Ok(true), "Wait timeout exceeded"); - assert!(UnixStream::connect(path).wait().is_err(), "Connection to the closed socket should fail"); + .map_err(|err| panic!("{:?}", err)); + + let result_fut = rx + .map_err(|_| ()) + .select(delay) + .then(move |result| { + match result { + Ok((result, _)) => { + assert_eq!(result, true, "Wait timeout exceeded"); + assert!(UnixStream::connect(path).wait().is_err(), + "Connection to the closed socket should fail"); + Ok(()) + }, + Err(_) => Err(()), + } + }); + + tokio::run(result_fut); } #[test] diff --git a/minihttp/src/lib.rs b/minihttp/src/lib.rs index b035bfa08..ab034d187 100644 --- a/minihttp/src/lib.rs +++ b/minihttp/src/lib.rs @@ -263,7 +263,7 @@ impl> tokio_service::Service for let cors = cors::get_cors_allow_origin(origin, host, &self.cors_domains); // Validate cors header - if let cors::AllowOrigin::Invalid = cors { + if let cors::AllowCors::Invalid = cors { return Either::A(future::ok( res::invalid_allow_origin() )); diff --git a/server-utils/Cargo.toml b/server-utils/Cargo.toml index c883bcaf5..e00750731 100644 --- a/server-utils/Cargo.toml +++ b/server-utils/Cargo.toml @@ -10,20 +10,23 @@ homepage = "https://github.com/solana-labs/jsonrpc" repository = "https://github.com/solana-labs/jsonrpc" [dependencies] +bytes = "0.4" globset = "0.4" <<<<<<< HEAD +<<<<<<< HEAD ======= solana-jsonrpc-core = { version = "0.1", path = "../core" } lazy_static = "1.1.0" >>>>>>> 2189750... fixup! Allow specifying access rights for the IPC endpoint in jsonrpc-ipc-server. (#300) +======= +solana-jsonrpc-core = { version = "9.0", path = "../core" } +lazy_static = "1.1.0" +>>>>>>> 4d29744... Update hyper to version 0.12. (#303) log = "0.4" -solana-jsonrpc-core = { version = "0.1", path = "../core" } -tokio-core = { version = "0.1" } -tokio-io = { version = "0.1" } -bytes = "0.4" -hyper = "0.11" +num_cpus = "1.8" +tokio = { version = "0.1" } +tokio-codec = { version = "0.1" } unicase = "2.0" -lazy_static = "1.1.0" [badges] travis-ci = { repository = "solana-labs/jsonrpc", branch = "solana-0.1"} diff --git a/server-utils/src/cors.rs b/server-utils/src/cors.rs index 405324417..78b8157d6 100644 --- a/server-utils/src/cors.rs +++ b/server-utils/src/cors.rs @@ -1,14 +1,10 @@ //! CORS handling utility functions -extern crate hyper; extern crate unicase; use std::{fmt, ops}; use hosts::{Host, Port}; use matcher::{Matcher, Pattern}; use std::collections::HashSet; -pub use cors::hyper::header; -pub use cors::hyper::header::AccessControlRequestHeaders; -pub use cors::hyper::Headers; pub use self::unicase::Ascii; /// Origin Protocol @@ -138,86 +134,32 @@ impl> From for AccessControlAllowOrigin { } } -/// CORS Allow-Origin Header Result. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum AllowOrigin { - /// CORS header was not required. Origin is not present in the request. - NotRequired, - /// CORS header is not returned, Origin is not allowed to access the resource. - Invalid, - /// CORS header to include in the response. Origin is allowed to access the resource. - Ok(T), -} - /// Headers allowed to access #[derive(Debug, Clone, PartialEq)] -pub enum AccessControlAllowHeadersUnicase>> { +pub enum AccessControlAllowHeaders { /// Specific headers - Only(T), - /// Any non-null origin + Only(Vec), + /// Any header Any, } -/// CORS Allow-Headers Result. +/// CORS response headers #[derive(Debug, Clone, PartialEq, Eq)] -pub enum AllowHeaders { - /// CORS header was not required. Request-Headers is not present in the request. +pub enum AllowCors { + /// CORS header was not required. Origin is not present in the request. NotRequired, - /// CORS header is not returned, Request-Headers are not allowed to access the resource. + /// CORS header is not returned, Origin is not allowed to access the resource. Invalid, - /// CORS header to include in the response. Origin Request-Headers are allowed to access the resource. + /// CORS header to include in the response. Origin is allowed to access the resource. Ok(T), } -impl Into>> for AccessControlAllowHeadersUnicase { - fn into(self) -> HashSet> { - use self::AccessControlAllowHeadersUnicase::Any; - use self::AccessControlAllowHeadersUnicase::Only; - - match self { - Any => { - let mut hs = HashSet::new(); - hs.insert(Ascii::new("*")); - hs - }, - Only(h) => h, - } - } -} - -/// Headers allowed to access -#[derive(Debug, Clone, PartialEq)] -pub enum AccessControlAllowHeaders> { - /// Specific headers - Only(T), - /// Any non-null origin - Any, -} - -impl From for AccessControlAllowHeadersUnicase { - fn from(allow_headers: AccessControlAllowHeaders) -> AccessControlAllowHeadersUnicase { - match allow_headers { - AccessControlAllowHeaders::Any => AccessControlAllowHeadersUnicase::Any, - AccessControlAllowHeaders::Only(only) => { - let only = only.into_iter().map(|name| Ascii::new(name)).collect(); - AccessControlAllowHeadersUnicase::Only(only) - }, - } - } -} - -impl Into for HashSet> { - fn into(self) -> AccessControlAllowHeadersUnicase { - AccessControlAllowHeadersUnicase::Only(self) - } -} - -impl AllowOrigin { - /// Maps `Ok` variant of `AllowOrigin`. - pub fn map(self, f: F) -> AllowOrigin where +impl AllowCors { + /// Maps `Ok` variant of `AllowCors`. + pub fn map(self, f: F) -> AllowCors where F: FnOnce(T) -> O, { - use self::AllowOrigin::*; + use self::AllowCors::*; match self { NotRequired => NotRequired, @@ -227,9 +169,9 @@ impl AllowOrigin { } } -impl Into> for AllowOrigin { +impl Into> for AllowCors { fn into(self) -> Option { - use self::AllowOrigin::*; + use self::AllowCors::*; match self { NotRequired | Invalid => None, @@ -239,9 +181,13 @@ impl Into> for AllowOrigin { } /// Returns correct CORS header (if any) given list of allowed origins and current origin. -pub fn get_cors_allow_origin(origin: Option<&str>, host: Option<&str>, allowed: &Option>) -> AllowOrigin { +pub fn get_cors_allow_origin( + origin: Option<&str>, + host: Option<&str>, + allowed: &Option> +) -> AllowCors { match origin { - None => AllowOrigin::NotRequired, + None => AllowCors::NotRequired, Some(ref origin) => { if let Some(host) = host { // Request initiated from the same server. @@ -249,29 +195,32 @@ pub fn get_cors_allow_origin(origin: Option<&str>, host: Option<&str>, allowed: // Additional check let origin = Origin::parse(origin); if &*origin.host == host { - return AllowOrigin::NotRequired; + return AllowCors::NotRequired; } } } match allowed.as_ref() { - None if *origin == "null" => AllowOrigin::Ok(AccessControlAllowOrigin::Null), - None => AllowOrigin::Ok(AccessControlAllowOrigin::Value(Origin::parse(origin))), + None if *origin == "null" => AllowCors::Ok(AccessControlAllowOrigin::Null), + None => AllowCors::Ok(AccessControlAllowOrigin::Value(Origin::parse(origin))), Some(ref allowed) if *origin == "null" => { allowed.iter().find(|cors| **cors == AccessControlAllowOrigin::Null).cloned() - .map(AllowOrigin::Ok) - .unwrap_or(AllowOrigin::Invalid) + .map(AllowCors::Ok) + .unwrap_or(AllowCors::Invalid) }, Some(ref allowed) => { allowed.iter().find(|cors| { match **cors { AccessControlAllowOrigin::Any => true, - AccessControlAllowOrigin::Value(ref val) if val.matches(origin) => true, + AccessControlAllowOrigin::Value(ref val) if val.matches(origin) => + { + true + }, _ => false } }) .map(|_| AccessControlAllowOrigin::Value(Origin::parse(origin))) - .map(AllowOrigin::Ok).unwrap_or(AllowOrigin::Invalid) + .map(AllowCors::Ok).unwrap_or(AllowCors::Invalid) }, } }, @@ -279,65 +228,54 @@ pub fn get_cors_allow_origin(origin: Option<&str>, host: Option<&str>, allowed: } /// Validates if the `AccessControlAllowedHeaders` in the request are allowed. -pub fn get_cors_allow_headers(request_headers: &Headers, cors_allow_headers: &AccessControlAllowHeadersUnicase) -> AllowHeaders { - // Check if the header fields which were sent in the request are required - if let AccessControlAllowHeadersUnicase::Only(only) = cors_allow_headers { - let are_all_allowed = request_headers.iter() +pub fn get_cors_allow_headers, O, F: Fn(T) -> O>( + mut headers: impl Iterator, + requested_headers: impl Iterator, + cors_allow_headers: &AccessControlAllowHeaders, + to_result: F +) -> AllowCors> { + // Check if the header fields which were sent in the request are allowed + if let AccessControlAllowHeaders::Only(only) = cors_allow_headers { + let are_all_allowed = headers .all(|header| { - let name = &Ascii::new(header.name()); - only.contains(name) || ALWAYS_ALLOWED_HEADERS.contains(name) + let name = &Ascii::new(header.as_ref()); + only.iter().any(|h| &Ascii::new(&*h) == name) || ALWAYS_ALLOWED_HEADERS.contains(name) }); if !are_all_allowed { - return AllowHeaders::Invalid; + return AllowCors::Invalid; } } // Check if `AccessControlRequestHeaders` contains fields which were allowed - match request_headers.get::() { - None => { - // No fields were requested, no comparison necessary - match cors_allow_headers { - AccessControlAllowHeadersUnicase::Any => AllowHeaders::NotRequired, - - // The fields which can be used are constrainted, but since it - // was asked for none we don't return any. - AccessControlAllowHeadersUnicase::Only(_) => { - let empty = header::AccessControlAllowHeaders(vec![]); - AllowHeaders::Ok(empty) - }, - } + let (filtered, headers) = match cors_allow_headers { + AccessControlAllowHeaders::Any => { + let headers = requested_headers.map(to_result).collect(); + (false, headers) }, - Some(requested) => { - // "requested" contains the fields for which it is inquired to know - // if they can be used. - - let echo = AllowHeaders::Ok( - header::AccessControlAllowHeaders(requested.to_vec()) - ); - - match cors_allow_headers { - AccessControlAllowHeadersUnicase::Any => { - // Any field is allowed. Our response are the fields about which - // the request inquired. - echo - }, - - AccessControlAllowHeadersUnicase::Only(only) => { - let are_all_allowed = requested.iter() - .all(|header| { - let name = &Ascii::new(header.as_ref()); - only.contains(name) || ALWAYS_ALLOWED_HEADERS.contains(name) - }); - - if !are_all_allowed { - return AllowHeaders::Invalid; - } + AccessControlAllowHeaders::Only(only) => { + let mut filtered = false; + let headers: Vec<_> = requested_headers + .filter(|header| { + let name = &Ascii::new(header.as_ref()); + filtered = true; + only.iter().any(|h| &Ascii::new(&*h) == name) || ALWAYS_ALLOWED_HEADERS.contains(name) + }) + .map(to_result) + .collect(); + + (filtered, headers) + }, + }; - echo - } - } + if headers.is_empty() { + if filtered { + AllowCors::Invalid + } else { + AllowCors::NotRequired } + } else { + AllowCors::Ok(headers) } } @@ -362,10 +300,10 @@ lazy_static! { #[cfg(test)] mod tests { + use std::iter; + + use super::*; use hosts::Host; - use super::{get_cors_allow_origin, AllowOrigin, AccessControlAllowOrigin, Origin, OriginProtocol}; - use super::{get_cors_allow_headers, AccessControlAllowHeaders, AccessControlRequestHeaders}; - use super::{Headers, Ascii, AllowHeaders, header}; #[test] fn should_parse_origin() { @@ -394,8 +332,8 @@ mod tests { let res2 = get_cors_allow_origin(origin2, host, &Some(vec![])); // then - assert_eq!(res1, AllowOrigin::Invalid); - assert_eq!(res2, AllowOrigin::Invalid); + assert_eq!(res1, AllowCors::Invalid); + assert_eq!(res2, AllowCors::Invalid); } #[test] @@ -411,7 +349,7 @@ mod tests { let res = get_cors_allow_origin(origin, host, &None); // then - assert_eq!(res, AllowOrigin::NotRequired); + assert_eq!(res, AllowCors::NotRequired); } #[test] @@ -424,7 +362,7 @@ mod tests { let res = get_cors_allow_origin(origin, host, &None); // then - assert_eq!(res, AllowOrigin::NotRequired); + assert_eq!(res, AllowCors::NotRequired); } #[test] @@ -437,7 +375,7 @@ mod tests { let res = get_cors_allow_origin(origin, host, &None); // then - assert_eq!(res, AllowOrigin::Ok("parity.io".into())); + assert_eq!(res, AllowCors::Ok("parity.io".into())); } #[test] @@ -454,7 +392,7 @@ mod tests { ); // then - assert_eq!(res, AllowOrigin::NotRequired); + assert_eq!(res, AllowCors::NotRequired); } #[test] @@ -467,7 +405,7 @@ mod tests { let res = get_cors_allow_origin(origin, host, &Some(Vec::new())); // then - assert_eq!(res, AllowOrigin::NotRequired); + assert_eq!(res, AllowCors::NotRequired); } #[test] @@ -484,7 +422,7 @@ mod tests { ); // then - assert_eq!(res, AllowOrigin::Invalid); + assert_eq!(res, AllowCors::Invalid); } #[test] @@ -497,7 +435,7 @@ mod tests { let res = get_cors_allow_origin(origin, host, &Some(vec![AccessControlAllowOrigin::Any])); // then - assert_eq!(res, AllowOrigin::Ok(AccessControlAllowOrigin::Value("http://parity.io".into()))); + assert_eq!(res, AllowCors::Ok(AccessControlAllowOrigin::Value("http://parity.io".into()))); } #[test] @@ -514,7 +452,7 @@ mod tests { ); // then - assert_eq!(res, AllowOrigin::NotRequired); + assert_eq!(res, AllowCors::NotRequired); } #[test] @@ -531,7 +469,7 @@ mod tests { ); // then - assert_eq!(res, AllowOrigin::Ok(AccessControlAllowOrigin::Null)); + assert_eq!(res, AllowCors::Ok(AccessControlAllowOrigin::Null)); } #[test] @@ -548,7 +486,7 @@ mod tests { ); // then - assert_eq!(res, AllowOrigin::Ok(AccessControlAllowOrigin::Value("http://parity.io".into()))); + assert_eq!(res, AllowCors::Ok(AccessControlAllowOrigin::Value("http://parity.io".into()))); } #[test] @@ -569,83 +507,74 @@ mod tests { let res3 = get_cors_allow_origin(origin3, host, &allowed); // then - assert_eq!(res1, AllowOrigin::Ok(AccessControlAllowOrigin::Value("http://parity.io".into()))); - assert_eq!(res2, AllowOrigin::Invalid); - assert_eq!(res3, AllowOrigin::Ok(AccessControlAllowOrigin::Value("chrome-extension://test".into()))); + assert_eq!(res1, AllowCors::Ok(AccessControlAllowOrigin::Value("http://parity.io".into()))); + assert_eq!(res2, AllowCors::Invalid); + assert_eq!(res3, AllowCors::Ok(AccessControlAllowOrigin::Value("chrome-extension://test".into()))); } #[test] fn should_return_invalid_if_header_not_allowed() { // given - let cors_allow_headers = AccessControlAllowHeaders::Only( - vec![ - "x-allowed", - ]); - let mut request_headers = Headers::new(); - request_headers.set::( - AccessControlRequestHeaders(vec![ - Ascii::new("x-not-allowed".to_owned()), - ]) - ); + let cors_allow_headers = AccessControlAllowHeaders::Only(vec![ + "x-allowed".to_owned(), + ]); + let headers = vec!["Access-Control-Request-Headers"]; + let requested = vec!["x-not-allowed"]; // when - let res = get_cors_allow_headers(&request_headers, &cors_allow_headers.into()); + let res = get_cors_allow_headers(headers.iter(), requested.iter(), &cors_allow_headers.into(), |x| x); // then - assert_eq!(res, AllowHeaders::Invalid); + assert_eq!(res, AllowCors::Invalid); } #[test] fn should_return_valid_if_header_allowed() { // given let allowed = vec![ - "x-allowed", + "x-allowed".to_owned(), ]; let cors_allow_headers = AccessControlAllowHeaders::Only(allowed.clone()); - let mut request_headers = Headers::new(); - request_headers.set::( - AccessControlRequestHeaders(vec![ - Ascii::new("x-allowed".to_owned()), - ]) - ); + let headers = vec!["Access-Control-Request-Headers"]; + let requested = vec!["x-allowed"]; // when - let res = get_cors_allow_headers(&request_headers, &cors_allow_headers.into()); + let res = get_cors_allow_headers(headers.iter(), requested.iter(), &cors_allow_headers.into(), |x| (*x).to_owned()); // then let allowed = vec![ - Ascii::new("x-allowed".to_owned()), + "x-allowed".to_owned(), ]; - assert_eq!(res, AllowHeaders::Ok(header::AccessControlAllowHeaders(allowed))); + assert_eq!(res, AllowCors::Ok(allowed)); } #[test] fn should_return_no_allowed_headers_if_none_in_request() { // given let allowed = vec![ - "x-allowed", + "x-allowed".to_owned(), ]; let cors_allow_headers = AccessControlAllowHeaders::Only(allowed.clone()); - let request_headers = Headers::new(); + let headers: Vec = vec![]; // when - let res = get_cors_allow_headers(&request_headers, &cors_allow_headers.into()); + let res = get_cors_allow_headers(headers.iter(), iter::empty(), &cors_allow_headers, |x| x); // then - assert_eq!(res, AllowHeaders::Ok(header::AccessControlAllowHeaders(vec![]))); + assert_eq!(res, AllowCors::NotRequired); } #[test] fn should_return_not_required_if_any_header_allowed() { // given let cors_allow_headers = AccessControlAllowHeaders::Any; - let request_headers = Headers::new(); + let headers: Vec = vec![]; // when - let res = get_cors_allow_headers(&request_headers, &cors_allow_headers.into()); + let res = get_cors_allow_headers(headers.iter(), iter::empty(), &cors_allow_headers.into(), |x| x); // then - assert_eq!(res, AllowHeaders::NotRequired); + assert_eq!(res, AllowCors::NotRequired); } } diff --git a/server-utils/src/lib.rs b/server-utils/src/lib.rs index 2f790a739..beb8104b4 100644 --- a/server-utils/src/lib.rs +++ b/server-utils/src/lib.rs @@ -11,9 +11,10 @@ extern crate lazy_static; extern crate globset; extern crate solana_jsonrpc_core as core; extern crate bytes; +extern crate num_cpus; -pub extern crate tokio_core; -pub extern crate tokio_io; +pub extern crate tokio; +pub extern crate tokio_codec; pub mod cors; pub mod hosts; diff --git a/server-utils/src/reactor.rs b/server-utils/src/reactor.rs index 092c57e1d..63083d734 100644 --- a/server-utils/src/reactor.rs +++ b/server-utils/src/reactor.rs @@ -1,68 +1,77 @@ -//! Event Loop Remote +//! Event Loop Executor //! Either spawns a new event loop, or re-uses provided one. use std::{io, thread}; use std::sync::mpsc; -use tokio_core; +use tokio; +use num_cpus; use core::futures::{self, Future}; -/// Possibly uninitialized event loop remote. +/// Possibly uninitialized event loop executor. #[derive(Debug)] -pub enum UninitializedRemote { - /// Shared instance of remote. - Shared(tokio_core::reactor::Remote), +pub enum UninitializedExecutor { + /// Shared instance of executor. + Shared(tokio::runtime::TaskExecutor), /// Event Loop should be spawned by the transport. Unspawned, } -impl UninitializedRemote { - /// Initializes remote. - /// In case there is no shared remote, will spawn a new event loop. - /// Dropping `Remote` closes the loop. - pub fn initialize(self) -> io::Result { +impl UninitializedExecutor { + /// Initializes executor. + /// In case there is no shared executor, will spawn a new event loop. + /// Dropping `Executor` closes the loop. + pub fn initialize(self) -> io::Result { self.init_with_name("event.loop") } - /// Initializes remote. - /// In case there is no shared remote, will spawn a new event loop. - /// Dropping `Remote` closes the loop. - pub fn init_with_name>(self, name: T) -> io::Result { + /// Initializes executor. + /// In case there is no shared executor, will spawn a new event loop. + /// Dropping `Executor` closes the loop. + pub fn init_with_name>(self, name: T) -> io::Result { match self { - UninitializedRemote::Shared(remote) => Ok(Remote::Shared(remote)), - UninitializedRemote::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Remote::Spawned), + UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)), + UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned), } } } -/// Initialized Remote +/// Initialized Executor #[derive(Debug)] -pub enum Remote { +pub enum Executor { /// Shared instance - Shared(tokio_core::reactor::Remote), + Shared(tokio::runtime::TaskExecutor), /// Spawned Event Loop Spawned(RpcEventLoop), } -impl Remote { - /// Get remote associated with this event loop. - pub fn remote(&self) -> tokio_core::reactor::Remote { +impl Executor { + /// Get tokio executor associated with this event loop. + pub fn executor(&self) -> tokio::runtime::TaskExecutor { match *self { - Remote::Shared(ref remote) => remote.clone(), - Remote::Spawned(ref eloop) => eloop.remote(), + Executor::Shared(ref executor) => executor.clone(), + Executor::Spawned(ref eloop) => eloop.executor(), } } + /// Spawn a future onto the Tokio runtime. + pub fn spawn(&self, future: F) + where + F: Future + Send + 'static, + { + self.executor().spawn(future) + } + /// Closes underlying event loop (if any!). pub fn close(self) { - if let Remote::Spawned(eloop) = self { + if let Executor::Spawned(eloop) = self { eloop.close() } } /// Wait for underlying event loop to finish (if any!). pub fn wait(self) { - if let Remote::Spawned(eloop) = self { + if let Executor::Spawned(eloop) = self { let _ = eloop.wait(); } } @@ -71,7 +80,7 @@ impl Remote { /// A handle to running event loop. Dropping the handle will cause event loop to finish. #[derive(Debug)] pub struct RpcEventLoop { - remote: tokio_core::reactor::Remote, + executor: tokio::runtime::TaskExecutor, close: Option>, handle: Option>, } @@ -96,30 +105,51 @@ impl RpcEventLoop { if let Some(name) = name { tb = tb.name(name); } + let handle = tb.spawn(move || { - let el = tokio_core::reactor::Core::new(); - match el { - Ok(mut el) => { - tx.send(Ok(el.remote())).expect("Rx is blocking upper thread."); - let _ = el.run(futures::empty().select(stopped)); + let mut tp_builder = tokio::executor::thread_pool::Builder::new(); + + let pool_size = match num_cpus::get_physical() { + 1 => 1, + 2...4 => 2, + _ => 3, + }; + + tp_builder + .pool_size(pool_size) + .name_prefix("jsonrpc-eventloop-"); + + let runtime = tokio::runtime::Builder::new() + .threadpool_builder(tp_builder) + .build(); + + match runtime { + Ok(mut runtime) => { + tx.send(Ok(runtime.executor())).expect("Rx is blocking upper thread."); + let terminate = futures::empty().select(stopped) + .map(|_| ()) + .map_err(|_| ()); + runtime.spawn(terminate); + runtime.shutdown_on_idle().wait().unwrap(); }, Err(err) => { tx.send(Err(err)).expect("Rx is blocking upper thread."); } } }).expect("Couldn't spawn a thread."); - let remote = rx.recv().expect("tx is transfered to a newly spawned thread."); - remote.map(|remote| RpcEventLoop { - remote: remote, + let exec = rx.recv().expect("tx is transfered to a newly spawned thread."); + + exec.map(|executor| RpcEventLoop { + executor, close: Some(stop), handle: Some(handle), }) } - /// Get remote for this event loop. - pub fn remote(&self) -> tokio_core::reactor::Remote { - self.remote.clone() + /// Get executor for this event loop. + pub fn executor(&self) -> tokio::runtime::TaskExecutor { + self.executor.clone() } /// Blocks current thread and waits until the event loop is finished. diff --git a/server-utils/src/stream_codec.rs b/server-utils/src/stream_codec.rs index 53f73263c..9a20a7311 100644 --- a/server-utils/src/stream_codec.rs +++ b/server-utils/src/stream_codec.rs @@ -1,5 +1,5 @@ use std::{io, str}; -use tokio_io::codec::{Decoder, Encoder}; +use tokio_codec::{Decoder, Encoder}; use bytes::BytesMut; /// Separator for enveloping messages in streaming codecs @@ -64,7 +64,7 @@ impl Decoder for StreamCodec { } } else { Ok(None) - } + } } else { let mut depth = 0; let mut in_str = false; @@ -112,7 +112,7 @@ impl Decoder for StreamCodec { impl Encoder for StreamCodec { type Item = String; type Error = io::Error; - + fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> { let mut payload = msg.into_bytes(); if let Separator::Byte(separator) = self.outgoing_separator { @@ -127,7 +127,7 @@ impl Encoder for StreamCodec { mod tests { use super::StreamCodec; - use tokio_io::codec::Decoder; + use tokio_codec::Decoder; use bytes::{BytesMut, BufMut}; #[test] @@ -269,5 +269,5 @@ mod tests { assert_eq!(request, "{ test: 1 }"); assert_eq!(request2, "{ test: 2 }"); - } + } } diff --git a/tcp/src/lib.rs b/tcp/src/lib.rs index 787de018f..47506eaac 100644 --- a/tcp/src/lib.rs +++ b/tcp/src/lib.rs @@ -46,4 +46,4 @@ use jsonrpc_core as jsonrpc; pub use dispatch::{Dispatcher, PushMessageError}; pub use meta::{MetaExtractor, RequestContext}; pub use server::{ServerBuilder, Server}; -pub use self::server_utils::tokio_core; +pub use self::server_utils::tokio; diff --git a/tcp/src/server.rs b/tcp/src/server.rs index a6292c04e..95c318ef0 100644 --- a/tcp/src/server.rs +++ b/tcp/src/server.rs @@ -7,8 +7,10 @@ use tokio_service::Service as TokioService; use jsonrpc::{MetaIoHandler, Metadata, Middleware, NoopMiddleware}; use jsonrpc::futures::{future, Future, Stream, Sink}; use jsonrpc::futures::sync::{mpsc, oneshot}; -use server_utils::{reactor, tokio_core, codecs}; -use server_utils::tokio_io::AsyncRead; +use server_utils::{ + tokio_codec::Framed, + tokio, reactor, codecs, +}; use dispatch::{Dispatcher, SenderChannels, PeerMessageQueue}; use meta::{MetaExtractor, RequestContext, NoopExtractor}; @@ -16,7 +18,7 @@ use service::Service; /// TCP server builder pub struct ServerBuilder = NoopMiddleware> { - remote: reactor::UninitializedRemote, + executor: reactor::UninitializedExecutor, handler: Arc>, meta_extractor: Arc>, channels: Arc, @@ -40,7 +42,7 @@ impl + 'static> ServerBuilder { E: MetaExtractor + 'static, { ServerBuilder { - remote: reactor::UninitializedRemote::Unspawned, + executor: reactor::UninitializedExecutor::Unspawned, handler: Arc::new(handler.into()), meta_extractor: Arc::new(extractor), channels: Default::default(), @@ -49,9 +51,9 @@ impl + 'static> ServerBuilder { } } - /// Utilize existing event loop remote. - pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self { - self.remote = reactor::UninitializedRemote::Shared(remote); + /// Utilize existing event loop executor. + pub fn event_loop_executor(mut self, handle: tokio::runtime::TaskExecutor) -> Self { + self.executor = reactor::UninitializedExecutor::Shared(handle); self } @@ -77,16 +79,17 @@ impl + 'static> ServerBuilder { let outgoing_separator = self.outgoing_separator; let address = addr.to_owned(); let (tx, rx) = std::sync::mpsc::channel(); - let (signal, stop) = oneshot::channel(); + let (stop_tx, stop_rx) = oneshot::channel(); - let remote = self.remote.initialize()?; + let executor = self.executor.initialize()?; - remote.remote().spawn(move |handle| { + executor.spawn(future::lazy(move || { let start = move || { - let listener = tokio_core::net::TcpListener::bind(&address, handle)?; + let listener = tokio::net::TcpListener::bind(&address)?; let connections = listener.incoming(); - let remote = handle.remote().clone(); - let server = connections.for_each(move |(socket, peer_addr)| { + + let server = connections.for_each(move |socket| { + let peer_addr = socket.peer_addr().expect("Unable to determine socket peer address"); trace!(target: "tcp", "Accepted incoming connection from {}", &peer_addr); let (sender, receiver) = mpsc::channel(65536); @@ -97,12 +100,13 @@ impl + 'static> ServerBuilder { let meta = meta_extractor.extract(&context); let service = Service::new(peer_addr, rpc_handler.clone(), meta); - let (writer, reader) = socket.framed( - codecs::StreamCodec::new( - incoming_separator.clone(), - outgoing_separator.clone(), - ) - ).split(); + let (writer, reader) = Framed::new( + socket, + codecs::StreamCodec::new( + incoming_separator.clone(), + outgoing_separator.clone(), + ), + ).split(); let responses = reader.and_then( move |req| service.call(req).then(|response| match response { @@ -140,7 +144,7 @@ impl + 'static> ServerBuilder { Ok(()) }); - remote.spawn(|_| writer); + tokio::spawn(writer); Ok(()) }); @@ -148,7 +152,7 @@ impl + 'static> ServerBuilder { Ok(server) }; - let stop = stop.map_err(|_| std::io::ErrorKind::Interrupted.into()); + let stop = stop_rx.map_err(|_| std::io::ErrorKind::Interrupted.into()); match start() { Ok(server) => { tx.send(Ok(())).expect("Rx is blocking parent thread."); @@ -166,13 +170,13 @@ impl + 'static> ServerBuilder { })) }, } - }); + })); let res = rx.recv().expect("Response is always sent before tx is dropped."); res.map(|_| Server { - remote: Some(remote), - stop: Some(signal), + executor: Some(executor), + stop: Some(stop_tx), }) } @@ -184,7 +188,7 @@ impl + 'static> ServerBuilder { /// TCP Server handle pub struct Server { - remote: Option, + executor: Option, stop: Option>, } @@ -192,18 +196,18 @@ impl Server { /// Closes the server (waits for finish) pub fn close(mut self) { let _ = self.stop.take().map(|sg| sg.send(())); - self.remote.take().unwrap().close(); + self.executor.take().unwrap().close(); } /// Wait for the server to finish pub fn wait(mut self) { - self.remote.take().unwrap().wait(); + self.executor.take().unwrap().wait(); } } impl Drop for Server { fn drop(&mut self) { let _ = self.stop.take().map(|sg| sg.send(())); - self.remote.take().map(|remote| remote.close()); + self.executor.take().map(|executor| executor.close()); } } diff --git a/tcp/src/tests.rs b/tcp/src/tests.rs index b5b5bf832..983372195 100644 --- a/tcp/src/tests.rs +++ b/tcp/src/tests.rs @@ -1,14 +1,17 @@ -use std::cell::RefCell; -use std::net::SocketAddr; +use std::net::{SocketAddr, Shutdown}; use std::str::FromStr; use std::sync::Arc; +use std::time::{Instant, Duration}; use jsonrpc::{MetaIoHandler, Value, Metadata}; -use jsonrpc::futures::{Future, future}; +use jsonrpc::futures::{self, Future, future}; -use server_utils::tokio_io::io; -use server_utils::tokio_core::net::TcpStream; -use server_utils::tokio_core::reactor::{Core, Timeout}; +use server_utils::tokio::{ + timer::Delay, + net::TcpStream, + io::{self}, + self, +}; use parking_lot::Mutex; @@ -46,11 +49,13 @@ fn doc_test_connect() { let server = casual_server(); let _server = server.start(&addr).expect("Server must run with no issues"); - let mut core = Core::new().expect("Tokio Core should be created with no errors"); - let stream = TcpStream::connect(&addr, &core.handle()); - let result = core.run(stream); + let stream = TcpStream::connect(&addr) + .and_then(move |_stream| { + Ok(()) + }) + .map_err(|err| panic!("Server connection error: {:?}", err)); - assert!(result.is_ok()); + tokio::run(stream); } #[test] @@ -61,39 +66,41 @@ fn disconnect() { let dispatcher = server.dispatcher(); let _server = server.start(&addr).expect("Server must run with no issues"); - { - let mut core = Core::new().expect("Tokio Core should be created with no errors"); - let stream = TcpStream::connect(&addr, &core.handle()) - .and_then(|stream| future::ok(stream)) - .and_then(|stream| future::result(stream.shutdown(::std::net::Shutdown::Both))); - core.run(stream).expect("tcp/ip session should finalize with no errors in disconnect test"); - } + let stream = TcpStream::connect(&addr) + .and_then(move |stream| { + assert_eq!(stream.peer_addr().unwrap(), addr); + stream.shutdown(::std::net::Shutdown::Both) + }) + .map_err(|err| panic!("Error disconnecting: {:?}", err)); + + tokio::run(stream); ::std::thread::sleep(::std::time::Duration::from_millis(50)); assert_eq!(0, dispatcher.peer_count()); } -fn dummy_request(addr: &SocketAddr, data: &[u8]) -> Vec { - let mut core = Core::new().expect("Tokio Core should be created with no errors"); - let mut buffer = vec![0u8; 1024]; +fn dummy_request(addr: &SocketAddr, data: Vec) -> Vec { + let (ret_tx, ret_rx) = futures::sync::oneshot::channel(); - let stream = TcpStream::connect(addr, &core.handle()) - .and_then(|stream| { + let stream = TcpStream::connect(addr) + .and_then(move |stream| { io::write_all(stream, data) }) - .and_then(|(stream, _)| { - io::read(stream, &mut buffer) - }) - .and_then(|(_, read_buf, len)| { - future::ok(read_buf[0..len].to_vec()) - }); - let result = core.run(stream).expect("Core should run with no errors"); + .and_then(|(stream, _data)| { + stream.shutdown(Shutdown::Write).unwrap(); + io::read_to_end(stream, vec![]).wait() + }) + .and_then(move |(_stream, read_buf)| { + ret_tx.send(read_buf).map_err(|err| panic!("Unable to send {:?}", err)) + }) + .map_err(|err| panic!("Error connecting or closing connection: {:?}", err));; - result + tokio::run(stream); + ret_rx.wait().expect("Unable to receive result") } -fn dummy_request_str(addr: &SocketAddr, data: &[u8]) -> String { +fn dummy_request_str(addr: &SocketAddr, data: Vec) -> String { String::from_utf8(dummy_request(addr, data)).expect("String should be utf-8") } @@ -101,12 +108,13 @@ fn dummy_request_str(addr: &SocketAddr, data: &[u8]) -> String { fn doc_test_handle() { ::logger::init_log(); let addr: SocketAddr = "127.0.0.1:17780".parse().unwrap(); + let server = casual_server(); let _server = server.start(&addr).expect("Server must run with no issues"); let result = dummy_request_str( &addr, - b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n", + b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"[..].to_owned(), ); assert_eq!( @@ -133,7 +141,7 @@ fn req_parallel() { for _ in 0..100 { let result = dummy_request_str( &addr, - b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n", + b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"[..].to_owned(), ); assert_eq!( @@ -201,7 +209,7 @@ fn peer_meta() { let result = dummy_request_str( &addr, - b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n" + b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"[..].to_owned() ); // contains random port, so just smoky comparing response length @@ -226,7 +234,6 @@ impl MetaExtractor for PeerListMetaExtractor { #[test] fn message() { - // MASSIVE SETUP ::logger::init_log(); let addr: SocketAddr = "127.0.0.1:17790".parse().unwrap(); @@ -242,67 +249,71 @@ fn message() { let _server = server.start(&addr).expect("Server must run with no issues"); - let mut core = Core::new().expect("Tokio Core should be created with no errors"); - let timeout = Timeout::new(::std::time::Duration::from_millis(100), &core.handle()) - .expect("There should be a timeout produced in message test"); - let mut buffer = vec![0u8; 1024]; - let mut buffer2 = vec![0u8; 1024]; - let executed_dispatch = RefCell::new(false); - let executed_request = RefCell::new(false); + let delay = Delay::new(Instant::now() + Duration::from_millis(500)) + .map_err(|err| panic!("{:?}", err)); + + let message = "ping"; + let executed_dispatch = Arc::new(Mutex::new(false)); + let executed_request = Arc::new(Mutex::new(false)); + let executed_dispatch_move = executed_dispatch.clone(); + let executed_request_move = executed_request.clone(); // CLIENT RUN - let stream = TcpStream::connect(&addr, &core.handle()) + let stream = TcpStream::connect(&addr) .and_then(|stream| { - future::ok(stream).join(timeout) + future::ok(stream).join(delay) }) - .and_then(|stream| { - let peer_addr = peer_list.lock()[0].clone(); - dispatcher.push_message( - &peer_addr, - "ping".to_owned(), - ).expect("Should be sent with no errors"); - trace!(target: "tcp", "Dispatched message for {}", peer_addr); - future::ok(stream) - }) - .and_then(|(stream, _)| { - io::read(stream, &mut buffer) - }) - .and_then(|(stream, read_buf, len)| { - trace!(target: "tcp", "Read ping message"); - let ping_signal = read_buf[0..len].to_vec(); - - assert_eq!( - "ping\n", - String::from_utf8(ping_signal).expect("String should be utf-8"), - "Sent request does not match received by the peer", - ); - // ensure tat the above assert was actually triggered - *executed_dispatch.borrow_mut() = true; - - future::ok(stream) - }) - .and_then(|stream| { - // make request AFTER message dispatches - let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"; - io::write_all(stream, &data[..]) - }) - .and_then(|(stream, _)| { - io::read(stream, &mut buffer2) - }) - .and_then(|(_, read_buf, len)| { - trace!(target: "tcp", "Read response message"); - let response_signal = read_buf[0..len].to_vec(); - assert_eq!( - "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}\n", - String::from_utf8(response_signal).expect("String should be utf-8"), - "Response does not match the expected handling", - ); - *executed_request.borrow_mut() = true; - - future::ok(()) - }); + .and_then(move |stream| { + let peer_addr = peer_list.lock()[0].clone(); + dispatcher.push_message( + &peer_addr, + message.to_owned(), + ).expect("Should be sent with no errors"); + trace!(target: "tcp", "Dispatched message for {}", peer_addr); + future::ok(stream) + }) + .and_then(move |(stream, _)| { + // Read message plus newline appended by codec. + io::read_exact(stream, vec![0u8; message.len() + 1]) + }) + .and_then(move |(stream, read_buf)| { + trace!(target: "tcp", "Read ping message"); + let ping_signal = read_buf[..].to_vec(); + + assert_eq!( + format!("{}\n", message), + String::from_utf8(ping_signal).expect("String should be utf-8"), + "Sent request does not match received by the peer", + ); + // ensure that the above assert was actually triggered + *executed_dispatch_move.lock() = true; + + future::ok(stream) + }) + .and_then(|stream| { + // make request AFTER message dispatches + let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"; + io::write_all(stream, &data[..]) + }) + .and_then(|(stream, _)| { + stream.shutdown(Shutdown::Write).unwrap(); + io::read_to_end(stream, Vec::new()) + }) + .and_then(move |(_, read_buf)| { + trace!(target: "tcp", "Read response message"); + let response_signal = read_buf[..].to_vec(); + assert_eq!( + "{\"jsonrpc\":\"2.0\",\"result\":\"hello\",\"id\":1}\n", + String::from_utf8(response_signal).expect("String should be utf-8"), + "Response does not match the expected handling", + ); + *executed_request_move.lock() = true; + + future::ok(()) + }) + .map_err(|err| panic!("Dispach message error: {:?}", err)); - core.run(stream).expect("Should be the payload in message test"); - assert!(*executed_dispatch.borrow_mut()); - assert!(*executed_request.borrow_mut()); + tokio::run(stream); + assert!(*executed_dispatch.lock()); + assert!(*executed_request.lock()); } diff --git a/ws/src/lib.rs b/ws/src/lib.rs index 9389cf54a..67ddc4054 100644 --- a/ws/src/lib.rs +++ b/ws/src/lib.rs @@ -31,5 +31,5 @@ pub use self::server::{CloseHandle, Server}; pub use self::server_builder::ServerBuilder; pub use self::server_utils::cors::Origin; pub use self::server_utils::hosts::{Host, DomainsValidation}; -pub use self::server_utils::tokio_core; +pub use self::server_utils::tokio; pub use self::server_utils::session::{SessionId, SessionStats}; diff --git a/ws/src/metadata.rs b/ws/src/metadata.rs index 51aac4bf6..60835c1d4 100644 --- a/ws/src/metadata.rs +++ b/ws/src/metadata.rs @@ -3,8 +3,7 @@ use std::sync::{atomic, Arc}; use core::{self, futures}; use core::futures::sync::mpsc; -use server_utils::tokio_core::reactor::Remote; -use server_utils::session; +use server_utils::{session, tokio::runtime::TaskExecutor}; use ws; use error; @@ -74,7 +73,7 @@ pub struct RequestContext { /// Direct channel to send messages to a client. pub out: Sender, /// Remote to underlying event loop. - pub remote: Remote, + pub executor: TaskExecutor, } impl RequestContext { @@ -83,7 +82,7 @@ impl RequestContext { pub fn sender(&self) -> mpsc::Sender { let out = self.out.clone(); let (sender, receiver) = mpsc::channel(1); - self.remote.spawn(move |_| SenderFuture(out, receiver)); + self.executor.spawn(SenderFuture(out, receiver)); sender } } diff --git a/ws/src/server.rs b/ws/src/server.rs index 740ca490b..0a0c28996 100644 --- a/ws/src/server.rs +++ b/ws/src/server.rs @@ -6,7 +6,7 @@ use std::thread; use core; use server_utils::cors::Origin; use server_utils::hosts::{self, Host}; -use server_utils::reactor::{UninitializedRemote, Remote}; +use server_utils::reactor::{UninitializedExecutor, Executor}; use server_utils::session::SessionStats; use ws; @@ -18,7 +18,7 @@ use session; pub struct Server { addr: SocketAddr, handle: Option>>, - remote: Arc>>, + executor: Arc>>, broadcaster: ws::Sender, } @@ -27,7 +27,7 @@ impl fmt::Debug for Server { f.debug_struct("Server") .field("addr", &self.addr) .field("handle", &self.handle) - .field("remote", &self.remote) + .field("executor", &self.executor) .finish() } } @@ -48,7 +48,7 @@ impl Server { allowed_hosts: Option>, request_middleware: Option>, stats: Option>, - remote: UninitializedRemote, + executor: UninitializedExecutor, max_connections: usize, ) -> Result { let config = { @@ -71,12 +71,12 @@ impl Server { let allowed_hosts = hosts::update(allowed_hosts, addr); // Spawn event loop (if necessary) - let eloop = remote.initialize()?; - let remote = eloop.remote(); + let eloop = executor.initialize()?; + let executor = eloop.executor(); // Create WebSocket let ws = ws::Builder::new().with_settings(config).build(session::Factory::new( - handler, meta_extractor, allowed_origins, allowed_hosts, request_middleware, stats, remote + handler, meta_extractor, allowed_origins, allowed_hosts, request_middleware, stats, executor ))?; let broadcaster = ws.broadcaster(); @@ -100,7 +100,7 @@ impl Server { Ok(Server { addr: local_addr, handle: Some(handle), - remote: Arc::new(Mutex::new(Some(eloop))), + executor: Arc::new(Mutex::new(Some(eloop))), broadcaster: broadcaster, }) } @@ -121,7 +121,7 @@ impl Server { /// blocking in `wait`. pub fn close_handle(&self) -> CloseHandle { CloseHandle { - remote: self.remote.clone(), + executor: self.executor.clone(), broadcaster: self.broadcaster.clone(), } } @@ -138,7 +138,7 @@ impl Drop for Server { /// A handle that allows closing of a server even if it owned by a thread blocked in `wait`. #[derive(Clone)] pub struct CloseHandle { - remote: Arc>>, + executor: Arc>>, broadcaster: ws::Sender, } @@ -146,6 +146,6 @@ impl CloseHandle { /// Closes the `Server`. pub fn close(self) { let _ = self.broadcaster.shutdown(); - self.remote.lock().unwrap().take().map(|remote| remote.close()); + self.executor.lock().unwrap().take().map(|executor| executor.close()); } } diff --git a/ws/src/server_builder.rs b/ws/src/server_builder.rs index 4266f2d7a..4f54487d9 100644 --- a/ws/src/server_builder.rs +++ b/ws/src/server_builder.rs @@ -5,7 +5,7 @@ use core; use server_utils; use server_utils::cors::Origin; use server_utils::hosts::{Host, DomainsValidation}; -use server_utils::reactor::UninitializedRemote; +use server_utils::reactor::UninitializedExecutor; use server_utils::session::SessionStats; use error::Result; @@ -21,7 +21,7 @@ pub struct ServerBuilder> { allowed_hosts: Option>, request_middleware: Option>, session_stats: Option>, - remote: UninitializedRemote, + executor: UninitializedExecutor, max_connections: usize, } @@ -47,14 +47,14 @@ impl> ServerBuilder { allowed_hosts: None, request_middleware: None, session_stats: None, - remote: UninitializedRemote::Unspawned, + executor: UninitializedExecutor::Unspawned, max_connections: 100, } } - /// Utilize existing event loop remote to poll RPC results. - pub fn event_loop_remote(mut self, remote: server_utils::tokio_core::reactor::Remote) -> Self { - self.remote = UninitializedRemote::Shared(remote); + /// Utilize existing event loop executor to poll RPC results. + pub fn event_loop_executor(mut self, executor: server_utils::tokio::runtime::TaskExecutor) -> Self { + self.executor = UninitializedExecutor::Shared(executor); self } @@ -107,7 +107,7 @@ impl> ServerBuilder { self.allowed_hosts, self.request_middleware, self.session_stats, - self.remote, + self.executor, self.max_connections, ) } diff --git a/ws/src/session.rs b/ws/src/session.rs index ac2d175db..9525807f6 100644 --- a/ws/src/session.rs +++ b/ws/src/session.rs @@ -11,8 +11,8 @@ use slab::Slab; use server_utils::Pattern; use server_utils::cors::Origin; use server_utils::hosts::Host; -use server_utils::tokio_core::reactor::Remote; use server_utils::session::{SessionId, SessionStats}; +use server_utils::tokio::runtime::TaskExecutor; use ws; use error; @@ -145,7 +145,7 @@ pub struct Session> { request_middleware: Option>, stats: Option>, metadata: Option, - remote: Remote, + executor: TaskExecutor, task_slab: Arc, } @@ -268,7 +268,7 @@ impl> ws::Handler for Session { .map(|_| ()) .map_err(|_| ()); - self.remote.spawn(|_| future); + self.executor.spawn(future); Ok(()) } @@ -282,7 +282,7 @@ pub struct Factory> { allowed_hosts: Option>, request_middleware: Option>, stats: Option>, - remote: Remote, + executor: TaskExecutor, } impl> Factory { @@ -293,7 +293,7 @@ impl> Factory { allowed_hosts: Option>, request_middleware: Option>, stats: Option>, - remote: Remote, + executor: TaskExecutor, ) -> Self { Factory { session_id: 0, @@ -303,7 +303,7 @@ impl> Factory { allowed_hosts: allowed_hosts, request_middleware: request_middleware, stats: stats, - remote: remote, + executor: executor, } } } @@ -323,7 +323,7 @@ impl> ws::Factory for Factory { origin: None, protocols: Vec::new(), out: metadata::Sender::new(sender, active), - remote: self.remote.clone(), + executor: self.executor.clone(), }, handler: self.handler.clone(), meta_extractor: self.meta_extractor.clone(), @@ -332,7 +332,7 @@ impl> ws::Factory for Factory { stats: self.stats.clone(), request_middleware: self.request_middleware.clone(), metadata: None, - remote: self.remote.clone(), + executor: self.executor.clone(), task_slab: Arc::new(Mutex::new(Slab::with_capacity(0))), } }