Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Connection to a synchronous state machine #142

Merged
merged 41 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
9bd4a34
Fix clippy warnings
thomaseizinger Oct 25, 2022
9af35b8
Remove unnecessary `Option`
thomaseizinger Oct 8, 2022
54296c5
Make `gargabe_collect` not async
thomaseizinger Oct 8, 2022
6234e6f
Handle socket closing outside of `on_control_command`
thomaseizinger Oct 8, 2022
d5d5ef4
Handle GoAway logic outside of `on_stream_command`
thomaseizinger Oct 8, 2022
6a55933
Remove `async` from a bunch of functions
thomaseizinger Oct 8, 2022
c1e805d
Introduce `Connection::poll` function
thomaseizinger Oct 8, 2022
e02bdbc
Minimise diff
thomaseizinger Oct 8, 2022
3a6a374
Split `on_control_command`
thomaseizinger Oct 11, 2022
5e64609
Use whitespace
thomaseizinger Oct 11, 2022
3817398
Split `on_stream_command`
thomaseizinger Oct 11, 2022
0ebd9cc
Prioritise control and stream commands over reading the socket
thomaseizinger Oct 11, 2022
b37e484
Fail if we want to double close a connection
thomaseizinger Oct 11, 2022
f776a43
Introduce internal `ConnectionState` enum
thomaseizinger Oct 11, 2022
d63cde2
Track closed state in connection state enum
thomaseizinger Oct 11, 2022
a1db18e
Inline `on_close_connection`
thomaseizinger Oct 11, 2022
fd886ee
Implement shutdown procedurally
thomaseizinger Oct 12, 2022
527012d
Don't pause `control_receiver`
thomaseizinger Oct 12, 2022
f9d3f4b
Implement `ConnectionState::poll`
thomaseizinger Oct 12, 2022
2df23f0
Implement `into_stream` by directly calling `poll_next`
thomaseizinger Oct 12, 2022
6ca0d1f
Improve naming of `poll` fn on `ConnectionState`
thomaseizinger Oct 12, 2022
bb96cd9
Improve docs
thomaseizinger Oct 12, 2022
ff27080
Don't fail if connection got closed gracefully
thomaseizinger Oct 12, 2022
527df19
Rename `Failing` to `Cleanup`
thomaseizinger Oct 12, 2022
19ee6e8
Add docs to state variants
thomaseizinger Oct 12, 2022
370f5bc
Track reply sender outside of `close` function
thomaseizinger Oct 20, 2022
7dcecde
Handle `ControlCommand` outside of `ConnectionState::poll`
thomaseizinger Oct 20, 2022
fe7d000
Introduce `Frame::close_stream` ctor
thomaseizinger Oct 20, 2022
2bae656
Move `Drop` impl from `Active` to `Connection`
thomaseizinger Oct 20, 2022
47c684b
Implement connection closing as manual state machine
thomaseizinger Oct 20, 2022
8279c27
Implement connection cleanup as manual state machine
thomaseizinger Oct 25, 2022
ce57e25
Don't require `T` to be `Send + 'static'`
thomaseizinger Oct 25, 2022
0ac90a0
Rewrite `Control` to be a layer on top of `Connection`
thomaseizinger Oct 24, 2022
79c1479
Make `control` as sister module of `connection`
thomaseizinger Oct 25, 2022
3941cdc
Add test for poll-based API
thomaseizinger Oct 25, 2022
a0ba23b
Create workspace
thomaseizinger Nov 3, 2022
654e38a
Remove comment and improve variable naming
thomaseizinger Nov 14, 2022
1525cf8
Fix typo
thomaseizinger Nov 14, 2022
27ed7ac
Match exhaustively
thomaseizinger Nov 14, 2022
1ca32e6
Use doc link
thomaseizinger Nov 14, 2022
63938b1
Bump version and add changelog entry
thomaseizinger Nov 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 11 additions & 55 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
// command processing instead of having to scan the whole collection of
// `Stream`s on each loop iteration, which is not great.

mod closing;
mod control;
mod stream;

Expand All @@ -109,6 +110,7 @@ use std::collections::VecDeque;
use std::task::Context;
use std::{fmt, sync::Arc, task::Poll};

use crate::connection::closing::Closing;
pub use control::Control;
pub use stream::{Packet, State, Stream};

Expand Down Expand Up @@ -219,7 +221,7 @@ enum ConnectionState<T> {
Active(Active<T>),
/// Our user requested to shutdown the connection, we are working on it.
Closing {
inner: BoxFuture<'static, Result<()>>,
inner: Closing<T>,
reply: oneshot::Sender<()>,
},
/// An error occurred and we are cleaning up our resources.
Expand Down Expand Up @@ -256,7 +258,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send + 'static> ConnectionState<T> {
}
Poll::Ready(Some(ControlCommand::CloseConnection(reply))) => {
*self = ConnectionState::Closing {
inner: active.close().boxed(),
inner: active.close(),
reply,
};
continue;
Expand Down Expand Up @@ -449,59 +451,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}

