Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

substream-set: Poll substreams fairly with poll indexes #228

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions src/substream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ use crate::transport::websocket;

use bytes::{Buf, Bytes, BytesMut};
use futures::{Sink, Stream};
use indexmap::{map::Entry, IndexMap};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use unsigned_varint::{decode, encode};

use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
collections::VecDeque,
fmt,
hash::Hash,
io::ErrorKind,
pin::Pin,
task::{Context, Poll},
task::{Context, Poll, Waker},
};

/// Logging target for the file.
Expand Down Expand Up @@ -765,7 +766,9 @@ where
K: SubstreamSetKey,
S: Stream<Item = Result<BytesMut, SubstreamError>> + Unpin,
{
substreams: HashMap<K, S>,
substreams: IndexMap<K, S>,
poll_index: usize,
waker: Option<Waker>,
}

impl<K, S> SubstreamSet<K, S>
Expand All @@ -776,7 +779,9 @@ where
/// Create new [`SubstreamSet`].
pub fn new() -> Self {
Self {
substreams: HashMap::new(),
substreams: IndexMap::new(),
poll_index: 0,
waker: None,
}
}

Expand All @@ -785,6 +790,7 @@ where
match self.substreams.entry(key) {
Entry::Vacant(entry) => {
entry.insert(substream);
self.waker.take().map(|waker| waker.wake());
}
Entry::Occupied(_) => {
tracing::error!(?key, "substream already exists");
Expand All @@ -795,7 +801,15 @@ where

/// Remove substream from the set.
pub fn remove(&mut self, key: &K) -> Option<S> {
self.substreams.remove(key)
// The `swap_remove()` changes the order of elements in the map,
// however it completes in O(1). This is acceptable since the
// alternative of calling `shift_remove()` would be O(n).
let Some(substream) = self.substreams.swap_remove(key) else {
return None;
};

self.waker.take().map(|waker| waker.wake());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to wake the task when removing a substream. I.e., all the remaining futures in the map were already registered in the task context.

Some(substream)
}

/// Get mutable reference to stored substream.
Expand Down Expand Up @@ -825,8 +839,15 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = Pin::into_inner(self);

// TODO: poll the streams more randomly
for (key, mut substream) in inner.substreams.iter_mut() {
let len = inner.substreams.len();

for _ in 0..len {
let index = inner.poll_index % len;
inner.poll_index = (inner.poll_index + 1) % len;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just toss the starting index randomly when poll_next is called? This way we won't need to keep the last starting index, and the polling will be even more random than when just offsetting by 1. I.e., when we offset the index by one there are still more chances to poll substreams from the first half of the list until the starting index offsets significantly.

Also, I would even keep a HashMap for additional shuffling instead of using IndexMap that keeps the insertion order. (We will need to traverse the HashMap twice to implement iteration starting from some offset. This should not be a problem, but double check the order of the HashMap iteration is the same if the map hasn't changed between the iterations, please.)


let (key, mut substream) =
inner.substreams.get_index_mut(index).expect("Index within range; qed");

match Pin::new(&mut substream).poll_next(cx) {
Poll::Pending => continue,
Poll::Ready(Some(data)) => return Poll::Ready(Some((*key, data))),
Expand All @@ -835,6 +856,7 @@ where
}
}

inner.waker = Some(cx.waker().clone());
Poll::Pending
}
}
Expand Down