fix: tokio v1.27 #1062

Merged
merged 9 commits on Mar 30, 2023
31 changes: 6 additions & 25 deletions server/src/future.rs
@@ -34,10 +34,6 @@ use std::task::{Context, Poll};
use futures_util::future::FutureExt;
use jsonrpsee_core::Error;
use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use tokio::time::{self, Duration, Interval};

/// Polling for server stop monitor interval in milliseconds.
const STOP_MONITOR_POLLING_INTERVAL: Duration = Duration::from_millis(1000);

/// This is a flexible collection of futures that need to be driven to completion
/// alongside some other future, such as connection handlers that need to be
@@ -47,16 +43,11 @@ const STOP_MONITOR_POLLING_INTERVAL: Duration = Duration::from_millis(1000);
/// `select_with` providing some other future, the result of which you need.
pub(crate) struct FutureDriver<F> {
futures: Vec<F>,
stop_monitor_heartbeat: Interval,
}

impl<F> Default for FutureDriver<F> {
fn default() -> Self {
let mut heartbeat = time::interval(STOP_MONITOR_POLLING_INTERVAL);

heartbeat.set_missed_tick_behavior(time::MissedTickBehavior::Skip);

FutureDriver { futures: Vec::new(), stop_monitor_heartbeat: heartbeat }
FutureDriver { futures: Vec::new() }
}
}

@@ -94,12 +85,6 @@ where
}
}
}

fn poll_stop_monitor_heartbeat(&mut self, cx: &mut Context) {
// We don't care about the ticks of the heartbeat, it's here only
// to periodically wake the `Waker` on `cx`.
let _ = self.stop_monitor_heartbeat.poll_tick(cx);
Contributor

IIUC we relied on this to make sure that we wake up:

  • when there are no scheduled tasks that would yield to the executor (i.e. call waker.wake())
  • after at most one second

With tokio::time::Interval removed, we are polling the FutureDriver alongside the other operations that we do in the server:

  • try_accept_conn: await an incoming connection
  • wait_for_permit: await backpressure for some internal buffer space
  • try_recv: can wait for soketto::Pong, soketto::Data, and soketto::Closed

Could we still get into a situation where we have no other competing tasks to drive the FutureDriver?
(i.e. we don't have a soketto::Pong to wake us up -- or it is configured at 100 seconds -- and we don't receive any soketto::Data)
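
For reference, a minimal sketch (not the jsonrpsee code; the `Heartbeat` name and period are illustrative) of what the removed heartbeat provided: polling an `Interval` inside `poll` registers the task's waker with the timer, so the enclosing task is re-polled at least once per tick even if no child future calls `wake()`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{self, Duration, Interval, MissedTickBehavior};

/// Never resolves; its only job is to keep a timer-based waker registered.
struct Heartbeat {
    interval: Interval,
}

impl Heartbeat {
    fn new(period: Duration) -> Self {
        let mut interval = time::interval(period);
        // Don't try to "catch up" on missed ticks, just skip them.
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
        Self { interval }
    }
}

impl Future for Heartbeat {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = Pin::into_inner(self);
        // We don't care about the tick itself; the side effect is that the
        // timer now holds the waker for `cx` and will wake this task on the
        // next tick, bounding how long the task can go without being polled.
        let _ = this.interval.poll_tick(cx);
        Poll::Pending
    }
}
```

The question above is whether, without this bound, a Ready future inside the driver could sit unpolled until some other event (a new connection, a permit, or soketto traffic) happens to wake the task.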

Collaborator

After convincing myself that this DriverSelect thing (i.e. select_with) will poll the futures it contains as needed: maybe I'm missing something, but why does it matter if this thing doesn't wake up for a while when none of the futures in it need to make progress?

Contributor

I was concerned that the FutureDriver would not be polled in time to handle a Ready future from its internal vector. That is, if we don't get enough traction from try_recv, we previously made sure some progress happened via the tokio::Interval.

I was trying to imagine the unlikely scenario where soketto::recv from try_recv doesn't generate events for 10 minutes, but the futures in the driver are all ready. In that case, the tokio::Interval made sure things advanced a bit sooner.

Collaborator

My current understanding is now that when any of those internal futures makes progress, the driver task (or whatever task contains the DriverSelect) will be polled again and they will all re-run. So no futures in this list should be ignored, basically. If an internal item is Ready, it will have called wake() to make everything get polled again.
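
A tiny illustration of the waker contract described here (a sketch, not jsonrpsee code): whichever child future calls `wake()`, the whole enclosing task is polled again, and every still-pending child gets a fresh poll on that wakeup.

```rust
use std::future::Future;
use std::task::Poll;
use std::time::Duration;

use futures_util::future::poll_fn;

#[tokio::main]
async fn main() {
    let mut a = Box::pin(tokio::time::sleep(Duration::from_millis(10)));
    let mut b = Box::pin(tokio::time::sleep(Duration::from_millis(20)));

    let mut a_done = false;
    let mut b_done = false;

    poll_fn(|cx| {
        // This closure is the enclosing task body: it runs again whenever
        // either timer wakes the task, and polls every pending child.
        if !a_done {
            a_done = a.as_mut().poll(cx).is_ready();
        }
        if !b_done {
            b_done = b.as_mut().poll(cx).is_ready();
        }
        if a_done && b_done {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
    .await;

    println!("both timers fired");
}
```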

Member Author (@niklasad1, Mar 30, 2023)

So, that's a good question/observation, but I found it quite unlikely because both batches and calls are executed as one future/task.

Then, in each loop iteration, both try_recv and wait_for_permit are awaited, which checks "the future vec". It could be possible to spawn huge futures and then never send any data again (or have backpressure kick in); those futures would then never be woken up again, but I don't see it as an issue because it's up to the client to enforce that.

If it's idle, fine, but we could have a few ready tasks in the future vec.

}
}

impl<F> Future for FutureDriver<F>
@@ -140,12 +125,13 @@ where
let this = Pin::into_inner(self);

this.driver.drive(cx);
this.driver.poll_stop_monitor_heartbeat(cx);

this.selector.poll_unpin(cx)
}
}

/// Represent a stop handle which is a wrapper over a `multi-consumer receiver`
/// and cloning [`StopHandle`] will get a separate instance of the underlying receiver.
#[derive(Debug, Clone)]
pub(crate) struct StopHandle(watch::Receiver<()>);

@@ -154,14 +140,9 @@ impl StopHandle {
Self(rx)
}

pub(crate) fn shutdown_requested(&self) -> bool {
// if a message has been seen, it means that `stop` has been called.
self.0.has_changed().unwrap_or(true)
}

pub(crate) async fn shutdown(&mut self) {
// Err(_) implies that the `sender` has been dropped.
// Ok(_) implies that `stop` has been called.
/// A future that resolves when server has been stopped
/// it consumes the stop handle.
pub(crate) async fn shutdown(mut self) {
let _ = self.0.changed().await;
}
}
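
For context, a minimal sketch of the watch-channel pattern behind StopHandle (illustrative names, not the jsonrpsee API): `changed()` resolves once the sender side either sends a value or is dropped, which is what the consuming `shutdown()` future waits for.

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(());

    let server_loop = tokio::spawn(async move {
        let mut rx = rx;
        // Resolves on `tx.send(())` (stop requested) or when `tx` is dropped.
        let _ = rx.changed().await;
        println!("stop signal received, shutting down");
    });

    // Stand-in for calling stop() on the server handle.
    let _ = tx.send(());
    server_loop.await.unwrap();
}
```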
110 changes: 46 additions & 64 deletions server/src/server.rs
@@ -32,13 +32,14 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::future::{ConnectionGuard, FutureDriver, ServerHandle, StopHandle};
use crate::future::{ConnectionGuard, ServerHandle, StopHandle};
use crate::logger::{Logger, TransportProtocol};
use crate::transport::{http, ws};

use futures_util::future::{BoxFuture, FutureExt};
use futures_util::future::{Either, FutureExt};
use futures_util::io::{BufReader, BufWriter};

use futures_util::stream::{FuturesUnordered, StreamExt};
use hyper::body::HttpBody;
use jsonrpsee_core::id_providers::RandomIntegerIdProvider;

@@ -127,12 +128,15 @@ where

let mut id: u32 = 0;
let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
let mut connections = FutureDriver::default();
let mut incoming = Monitored::new(Incoming(self.listener), &stop_handle);
let listener = self.listener;

let mut connections = FuturesUnordered::new();
let stopped = stop_handle.clone().shutdown();
tokio::pin!(stopped);

loop {
match connections.select_with(&mut incoming).await {
Ok((socket, remote_addr)) => {
match try_accept_conn(&listener, stopped).await {
AcceptConnection::Established { socket, remote_addr, stop } => {
let data = ProcessConnection {
remote_addr,
methods: methods.clone(),
@@ -154,15 +158,21 @@
};
process_connection(&self.service_builder, &connection_guard, data, socket, &mut connections);
id = id.wrapping_add(1);
stopped = stop;
}
Err(MonitoredError::Selector(err)) => {
tracing::error!("Error while awaiting a new connection: {:?}", err);
AcceptConnection::Err((e, stop)) => {
tracing::error!("Error while awaiting a new connection: {:?}", e);
stopped = stop;
}
Err(MonitoredError::Shutdown) => break,
AcceptConnection::Shutdown => break,
}
}

connections.await;
// FuturesUnordered won't poll anything until this line, but because the
// tasks are spawned (so that they can progress independently),
// this just makes sure that all tasks are completed before
// returning from this function.
while connections.next().await.is_some() {}
Collaborator (@jsdw, Mar 30, 2023)

Ah ok, so if I understand this right, FuturesUnordered won't poll anything until you tell it to, so you are spawning tasks into it (so that they can progress independently) and then this line just makes sure that all tasks are completed before the function ends?

(I think that seems perfectly reasonable to me! Perhaps worth a comment)

Member Author

yeah, exactly
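
A small sketch of the spawn-then-drain pattern under discussion (illustrative only, not the server code): the JoinHandles are pushed into a FuturesUnordered so the spawned tasks progress on their own, and draining the stream at the end is just a barrier that waits for all of them.

```rust
use futures_util::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut connections: FuturesUnordered<tokio::task::JoinHandle<()>> = FuturesUnordered::new();

    for i in 0..3 {
        // Each "connection" runs as its own task, independent of whether
        // the FuturesUnordered is currently being polled.
        connections.push(tokio::spawn(async move {
            println!("connection {i} finished");
        }));
    }

    // FuturesUnordered only polls its contents when the stream itself is
    // polled, but since the real work happens in spawned tasks, this loop
    // only waits for every JoinHandle to complete.
    while connections.next().await.is_some() {}
}
```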

}
}

@@ -668,56 +678,6 @@ impl<L: Logger> hyper::service::Service<hyper::Request<hyper::Body>> for TowerSe
}
}

/// This is a glorified select listening for new messages, while also checking the `stop_receiver` signal.
struct Monitored<'a, F> {
future: F,
stop_monitor: &'a StopHandle,
}

impl<'a, F> Monitored<'a, F> {
fn new(future: F, stop_monitor: &'a StopHandle) -> Self {
Monitored { future, stop_monitor }
}
}

enum MonitoredError<E> {
Shutdown,
Selector(E),
}

struct Incoming(TcpListener);

impl<'a> Future for Monitored<'a, Incoming> {
type Output = Result<(TcpStream, SocketAddr), MonitoredError<std::io::Error>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);

if this.stop_monitor.shutdown_requested() {
return Poll::Ready(Err(MonitoredError::Shutdown));
}

this.future.0.poll_accept(cx).map_err(MonitoredError::Selector)
}
}

impl<'a, 'f, F, T, E> Future for Monitored<'a, Pin<&'f mut F>>
where
F: Future<Output = Result<T, E>>,
{
type Output = Result<T, MonitoredError<E>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);

if this.stop_monitor.shutdown_requested() {
return Poll::Ready(Err(MonitoredError::Shutdown));
}

this.future.poll_unpin(cx).map_err(MonitoredError::Selector)
}
}

struct ProcessConnection<L> {
/// Remote server address.
remote_addr: SocketAddr,
@@ -763,7 +723,7 @@ fn process_connection<'a, L: Logger, B, U>(
connection_guard: &ConnectionGuard,
cfg: ProcessConnection<L>,
socket: TcpStream,
connections: &mut FutureDriver<BoxFuture<'a, ()>>,
connections: &mut FuturesUnordered<tokio::task::JoinHandle<()>>,
) where
B: Layer<TowerService<L>> + Send + 'static,
<B as Layer<TowerService<L>>>::Service: Send
@@ -786,7 +746,7 @@
Some(conn) => conn,
None => {
tracing::warn!("Too many connections. Please try again later.");
connections.add(http::reject_connection(socket).in_current_span().boxed());
connections.push(tokio::spawn(http::reject_connection(socket).in_current_span()));
return;
}
};
@@ -819,11 +779,11 @@

let service = service_builder.service(tower_service);

connections.add(Box::pin(try_accept_connection(socket, service, cfg.stop_handle).in_current_span()));
connections.push(tokio::spawn(to_http_service(socket, service, cfg.stop_handle).in_current_span()));
}

// Attempts to create a HTTP connection from a socket.
async fn try_accept_connection<S, B>(socket: TcpStream, service: S, mut stop_handle: StopHandle)
async fn to_http_service<S, B>(socket: TcpStream, service: S, stop_handle: StopHandle)
where
S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<B>> + Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
@@ -847,3 +807,25 @@
}
}
}

enum AcceptConnection<S> {
Shutdown,
Established { socket: TcpStream, remote_addr: SocketAddr, stop: S },
Err((std::io::Error, S)),
}

async fn try_accept_conn<S>(listener: &TcpListener, stopped: S) -> AcceptConnection<S>
where
S: Future + Unpin,
{
let accept = listener.accept();
tokio::pin!(accept);

match futures_util::future::select(accept, stopped).await {
Either::Left((res, stop)) => match res {
Ok((socket, remote_addr)) => AcceptConnection::Established { socket, remote_addr, stop },
Err(e) => AcceptConnection::Err((e, stop)),
},
Either::Right(_) => AcceptConnection::Shutdown,
}
}
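
Putting the two new pieces together, a simplified sketch of the accept-loop shape (not the exact server code): `select` hands the still-pending stop future back inside the `Either`, which is why the loop re-binds `stopped = stop` instead of recreating it each iteration.

```rust
use std::future::Future;

use futures_util::future::{self, Either};
use tokio::net::TcpListener;

async fn accept_loop(listener: TcpListener, stopped: impl Future + Unpin) {
    let mut stopped = stopped;

    loop {
        let accept = listener.accept();
        tokio::pin!(accept);

        match future::select(accept, stopped).await {
            Either::Left((Ok((_socket, _remote_addr)), stop)) => {
                // Hand the socket off to a connection handler here.
                stopped = stop;
            }
            Either::Left((Err(e), stop)) => {
                eprintln!("failed to accept connection: {e:?}");
                stopped = stop;
            }
            // The stop future resolved first: shut the loop down.
            Either::Right(_) => break,
        }
    }
}
```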