Skip to content

Commit

Permalink
Tests for HandledNode (libp2p#546)
Browse files Browse the repository at this point in the history
* Add unit tests for core::nodes::NodeStream

* Move DummyMuxer to core/tests

* Address grumbles

* Impl Debug for SubstreamRef<P>

* Add test for poll()

* Don't need to open a substream

* pretty printer test

* More tests for NodeStream poll()

* ListenerStream unit tests: transport() and listeners()

* Tests for nodes/listeners.rs

* Add a few tests to help illustrate the "drowning" behaviour of busy listeners

* Tests for HandledNode

* Address grumbles

* Remove non-project specific stuff

* Address grumbles

* Prefer freestanding function

* Untangle the code for old shutdown test from the new tests
Add HandlerState and use it in TestBuilder
Shorter test names

* WIP – tests pass

* Use a newtype to lighten up the function signatures a bit
Test NotReady case

* Cleanup Event enum
Track events as they reach the Handler
Describe complex test logic

* Assert on the event trace

* More tests for poll()

* Switch to using usize as the OutboundOpenInfo so we can assert on event contents
More tests for poll()

* whitespace

* Move Handler related code to dummy_handler

* Fixes broken test after upstream changes

* Clarify the behaviour of is_shutting_down

* Fix broken test

* Fix tests after recent changes on master

* no tabs

* whitespace

* rustfmt

* Add public HandledNode.handler() method that returns a ref to the NodeHandler
Don't use private members in tests

* Add HandledNode.handler_mut that returns a mutable ref to the NodeHandler

* Remove debugging stmts

* Fix parse error
  • Loading branch information
dvdplm authored and tomaka committed Nov 2, 2018
1 parent 9249e31 commit 437a8c0
Show file tree
Hide file tree
Showing 5 changed files with 417 additions and 35 deletions.
334 changes: 305 additions & 29 deletions core/src/nodes/handled_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub trait NodeHandler {
/// Injects an event coming from the outside into the handler.
fn inject_event(&mut self, event: Self::InEvent);

/// Indicates that the node that it should shut down. After that, it is expected that `poll()`
/// Indicates to the node that it should shut down. After that, it is expected that `poll()`
/// returns `Ready(None)` as soon as possible.
///
/// This method allows an implementation to perform a graceful shutdown of the substreams, and
Expand Down Expand Up @@ -176,6 +176,16 @@ where
}
}

/// Returns a reference to the `NodeHandler`
pub fn handler(&self) -> &THandler{
&self.handler
}

/// Returns a mutable reference to the `NodeHandler`
pub fn handler_mut(&mut self) -> &mut THandler{
&mut self.handler
}

/// Injects an event to the handler.
#[inline]
pub fn inject_event(&mut self, event: THandler::InEvent) {
Expand Down Expand Up @@ -286,52 +296,88 @@ where
}
}
}

Ok(Async::NotReady)
}
}

#[cfg(test)]
mod tests {
use super::*;
use muxing::{StreamMuxer, Shutdown};
use std::marker::PhantomData;
use tokio::runtime::current_thread;
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use tests::dummy_handler::{Handler, HandlerState, Event};
use std::marker::PhantomData;

// Concrete `HandledNode`
type TestHandledNode = HandledNode<DummyMuxer, Handler>;

// TODO: move somewhere? this could be useful as a dummy
struct InstaCloseMuxer;
impl StreamMuxer for InstaCloseMuxer {
type Substream = ();
type OutboundSubstream = ();
fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> { Ok(Async::Ready(None)) }
fn open_outbound(&self) -> Self::OutboundSubstream { () }
fn poll_outbound(&self, _: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> { Ok(Async::Ready(None)) }
fn destroy_outbound(&self, _: Self::OutboundSubstream) {}
fn read_substream(&self, _: &mut Self::Substream, _: &mut [u8]) -> Poll<usize, IoError> { panic!() }
fn write_substream(&self, _: &mut Self::Substream, _: &[u8]) -> Poll<usize, IoError> { panic!() }
fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { panic!() }
fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { panic!() }
fn destroy_substream(&self, _: Self::Substream) { panic!() }
fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { Ok(Async::Ready(())) }
fn flush_all(&self) -> Poll<(), IoError> { Ok(Async::Ready(())) }
struct TestBuilder {
muxer: DummyMuxer,
handler: Handler,
want_open_substream: bool,
substream_user_data: usize,
}

impl TestBuilder {
fn new() -> Self {
TestBuilder {
muxer: DummyMuxer::new(),
handler: Handler::default(),
want_open_substream: false,
substream_user_data: 0,
}
}

fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
self.muxer.set_inbound_connection_state(state);
self
}

fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
self.muxer.set_outbound_connection_state(state);
self
}

fn with_handler_state(&mut self, state: HandlerState) -> &mut Self {
self.handler.state = Some(state);
self
}

fn with_open_substream(&mut self, user_data: usize) -> &mut Self {
self.want_open_substream = true;
self.substream_user_data = user_data;
self
}

fn handled_node(&mut self) -> TestHandledNode {
let mut h = HandledNode::new(self.muxer.clone(), self.handler.clone());
if self.want_open_substream {
h.node.get_mut().open_substream(self.substream_user_data).expect("open substream should work");
}
h
}
}

// Set the state of the `Handler` after `inject_outbound_closed` is called
fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_state: HandlerState) {
handled_node.handler.next_outbound_state = Some(next_state);
}

