Skip to content

Commit

Permalink
rustfmt fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvc94ch committed Jun 15, 2019
1 parent 553a371 commit ce8cf50
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 85 deletions.
2 changes: 1 addition & 1 deletion core-client/transports/src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
#[cfg(feature = "tls")]
let connector = match hyper_tls::HttpsConnector::new(4) {
Ok(connector) => connector,
Err(e) => return A(future::err(RpcError::Other(e.into())))
Err(e) => return A(future::err(RpcError::Other(e.into()))),
};
#[cfg(feature = "tls")]
let client = Client::builder().build::<_, hyper::Body>(connector);
Expand Down
170 changes: 87 additions & 83 deletions http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,11 @@ fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Re
}

fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>, oneshot::Sender<()>),
signals: (
oneshot::Receiver<()>,
mpsc::Sender<io::Result<SocketAddr>>,
oneshot::Sender<()>,
),
executor: tokio::runtime::TaskExecutor,
addr: SocketAddr,
cors_domains: CorsDomains,
Expand All @@ -476,89 +480,89 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
max_request_body_size: usize,
) {
let (shutdown_signal, local_addr_tx, done_tx) = signals;
executor.spawn(future::lazy(move || {
let handle = tokio::reactor::Handle::default();

let bind = move || {
let listener = match addr {
SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
executor.spawn(
future::lazy(move || {
let handle = tokio::reactor::Handle::default();

let bind = move || {
let listener = match addr {
SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
};
configure_port(reuse_port, &listener)?;
listener.reuse_address(true)?;
listener.bind(&addr)?;
let listener = listener.listen(1024)?;
let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
// Add current host to allowed headers.
// NOTE: we need to use `l.local_addr()` instead of `addr`
// it might be different!
let local_addr = listener.local_addr()?;

Ok((listener, local_addr))
};
configure_port(reuse_port, &listener)?;
listener.reuse_address(true)?;
listener.bind(&addr)?;
let listener = listener.listen(1024)?;
let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
// Add current host to allowed headers.
// NOTE: we need to use `l.local_addr()` instead of `addr`
// it might be different!
let local_addr = listener.local_addr()?;

Ok((listener, local_addr))
};

let bind_result = match bind() {
Ok((listener, local_addr)) => {
// Send local address
match local_addr_tx.send(Ok(local_addr)) {
Ok(_) => futures::future::ok((listener, local_addr)),
Err(_) => {
warn!(
"Thread {:?} unable to reach receiver, closing server",
thread::current().name()
);
futures::future::err(())
let bind_result = match bind() {
Ok((listener, local_addr)) => {
// Send local address
match local_addr_tx.send(Ok(local_addr)) {
Ok(_) => futures::future::ok((listener, local_addr)),
Err(_) => {
warn!(
"Thread {:?} unable to reach receiver, closing server",
thread::current().name()
);
futures::future::err(())
}
}
}
}
Err(err) => {
// Send error
let _send_result = local_addr_tx.send(Err(err));
Err(err) => {
// Send error
let _send_result = local_addr_tx.send(Err(err));

futures::future::err(())
}
};
futures::future::err(())
}
};

bind_result.and_then(move |(listener, local_addr)| {
let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);

let mut http = server::conn::Http::new();
http.keep_alive(keep_alive);
let tcp_stream = SuspendableStream::new(listener.incoming());

tcp_stream
.for_each(move |socket| {
let service = ServerHandler::new(
jsonrpc_handler.clone(),
cors_domains.clone(),
cors_max_age,
allowed_headers.clone(),
allowed_hosts.clone(),
request_middleware.clone(),
rest_api,
health_api.clone(),
max_request_body_size,
keep_alive,
);
tokio::spawn(
http.serve_connection(socket, service)
.map_err(|e| error!("Error serving connection: {:?}", e)),
);
Ok(())
})
.map_err(|e| {
warn!("Incoming streams error, closing sever: {:?}", e);
})
.select(shutdown_signal
.map_err(|e| {
debug!("Shutdown signaller dropped, closing server: {:?}", e);
}))
.map(|_| ())
.map_err(|_| ())
bind_result.and_then(move |(listener, local_addr)| {
let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);

let mut http = server::conn::Http::new();
http.keep_alive(keep_alive);
let tcp_stream = SuspendableStream::new(listener.incoming());

tcp_stream
.for_each(move |socket| {
let service = ServerHandler::new(
jsonrpc_handler.clone(),
cors_domains.clone(),
cors_max_age,
allowed_headers.clone(),
allowed_hosts.clone(),
request_middleware.clone(),
rest_api,
health_api.clone(),
max_request_body_size,
keep_alive,
);
tokio::spawn(
http.serve_connection(socket, service)
.map_err(|e| error!("Error serving connection: {:?}", e)),
);
Ok(())
})
.map_err(|e| {
warn!("Incoming streams error, closing sever: {:?}", e);
})
.select(shutdown_signal.map_err(|e| {
debug!("Shutdown signaller dropped, closing server: {:?}", e);
}))
.map(|_| ())
.map_err(|_| ())
})
})
}).and_then(|_| {
done_tx.send(())
}));
.and_then(|_| done_tx.send(())),
);
}

#[cfg(unix)]
Expand Down Expand Up @@ -586,12 +590,12 @@ pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>)
impl CloseHandle {
/// Shutdown a running server
pub fn close(self) {
if let Some(executors) = self.0.lock().take() {
for (executor, closer) in executors {
executor.close();
let _ = closer.send(());
}
}
if let Some(executors) = self.0.lock().take() {
for (executor, closer) in executors {
executor.close();
let _ = closer.send(());
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion http/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,8 @@ fn close_handle_makes_wait_return() {

close_handle.close();

rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close");
rx.recv_timeout(Duration::from_secs(10))
.expect("Expected server to close");
}

#[test]
Expand Down

0 comments on commit ce8cf50

Please sign in to comment.