diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index deafe3fd473..2020833bdba 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -386,7 +386,7 @@ impl CollectionStream { - println!("[CollectionStream, poll] TaskClosed id={:?}, result={:?} – returning Async::Ready", id, result); + println!("[CollectionStream, poll] TaskClosed id={:?}, result={:?} – returning Async::Ready(…) <––", id, result); match (self.tasks.remove(&id), result, handler) { (Some(TaskState::Pending), Err(err), Some(handler)) => { println!("[CollectionStream, poll] TaskState::Pending with error"); @@ -410,7 +410,7 @@ impl CollectionStream { - println!("[CollectionStream, poll] TaskState::Connected, Err({:?}", err); + println!("[CollectionStream, poll] TaskState::Connected, Err({:?} <–––", err); debug_assert!(_handler.is_none()); let _node_task_id = self.nodes.remove(&peer_id); debug_assert_eq!(_node_task_id, Some(id)); diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index 9ec6006723b..5e988e0c1db 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -294,7 +294,7 @@ where } } Async::Ready(Some(NodeHandlerEvent::Custom(event))) => { - println!("[HandledNode, poll] handler; Async::Ready(Some(Custom)); returning the event up the stack <––––"); + println!("[HandledNode, poll] handler; Async::Ready(Some(Custom)); returning the event up the stack"); return Ok(Async::Ready(Some(event))); } Async::Ready(None) => { diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index 5a9dc31ab6e..e73bc56cd3a 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -474,13 +474,13 @@ where } } Ok(Async::Ready(None)) => { - println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Async::Ready(None) – sending TaskClosed and returning Async::Ready(()) <–––"); + println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Async::Ready(None) – sending TaskClosed and returning Async::Ready(())"); let event = InToExtMessage::TaskClosed(Ok(()), None); let _ = self.events_tx.unbounded_send((event, self.id)); return Ok(Async::Ready(())); // End the task. } Err(err) => { - println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Err"); + println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Err – Sending TaskClosed(Err) <–––"); let event = InToExtMessage::TaskClosed(Err(err), None); let _ = self.events_tx.unbounded_send((event, self.id)); return Ok(Async::Ready(())); // End the task. diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 70c77bf677d..99cffcecaa9 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -250,7 +250,7 @@ where .finish() } RawSwarmEvent::NodeEvent { ref peer_id, ref event } => { - f.debug_struct("UnknownPeerDialError") + f.debug_struct("NodeEvent") .field("peer_id", peer_id) .field("event", event) .finish() @@ -691,8 +691,9 @@ where out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint }; } Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => { - println!("[RawSwarm, poll] active_nodes.poll() – Ready(NodeEvent) – returning out event and continuing"); + println!("[RawSwarm, poll] active_nodes.poll() – Ready(CollectionEvent::NodeEvent) – returning out event wrapped in a NodeEvent and continuing"); action = Default::default(); + println!("[RawSwarm, poll] active_nodes.poll() – Ready(CollectionEvent::NodeEvent) – default action"); out_event = RawSwarmEvent::NodeEvent { peer_id, event }; } }; @@ -1301,7 +1302,7 @@ mod tests { use super::*; use std::sync::Arc; use parking_lot::Mutex; - use tokio::runtime::Runtime; + use tokio::runtime::{Builder, Runtime}; use PublicKey; use tests::dummy_transport::DummyTransport; use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent}; @@ -1598,4 +1599,112 @@ mod tests { })).expect("tokio works"); } } + + #[test] + fn yields_node_error_when_there_is_an_error_after_successful_connect() { + let mut transport = DummyTransport::new(); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 159 }).collect()).into_peer_id(); + transport.set_next_peer_id(&peer_id); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport))); + + { + // Set up an outgoing connection with a PeerId we know + let swarm1 = swarm.clone(); + let mut swarm1 = swarm1.lock(); + let peer = swarm1.peer(peer_id.clone()); + let addr = "/unix/reachable".parse().expect("bad multiaddr"); + let mut handler = Handler::default(); + // Force an error + handler.next_states = vec![ HandlerState::Err ]; + peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer"); + } + + // Ensure we run on a single thread + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Drive it forward until we connect to the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + // Push the Handler into an error state on the next poll + swarm.broadcast_event(&InEvent::NextState); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::Connected { .. }); + // We're connected, we can move on + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } + + // Poll again. It is going to be a NodeError because of how the + // handler's next state was set up. + let swarm_fut = swarm.clone(); + let expected_peer_id = peer_id.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeError { peer_id, .. }) => { + assert_eq!(peer_id, expected_peer_id); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + } + + #[test] + fn yields_node_closed_when_the_node_closes_after_successful_connect() { + let mut transport = DummyTransport::new(); + let peer_id = PublicKey::Rsa((0 .. 128).map(|_| -> u8 { 159 }).collect()).into_peer_id(); + transport.set_next_peer_id(&peer_id); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport))); + + { + // Set up an outgoing connection with a PeerId we know + let swarm1 = swarm.clone(); + let mut swarm1 = swarm1.lock(); + let peer = swarm1.peer(peer_id.clone()); + let addr = "/unix/reachable".parse().expect("bad multiaddr"); + let mut handler = Handler::default(); + // Force handler to close + handler.next_states = vec![ HandlerState::Ready(None) ]; + peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer"); + } + + // Ensure we run on a single thread + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Drive it forward until we connect to the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + // Push the Handler into the closed state on the next poll + swarm.broadcast_event(&InEvent::NextState); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::Connected { .. }); + // We're connected, we can move on + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } + + // Poll again. It is going to be a NodeClosed because of how the + // handler's next state was set up. + let swarm_fut = swarm.clone(); + let expected_peer_id = peer_id.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => { + assert_eq!(peer_id, expected_peer_id); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + } } diff --git a/core/src/tests/dummy_handler.rs b/core/src/tests/dummy_handler.rs index 1000fa8a59c..6752740a243 100644 --- a/core/src/tests/dummy_handler.rs +++ b/core/src/tests/dummy_handler.rs @@ -138,7 +138,7 @@ impl NodeHandler for Handler { HandlerState::NotReady => Ok(Async::NotReady), HandlerState::Ready(None) => Ok(Async::Ready(None)), HandlerState::Ready(Some(event)) => Ok(Async::Ready(Some(event.clone()))), - HandlerState::Err => {Err(io::Error::new(io::ErrorKind::Other, "oh noes"))}, + HandlerState::Err => Err(io::Error::new(io::ErrorKind::Other, "oh noes")), }, None => Ok(Async::NotReady) }