#[test]
fn proper_shutdown() {
// Test that `shutdown()` is properly called on the handler once a node stops.
struct Handler<T> {
struct ShutdownHandler<T> {
did_substream_attempt: bool,
inbound_closed: bool,
substream_attempt_cancelled: bool,
shutdown_called: bool,
marker: PhantomData<T>,
};
impl<T> NodeHandler for Handler<T> {
marker: PhantomData<T>
}
impl<T> NodeHandler for ShutdownHandler<T> {
type InEvent = ();
type OutEvent = ();
type Substream = T;
type OutboundOpenInfo = ();
fn inject_substream(&mut self, _: T, _: NodeHandlerEndpoint<()>) { panic!() }
fn inject_substream(&mut self, _: Self::Substream, _: NodeHandlerEndpoint<Self::OutboundOpenInfo>) { panic!() }
fn inject_inbound_closed(&mut self) {
assert!(!self.inbound_closed);
self.inbound_closed = true;
Expand All @@ -357,13 +403,20 @@ mod tests {
}
}
}
impl<T> Drop for Handler<T> {

impl<T> Drop for ShutdownHandler<T> {
fn drop(&mut self) {
assert!(self.shutdown_called);
if self.did_substream_attempt {
assert!(self.shutdown_called);
}
}
}

let handled = HandledNode::new(InstaCloseMuxer, Handler {
// Test that `shutdown()` is properly called on the handler once a node stops.
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Closed);
muxer.set_outbound_connection_state(DummyConnectionState::Closed);
let handled = HandledNode::new(muxer, ShutdownHandler {
did_substream_attempt: false,
inbound_closed: false,
substream_attempt_cancelled: false,
Expand All @@ -373,4 +426,227 @@ mod tests {

current_thread::Runtime::new().unwrap().block_on(handled.for_each(|_| Ok(()))).unwrap();
}

#[test]
fn can_inject_event() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.handled_node();

let event = Event::Custom("banana");
handled.inject_event(event.clone());
assert_eq!(handled.handler().events, vec![event]);
}

#[test]
fn knows_if_inbound_is_closed() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop
.handled_node();
handled.poll().expect("poll failed");
assert!(!handled.is_inbound_open())
}

#[test]
fn knows_if_outbound_is_closed() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop
.with_open_substream(987) // without at least one substream we do not poll_outbound so we never get the event
.handled_node();

handled.poll().expect("poll failed");
assert!(!handled.is_outbound_open());
}

#[test]
fn is_shutting_down_is_true_when_called_shutdown_on_the_handled_node() {
let mut handled = TestBuilder::new()
.with_handler_state(HandlerState::Ready(None)) // Stop the loop towards the end of the first run
.handled_node();
assert!(!handled.is_shutting_down());
handled.poll().expect("poll should work");
handled.shutdown();
assert!(handled.is_shutting_down());
}

#[test]
fn is_shutting_down_is_true_when_in_and_outbounds_are_closed() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_open_substream(123) // avoid infinite loop
.handled_node();

handled.poll().expect("poll should work");

// Shutting down (in- and outbound are closed, and the handler is shutdown)
assert!(handled.is_shutting_down());
}

#[test]
fn is_shutting_down_is_true_when_handler_is_gone() {
// when in-/outbound NodeStreams are open or Async::Ready(None) we reach the handlers `poll()` and initiate shutdown.
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Pending)
.with_handler_state(HandlerState::Ready(None)) // avoid infinite loop
.handled_node();

handled.poll().expect("poll should work");

assert!(handled.is_shutting_down());
}

#[test]
fn is_shutting_down_is_true_when_handler_is_gone_even_if_in_and_outbounds_are_open() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Opened)
.with_muxer_outbound_state(DummyConnectionState::Opened)
.with_open_substream(123)
.with_handler_state(HandlerState::Ready(None))
.handled_node();

