Skip to content

Commit

Permalink
Test error conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
dvdplm committed Nov 2, 2018
1 parent f5dc68c commit 1fb4677
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 9 deletions.
4 changes: 2 additions & 2 deletions core/src/nodes/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl

match item {
HandledNodesEvent::TaskClosed { id, result, handler } => {
println!("[CollectionStream, poll] TaskClosed id={:?}, result={:?} – returning Async::Ready", id, result);
println!("[CollectionStream, poll] TaskClosed id={:?}, result={:?} – returning Async::Ready(…) <––", id, result);
match (self.tasks.remove(&id), result, handler) {
(Some(TaskState::Pending), Err(err), Some(handler)) => {
println!("[CollectionStream, poll] TaskState::Pending with error");
Expand All @@ -410,7 +410,7 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
})
},
(Some(TaskState::Connected(peer_id)), Err(err), _handler) => {
println!("[CollectionStream, poll] TaskState::Connected, Err({:?}", err);
println!("[CollectionStream, poll] TaskState::Connected, Err({:?} <–––", err);
debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
Expand Down
2 changes: 1 addition & 1 deletion core/src/nodes/handled_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ where
}
}
Async::Ready(Some(NodeHandlerEvent::Custom(event))) => {
println!("[HandledNode, poll] handler; Async::Ready(Some(Custom)); returning the event up the stack <––––");
println!("[HandledNode, poll] handler; Async::Ready(Some(Custom)); returning the event up the stack");
return Ok(Async::Ready(Some(event)));
}
Async::Ready(None) => {
Expand Down
4 changes: 2 additions & 2 deletions core/src/nodes/handled_node_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,13 @@ where
}
}
Ok(Async::Ready(None)) => {
println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Async::Ready(None) – sending TaskClosed and returning Async::Ready(()) <–––");
println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Async::Ready(None) – sending TaskClosed and returning Async::Ready(())");
let event = InToExtMessage::TaskClosed(Ok(()), None);
let _ = self.events_tx.unbounded_send((event, self.id));
return Ok(Async::Ready(())); // End the task.
}
Err(err) => {
println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Err");
println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Err – Sending TaskClosed(Err) <–––");
let event = InToExtMessage::TaskClosed(Err(err), None);
let _ = self.events_tx.unbounded_send((event, self.id));
return Ok(Async::Ready(())); // End the task.
Expand Down
115 changes: 112 additions & 3 deletions core/src/nodes/raw_swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ where
.finish()
}
RawSwarmEvent::NodeEvent { ref peer_id, ref event } => {
f.debug_struct("UnknownPeerDialError")
f.debug_struct("NodeEvent")
.field("peer_id", peer_id)
.field("event", event)
.finish()
Expand Down Expand Up @@ -691,8 +691,9 @@ where
out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
}
Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => {
println!("[RawSwarm, poll] active_nodes.poll() – Ready(NodeEvent) – returning out event and continuing");
println!("[RawSwarm, poll] active_nodes.poll() – Ready(CollectionEvent::NodeEvent) – returning out event wrapped in a NodeEvent and continuing");
action = Default::default();
println!("[RawSwarm, poll] active_nodes.poll() – Ready(CollectionEvent::NodeEvent) – default action");
out_event = RawSwarmEvent::NodeEvent { peer_id, event };
}
};
Expand Down Expand Up @@ -1301,7 +1302,7 @@ mod tests {
use super::*;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::runtime::Runtime;
use tokio::runtime::{Builder, Runtime};
use PublicKey;
use tests::dummy_transport::DummyTransport;
use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent};
Expand Down Expand Up @@ -1598,4 +1599,112 @@ mod tests {
})).expect("tokio works");
}
}

#[test]
fn yields_node_error_when_there_is_an_error_after_successful_connect() {
let mut transport = DummyTransport::new();
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 159 }).collect()).into_peer_id();
transport.set_next_peer_id(&peer_id);
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport)));

{
// Set up an outgoing connection with a PeerId we know
let swarm1 = swarm.clone();
let mut swarm1 = swarm1.lock();
let peer = swarm1.peer(peer_id.clone());
let addr = "/unix/reachable".parse().expect("bad multiaddr");
let mut handler = Handler::default();
// Force an error
handler.next_states = vec![ HandlerState::Err ];
peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer");
}

// Ensure we run on a single thread
let mut rt = Builder::new().core_threads(1).build().unwrap();

// Drive it forward until we connect to the node.
let mut keep_polling = true;
while keep_polling {
let swarm_fut = swarm.clone();
keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
// Push the Handler into an error state on the next poll
swarm.broadcast_event(&InEvent::NextState);
match swarm.poll() {
Async::NotReady => Ok(Async::Ready(true)),
Async::Ready(event) => {
assert_matches!(event, RawSwarmEvent::Connected { .. });
// We're connected, we can move on
Ok(Async::Ready(false))
},
}
})).expect("tokio works");
}

// Poll again. It is going to be a NodeError because of how the
// handler's next state was set up.
let swarm_fut = swarm.clone();
let expected_peer_id = peer_id.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeError { peer_id, .. }) => {
assert_eq!(peer_id, expected_peer_id);
});
Ok(Async::Ready(()))
})).expect("tokio works");
}

#[test]
fn yields_node_closed_when_the_node_closes_after_successful_connect() {
let mut transport = DummyTransport::new();
let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 159 }).collect()).into_peer_id();
transport.set_next_peer_id(&peer_id);
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport)));

{
// Set up an outgoing connection with a PeerId we know
let swarm1 = swarm.clone();
let mut swarm1 = swarm1.lock();
let peer = swarm1.peer(peer_id.clone());
let addr = "/unix/reachable".parse().expect("bad multiaddr");
let mut handler = Handler::default();
// Force handler to close
handler.next_states = vec![ HandlerState::Ready(None) ];
peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer");
}

// Ensure we run on a single thread
let mut rt = Builder::new().core_threads(1).build().unwrap();

// Drive it forward until we connect to the node.
let mut keep_polling = true;
while keep_polling {
let swarm_fut = swarm.clone();
keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
// Push the Handler into the closed state on the next poll
swarm.broadcast_event(&InEvent::NextState);
match swarm.poll() {
Async::NotReady => Ok(Async::Ready(true)),
Async::Ready(event) => {
assert_matches!(event, RawSwarmEvent::Connected { .. });
// We're connected, we can move on
Ok(Async::Ready(false))
},
}
})).expect("tokio works");
}

// Poll again. It is going to be a NodeClosed because of how the
// handler's next state was set up.
let swarm_fut = swarm.clone();
let expected_peer_id = peer_id.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => {
assert_eq!(peer_id, expected_peer_id);
});
Ok(Async::Ready(()))
})).expect("tokio works");
}
}
2 changes: 1 addition & 1 deletion core/src/tests/dummy_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl NodeHandler for Handler {
HandlerState::NotReady => Ok(Async::NotReady),
HandlerState::Ready(None) => Ok(Async::Ready(None)),
HandlerState::Ready(Some(event)) => Ok(Async::Ready(Some(event.clone()))),
HandlerState::Err => {Err(io::Error::new(io::ErrorKind::Other, "oh noes"))},
HandlerState::Err => Err(io::Error::new(io::ErrorKind::Other, "oh noes")),
},
None => Ok(Async::NotReady)
}
Expand Down

0 comments on commit 1fb4677

Please sign in to comment.