Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC][libp2p-swarm] Let network I/O run ahead of NetworkBehaviour polling. #1585

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,31 +618,59 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,

// After the network had a chance to make progress, try to deliver
// the pending event emitted by the behaviour in the previous iteration
// to the connection handler(s). The pending event must be delivered
// before polling the behaviour again. If the targeted peer
// meanwhie disconnected, the event is discarded.
// to the connection handler(s).
//
// The pending event must be delivered before polling the behaviour again.
// If the targeted peer meanwhile disconnected, the event is discarded.
//
// If the pending event cannot be delivered because the connection
// is busy, continue to poll the network. That means the following
// if a single connection (handler) is slow (i.e. does not accept the pending
// event):
//
// 1. All connections are temporarily unable to get new data to send
// from the behaviour (since we only buffer a single pending event).
// This is a problem that still needs to be resolved by providing
// a means for the swarm to exercise connection-specific back-pressure
// on the `NetworkBehaviour`.
//
// 2. New connections continue to get accepted and in general
// network I/O progresses as long as the events emitted by
// the swarm are consumed, i.e. back-pressure on the network
// I/O can only be triggered by the code consuming swarm events.
//
// 3. All connections can continue to receive data and send data
// in the form of events to the `NetworkBehaviour`. All received data
// can be continuously fed into `NetworkBehaviour::inject_event`. It
// may be desirable in the future to allow `inject_event` to signal
// back-pressure via a return value.
//
// 4. As a direct consequence of (3.), `NetworkBehaviour::inject_event` can
// be called any number of times before `NetworkBehaviour::poll` is
// called again (in the absence of a means for `inject_event` to signal
// back-pressure).
if let Some((peer_id, handler, event)) = this.pending_event.take() {
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
match handler {
PendingNotifyHandler::One(conn_id) =>
if let Some(mut conn) = peer.connection(conn_id) {
if let Some(event) = notify_one(&mut conn, event, cx) {
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
continue
}
},
PendingNotifyHandler::Any(ids) => {
if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
continue
}
}
PendingNotifyHandler::All(ids) => {
if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::All(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
continue
}
}
}
Expand Down Expand Up @@ -709,7 +737,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
if let Some(event) = notify_one(&mut conn, event, cx) {
let handler = PendingNotifyHandler::One(connection);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
continue
}
}
}
Expand All @@ -718,15 +746,15 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
continue
}
}
NotifyHandler::All => {
let ids = peer.connections().into_ids().collect();
if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::All(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
continue
}
}
}
Expand Down