handled.poll().expect("poll should work");

assert!(handled.is_shutting_down());
}

#[test]
fn poll_with_unready_node_stream_polls_handler() {
let mut handled = TestBuilder::new()
// make NodeStream return NotReady
.with_muxer_inbound_state(DummyConnectionState::Pending)
// make Handler return return Ready(None) so we break the infinite loop
.with_handler_state(HandlerState::Ready(None))
.handled_node();

assert_matches!(handled.poll(), Ok(Async::Ready(None)));
}

#[test]
fn poll_with_unready_node_stream_and_handler_emits_custom_event() {
let expected_event = Some(NodeHandlerEvent::Custom(Event::Custom("pineapple")));
let mut handled = TestBuilder::new()
// make NodeStream return NotReady
.with_muxer_inbound_state(DummyConnectionState::Pending)
// make Handler return return Ready(Some(…))
.with_handler_state(HandlerState::Ready(expected_event))
.handled_node();

assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => {
assert_matches!(event, Event::Custom("pineapple"))
});
}

#[test]
fn handler_emits_outbound_closed_when_opening_new_substream_on_closed_node() {
let open_event = Some(NodeHandlerEvent::OutboundSubstreamRequest(456));
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Ready(open_event))
.handled_node();

set_next_handler_outbound_state(
&mut handled,
HandlerState::Ready(Some(NodeHandlerEvent::Custom(Event::Custom("pear"))))
);
handled.poll().expect("poll works");
assert_eq!(handled.handler().events, vec![Event::OutboundClosed]);
}

#[test]
fn poll_returns_not_ready_when_node_stream_and_handler_is_not_ready() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_open_substream(12)
.with_handler_state(HandlerState::NotReady)
.handled_node();

// Under the hood, this is what happens when calling `poll()`:
// - we reach `node.poll_inbound()` and because the connection is
// closed, `inbound_finished` is set to true.
// - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls
// `inject_inbound_close`, but that's irrelevant here)
// - back in `poll()` we cal `handler.poll()` which does nothing because
// `HandlerState` is `NotReady`: loop continues
// - polls the node again which now skips the inbound block because
// `inbound_finished` is true.
// - Now `poll_outbound()` is called which returns `Async::Ready(None)`
// and sets `outbound_finished` to true. …calls destroy_outbound and
// yields Ready(OutboundClosed) …so the HandledNode calls
// `inject_outbound_closed`.
// - Now we have `inbound_finished` and `outbound_finished` set (and no
// more outbound substreams).
// - Next we poll the handler again which again does nothing because
// HandlerState is NotReady (and the node is still there)
// - HandledNode polls the node again: we skip inbound and there are no
// more outbound substreams so we skip that too; the addr is now
// Resolved so that part is skipped too
// - We reach the last section and the NodeStream yields Async::Ready(None)
// - Back in HandledNode the Async::Ready(None) triggers a shutdown
// – …and causes the Handler to yield Async::Ready(None)
// – which in turn makes the HandledNode to yield Async::Ready(None) as well
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
assert_eq!(handled.handler().events, vec![
Event::InboundClosed, Event::OutboundClosed
]);
}

#[test]
fn poll_yields_inbound_closed_event() {
let mut h = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler().events, vec![Event::InboundClosed]);
}

#[test]
fn poll_yields_outbound_closed_event() {
let mut h = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_open_substream(32)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler().events, vec![Event::OutboundClosed]);
}

#[test]
fn poll_yields_outbound_substream() {
let mut h = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Opened)
.with_open_substream(1)
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler().events, vec![Event::Substream(Some(1))]);
}

#[test]
fn poll_yields_inbound_substream() {
let mut h = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Opened)
.with_muxer_outbound_state(DummyConnectionState::Pending)
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler().events, vec![Event::Substream(None)]);
}
}
1 change: 0 additions & 1 deletion core/src/nodes/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ where
}
}
}

// Closing the node if there's no way we can do anything more.
if self.inbound_state == StreamState::Closed
&& self.outbound_state == StreamState::Closed
Expand Down
Loading

0 comments on commit 437a8c0

Please sign in to comment.