Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: tokio v1.27 #1062

Merged
merged 9 commits into from
Mar 30, 2023
26 changes: 11 additions & 15 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::future::{ConnectionGuard, FutureDriver, ServerHandle, StopHandle};
use crate::future::{ConnectionGuard, ServerHandle, StopHandle};
use crate::logger::{Logger, TransportProtocol};
use crate::transport::{http, ws};

use futures_util::future::{BoxFuture, Either, FutureExt};
use futures_util::future::{Either, FutureExt};
use futures_util::io::{BufReader, BufWriter};

use futures_util::stream::{FuturesUnordered, StreamExt};
use hyper::body::HttpBody;
use jsonrpsee_core::id_providers::RandomIntegerIdProvider;

Expand Down Expand Up @@ -129,12 +130,12 @@ where
let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
let listener = self.listener;

let mut connections = FutureDriver::default();
let mut connections = FuturesUnordered::new();
let stopped = stop_handle.clone().shutdown();
tokio::pin!(stopped);

loop {
match try_accept_conn(&listener, &mut connections, stopped).await {
match try_accept_conn(&listener, stopped).await {
AcceptConnection::Established { socket, remote_addr, stop } => {
let data = ProcessConnection {
remote_addr,
Expand Down Expand Up @@ -167,7 +168,7 @@ where
}
}

connections.await;
while connections.next().await.is_some() {}
Copy link
Collaborator

@jsdw jsdw Mar 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, so if I understand this right, FuturesUnordered won't poll anything until you tell it to, so you are spawning tasks into it (so that they can progress independently) and then this line just makes sure that all tasks are completed before the function ends?

(I think that seems perfectly reasonable to me! Perhaps worth a comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, exactly

}
}

Expand Down Expand Up @@ -718,7 +719,7 @@ fn process_connection<'a, L: Logger, B, U>(
connection_guard: &ConnectionGuard,
cfg: ProcessConnection<L>,
socket: TcpStream,
connections: &mut FutureDriver<BoxFuture<'a, ()>>,
connections: &mut FuturesUnordered<tokio::task::JoinHandle<()>>,
) where
B: Layer<TowerService<L>> + Send + 'static,
<B as Layer<TowerService<L>>>::Service: Send
Expand All @@ -741,7 +742,7 @@ fn process_connection<'a, L: Logger, B, U>(
Some(conn) => conn,
None => {
tracing::warn!("Too many connections. Please try again later.");
connections.add(http::reject_connection(socket).in_current_span().boxed());
connections.push(tokio::spawn(http::reject_connection(socket).in_current_span()));
return;
}
};
Expand Down Expand Up @@ -774,7 +775,7 @@ fn process_connection<'a, L: Logger, B, U>(

let service = service_builder.service(tower_service);

connections.add(Box::pin(to_http_service(socket, service, cfg.stop_handle).in_current_span()));
connections.push(tokio::spawn(to_http_service(socket, service, cfg.stop_handle).in_current_span()));
}

// Attempts to create a HTTP connection from a socket.
Expand Down Expand Up @@ -809,16 +810,11 @@ enum AcceptConnection<S> {
Err((std::io::Error, S)),
}

async fn try_accept_conn<S, F>(
listener: &TcpListener,
connections: &mut FutureDriver<F>,
stopped: S,
) -> AcceptConnection<S>
async fn try_accept_conn<S>(listener: &TcpListener, stopped: S) -> AcceptConnection<S>
where
F: Future + Unpin,
S: Future + Unpin,
{
let accept = connections.select_with(listener.accept());
let accept = listener.accept();
tokio::pin!(accept);

match futures_util::future::select(accept, stopped).await {
Expand Down
6 changes: 2 additions & 4 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,6 @@ pub(crate) async fn background_task<L: Logger>(
// Spawn another task that sends out the responses on the Websocket.
tokio::spawn(send_task(rx, sender, stop_handle.clone(), ping_interval, conn_rx));

tracing::info!("background task");

// Buffer for incoming data.
let mut data = Vec::with_capacity(100);
let mut method_executor = FutureDriver::default();
Expand Down Expand Up @@ -476,7 +474,7 @@ enum Receive<S> {

async fn try_recv<F, S>(
receiver: &mut Receiver,
mut data: &mut Vec<u8>,
data: &mut Vec<u8>,
method_executor: &mut FutureDriver<F>,
stopped: S,
) -> Receive<S>
Expand All @@ -487,7 +485,7 @@ where
let receive = async {
// Identical loop to `soketto::receive_data` with debug logs for `Pong` frames.
loop {
match receiver.receive(&mut data).await? {
match receiver.receive(data).await? {
soketto::Incoming::Data(d) => break Ok(d),
soketto::Incoming::Pong(_) => tracing::debug!("Received pong"),
soketto::Incoming::Closed(_) => {
Expand Down