Use vecdeque for fair polling (libp2p#610)
* WIP

* Use pop_back()
Improve tests

* Avoid unwrapping

* Update core/src/nodes/listeners.rs

Co-Authored-By: dvdplm <dvdplm@gmail.com>
dvdplm authored and tomaka committed Nov 8, 2018
1 parent c04d0fe commit d961e65
Showing 1 changed file with 72 additions and 46 deletions.
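
The change itself is small: the `Vec` of listeners becomes a `VecDeque`, and `poll()` pops each listener off the back and pushes it back onto the front once it has been polled. Because the front is polled last, a listener that keeps yielding incoming connections can no longer starve the others. Below is a minimal standalone sketch of that rotation pattern; `FakeListener`, `poll_fair`, and the `Option`-based return value are illustrative stand-ins, not the actual libp2p types, which work with `futures::Async` and transport upgrades.

    use std::collections::VecDeque;

    // Stand-in for a listener: yields Some(item) when ready, None when not.
    struct FakeListener {
        id: usize,
        pending: Vec<u32>,
    }

    impl FakeListener {
        fn poll(&mut self) -> Option<u32> {
            self.pending.pop()
        }
    }

    // Fair round-robin poll: take a listener off the back and, whatever it
    // returns, put it back on the *front*, so every other listener gets a
    // turn before it is polled again.
    fn poll_fair(listeners: &mut VecDeque<FakeListener>) -> Option<(usize, u32)> {
        let mut remaining = listeners.len();
        while let Some(mut listener) = listeners.pop_back() {
            let polled = listener.poll();
            listeners.push_front(listener);
            match polled {
                Some(item) => return Some((listeners[0].id, item)),
                None => {
                    remaining -= 1;
                    if remaining == 0 {
                        break; // every listener reported "not ready"
                    }
                }
            }
        }
        None
    }

    fn main() {
        let mut listeners: VecDeque<_> = (0..3)
            .map(|id| FakeListener { id, pending: vec![1, 2, 3] })
            .collect();

        // Listeners take turns: ids 2, 1, 0, 2, 1, 0, ...
        for _ in 0..6 {
            println!("{:?}", poll_fair(&mut listeners));
        }
    }

Pushing a polled listener to the front rather than the back also picks up the performance point made in the TODO comment this commit removes: the most recently polled listener, which is the one least likely to have new data, now waits the longest before it is polled again.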
118 changes: 72 additions & 46 deletions core/src/nodes/listeners.rs
@@ -22,6 +22,7 @@ use futures::prelude::*;
use std::fmt;
use void::Void;
use {Multiaddr, Transport};
use std::collections::VecDeque;

/// Implementation of `futures::Stream` that allows listening on multiaddresses.
///
@@ -81,7 +82,7 @@ where
/// Transport used to spawn listeners.
transport: TTrans,
/// All the active listeners.
listeners: Vec<Listener<TTrans>>,
listeners: VecDeque<Listener<TTrans>>,
}

/// A single active listener.
@@ -131,7 +132,7 @@ where
pub fn new(transport: TTrans) -> Self {
ListenersStream {
transport,
listeners: Vec::new(),
listeners: VecDeque::new(),
}
}

@@ -141,7 +142,7 @@
pub fn with_capacity(transport: TTrans, capacity: usize) -> Self {
ListenersStream {
transport,
listeners: Vec::with_capacity(capacity),
listeners: VecDeque::with_capacity(capacity),
}
}

Expand All @@ -158,7 +159,7 @@ where
.listen_on(addr)
.map_err(|(_, addr)| addr)?;

self.listeners.push(Listener {
self.listeners.push_back(Listener {
listener,
address: new_addr.clone(),
});
@@ -181,15 +182,17 @@ where
/// Provides an API similar to `Stream`, except that it cannot error.
pub fn poll(&mut self) -> Async<ListenersEvent<TTrans>> {
// We remove each element from `listeners` one by one and add them back.
for n in (0..self.listeners.len()).rev() {
let mut listener = self.listeners.swap_remove(n);
let mut remaining = self.listeners.len();
while let Some(mut listener) = self.listeners.pop_back() {
match listener.listener.poll() {
Ok(Async::NotReady) => {
self.listeners.push(listener);
self.listeners.push_front(listener);
remaining -= 1;
if remaining == 0 { break }
}
Ok(Async::Ready(Some((upgrade, send_back_addr)))) => {
let listen_addr = listener.address.clone();
self.listeners.push(listener);
self.listeners.push_front(listener);
return Async::Ready(ListenersEvent::Incoming {
upgrade,
listen_addr,
@@ -379,14 +382,17 @@ mod tests {
}

#[test]
fn listener_stream_poll_with_ready_listeners_is_ready() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
fn listener_stream_poll_with_ready_listeners_yields_upgrade() {
let mut transport = DummyTransport::new();
transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let mut ls = ListenersStream::new(transport);

let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = "/ip4/127.0.0.2/tcp/4321".parse::<Multiaddr>().expect("bad multiaddr");
let mut ls = ListenersStream::new(t);
ls.listen_on(addr1).expect("listen_on failed");
ls.listen_on(addr2).expect("listen_on failed");

ls.listen_on(addr1).expect("listen_on works");
ls.listen_on(addr2).expect("listen_on works");
assert_eq!(ls.listeners.len(), 2);

assert_matches!(ls.poll(), Async::Ready(listeners_event) => {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
@@ -396,21 +402,7 @@ mod tests {
});
})
});
// TODO: When several listeners are continuously Async::Ready –
// admittedly a corner case – the last one is processed first and then
// put back *last* on the pile. This means that at the next poll() it
// will get polled again and if it always has data to yield, it will
// effectively block all other listeners from being "heard". One way
// around this is to switch to using a `VecDeque` to keep the listeners
// collection, and instead of pushing the processed item to the end of
// the list, stick it on top so that it'll be processed *last* instead
// during the next poll. This might also get us a performance win as
// even in the normal case, the most recently polled listener is more
// unlikely to have anything to yield than the others so we might avoid
// a few unneeded poll calls.

// Make the second listener return NotReady so we get the first listener next poll()
set_listener_state(&mut ls, 1, ListenerState::Ok(Async::NotReady));

assert_matches!(ls.poll(), Async::Ready(listeners_event) => {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234");
@@ -419,7 +411,18 @@
});
})
});
assert_eq!(ls.listeners.len(), 2);

set_listener_state(&mut ls, 1, ListenerState::Ok(Async::NotReady));
assert_matches!(ls.poll(), Async::Ready(listeners_event) => {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
// Second time we poll this Listener, so we get `2` from the transport Stream
assert_eq!(tup, 2)
});
})
});

}

#[test]
@@ -450,48 +453,71 @@ mod tests {
}

#[test]
fn listener_stream_poll_chatty_listeners_may_drown_others() {
fn listener_stream_poll_chatty_listeners_each_get_their_turn() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let mut ls = ListenersStream::new(t);
// Create 4 Listeners
for n in 0..4 {
let addr = format!("/ip4/127.0.0.{}/tcp/123{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
let addr = format!("/ip4/127.0.0.{}/tcp/{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
ls.listen_on(addr).expect("listen_on failed");
}

// polling processes listeners in reverse order
// Only the last listener ever gets processed
for _n in 0..10 {
// Poll() processes listeners in reverse order. Each listener is polled
// in turn.
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.3/tcp/1233")
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
// Make last listener NotReady so now only the third listener is processed
// Doing it again yields them in the same order
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}

// Make last listener NotReady; it will become the first element and
// be retried after trying the other Listeners.
set_listener_state(&mut ls, 3, ListenerState::Ok(Async::NotReady));
for _n in 0..10 {
for n in (0..3).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
for n in (0..3).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.2/tcp/1232")
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}

// Turning the last listener back on means we now have 4 "good"
// listeners, and each get their turn.
set_listener_state(&mut ls, 3, ListenerState::Ok(Async::Ready(Some(2))));
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
}

#[test]
fn listener_stream_poll_processes_listeners_as_expected_if_they_are_not_yielding_continuously() {
fn listener_stream_poll_processes_listeners_in_turn() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let mut ls = ListenersStream::new(t);
for n in 0..4 {
let addr = format!("/ip4/127.0.0.{}/tcp/123{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
let addr = format!("/ip4/127.0.0.{}/tcp/{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
ls.listen_on(addr).expect("listen_on failed");
}
// If the listeners do not yield items continuously (the normal case) we
// process them in the expected, reverse, order.

for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/123{}", n, n));
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n));
});
// kick the last listener (current) to NotReady state
set_listener_state(&mut ls, 3, ListenerState::Ok(Async::NotReady));
set_listener_state(&mut ls, 0, ListenerState::Ok(Async::NotReady));
}
// All Listeners are NotReady, so poll yields NotReady
assert_matches!(ls.poll(), Async::NotReady);
}
}