/// Gracefully close the connection to the remote.
async fn close(mut self) -> Result<()> {
// Prevent more control commands being sent.
self.control_receiver.close();

// Drain all remaining control commands.
while let Some(cmd) = self.control_receiver.next().await {
match cmd {
ControlCommand::OpenStream(reply) => {
let _ = reply.send(Err(ConnectionError::Closed));
}
ControlCommand::CloseConnection(reply) => {
let _ = reply.send(());
}
}
}

// Prevent any more work being sent from streams.
self.stream_receiver.close();

// Drain all remaining stream commands.
while let Some(cmd) = self.stream_receiver.next().await {
match cmd {
StreamCommand::SendFrame(frame) => {
self.on_send_frame(frame);
}
StreamCommand::CloseStream { id, ack } => {
self.on_close_stream(id, ack);
}
}
}

// Send `GoAway` frame.
log::debug!("{}: sending term", self.id);
self.pending_frames.push_back(Frame::term().into());

// Drain all remaining frames.
while let Some(pending_frame) = self.pending_frames.pop_front() {
self.socket
.feed(pending_frame)
.await
.or(Err(ConnectionError::Closed))?;
}

self.socket.flush().await.or(Err(ConnectionError::Closed))?;

// Close the socket.
self.socket
.get_mut()
.close()
.await
.or(Err(ConnectionError::Closed))?;

Ok(())
fn close(self) -> Closing<T> {
Closing::new(
self.control_receiver,
self.stream_receiver,
self.pending_frames,
self.socket,
)
}

/// Cleanup all our resources.
Expand Down
117 changes: 117 additions & 0 deletions src/connection/closing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use crate::connection::Result;
use crate::connection::{ControlCommand, StreamCommand};
use crate::frame::Frame;
use crate::{frame, ConnectionError};
use futures::channel::mpsc;
use futures::stream::Fuse;
use futures::{ready, AsyncRead, AsyncWrite, SinkExt, StreamExt};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A [`Future`] that gracefully closes the yamux connection.
#[must_use]
pub struct Closing<T> {
state: State,
control_receiver: mpsc::Receiver<ControlCommand>,
stream_receiver: mpsc::Receiver<StreamCommand>,
pending_frames: VecDeque<Frame<()>>,
socket: Fuse<frame::Io<T>>,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question not suggestion: Why deliberately implement this as a state machine instead of a procedural async/await?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that it can be named without boxing it up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the drawback of boxing it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the drawback of boxing it?

  • Performance
  • We have to decide whether we add the Send bound. In the current design, we get to delete the YamuxLocal stuff in rust-libp2p because the Send bound is inferred.

I don't feel strongly about either but it felt like a nice improvement as I went along. Once we get "impl Trait in type-alias" in Rust at least the boxing would go away.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the drawback of boxing it?

* Performance

Is there any benchmark proving this? Is performance relevant when closing a connection?

We have to decide whether we add the Send bound. In the current design, we get to delete the YamuxLocal stuff in rust-libp2p because the Send bound is inferred.

The infectious-send-when-boxing problem is reason enough to not box in my eyes 👍


impl<T> Closing<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) fn new(
control_receiver: mpsc::Receiver<ControlCommand>,
stream_receiver: mpsc::Receiver<StreamCommand>,
pending_frames: VecDeque<Frame<()>>,
socket: Fuse<frame::Io<T>>,
) -> Self {
Self {
state: State::ClosingControlReceiver,
control_receiver,
stream_receiver,
pending_frames,
socket,
}
}
}

impl<T> Future for Closing<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();

loop {
match this.state {
State::ClosingControlReceiver => {
this.control_receiver.close();
this.state = State::DrainingControlReceiver;
}
State::DrainingControlReceiver => {
match ready!(this.control_receiver.poll_next_unpin(cx)) {
Some(ControlCommand::OpenStream(reply)) => {
let _ = reply.send(Err(ConnectionError::Closed));
}
Some(ControlCommand::CloseConnection(reply)) => {
let _ = reply.send(());
}
None => this.state = State::ClosingStreamReceiver,
}
}
State::ClosingStreamReceiver => {
this.stream_receiver.close();
this.state = State::DrainingStreamReceiver;
}

State::DrainingStreamReceiver => {
this.stream_receiver.close();

match ready!(this.stream_receiver.poll_next_unpin(cx)) {
Some(StreamCommand::SendFrame(frame)) => {
this.pending_frames.push_back(frame.into())
}
Some(StreamCommand::CloseStream { id, ack }) => this
.pending_frames
.push_back(Frame::close_stream(id, ack).into()),
None => this.state = State::SendingTermFrame,
}
}
State::SendingTermFrame => {
this.pending_frames.push_back(Frame::term().into());
this.state = State::FlushingPendingFrames;
}
State::FlushingPendingFrames => {
ready!(this.socket.poll_ready_unpin(cx))?;

match this.pending_frames.pop_front() {
Some(frame) => this.socket.start_send_unpin(frame)?,
None => this.state = State::ClosingSocket,
}
}
State::ClosingSocket => {
ready!(this.socket.poll_close_unpin(cx))?;

return Poll::Ready(Ok(()));
}
}
}
}
}

pub enum State {
ClosingControlReceiver,
DrainingControlReceiver,
ClosingStreamReceiver,
DrainingStreamReceiver,
SendingTermFrame,
FlushingPendingFrames,
ClosingSocket,
}