Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More precise errors in the nodes module #765

Merged
merged 5 commits into from
Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 56 additions & 39 deletions core/src/nodes/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@ use crate::{
muxing::StreamMuxer,
nodes::{
node::Substream,
handled_node_tasks::{HandledNodesEvent, HandledNodesTasks},
handled_node_tasks::{HandledNodesEvent, HandledNodesTasks, TaskClosedEvent},
handled_node_tasks::{Task as HandledNodesTask, TaskId},
handled_node::NodeHandler
handled_node::{HandledNodeError, NodeHandler}
}
};
use fnv::FnvHashMap;
use futures::prelude::*;
use std::{collections::hash_map::Entry, error, fmt, io, mem};
use std::{collections::hash_map::Entry, error, fmt, mem};

// TODO: make generic over PeerId

/// Implementation of `Stream` that handles a collection of nodes.
pub struct CollectionStream<TInEvent, TOutEvent, THandler> {
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
/// Object that handles the tasks.
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler>,
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>,
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
/// must always be in the `Connected` state.
nodes: FnvHashMap<PeerId, TaskId>,
Expand All @@ -46,7 +46,7 @@ pub struct CollectionStream<TInEvent, TOutEvent, THandler> {
tasks: FnvHashMap<TaskId, TaskState>,
}

impl<TInEvent, TOutEvent, THandler> fmt::Debug for CollectionStream<TInEvent, TOutEvent, THandler> {
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let mut list = f.debug_list();
for (id, task) in &self.tasks {
Expand All @@ -73,10 +73,10 @@ enum TaskState {
}

/// Event that can happen on the `CollectionStream`.
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a> {
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr> {
/// A connection to a node has succeeded. You must use the provided event in order to accept
/// the connection.
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler>),
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>),

/// A connection to a node has been closed.
///
Expand All @@ -94,15 +94,15 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a> {
/// Identifier of the node.
peer_id: PeerId,
/// The error that happened.
error: io::Error,
error: HandledNodeError<THandlerErr>,
},

/// An error happened on the future that was trying to reach a node.
ReachError {
/// Identifier of the reach attempt that failed.
id: ReachAttemptId,
/// Error that happened on the future.
error: io::Error,
error: TReachErr,
/// The handler that was passed to `add_reach_attempt`.
handler: THandler,
},
Expand All @@ -116,8 +116,10 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a> {
},
}

impl<'a, TInEvent, TOutEvent, THandler> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent, THandler>
where TOutEvent: fmt::Debug
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>
where TOutEvent: fmt::Debug,
TReachErr: fmt::Debug,
THandlerErr: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Expand Down Expand Up @@ -155,16 +157,16 @@ where TOutEvent: fmt::Debug

/// Event that happens when we reach a node.
#[must_use = "The node reached event is used to accept the newly-opened connection"]
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a> {
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr: 'a> {
/// Peer id we connected to.
peer_id: PeerId,
/// The task id that reached the node.
id: TaskId,
/// The `CollectionStream` we are referencing.
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler>,
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>,
}

impl<'a, TInEvent, TOutEvent, THandler> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler> {
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
/// Returns the peer id of the node that has been reached.
#[inline]
pub fn peer_id(&self) -> &PeerId {
Expand Down Expand Up @@ -227,7 +229,7 @@ impl<'a, TInEvent, TOutEvent, THandler> CollectionReachEvent<'a, TInEvent, TOutE
}
}

impl<'a, TInEvent, TOutEvent, THandler> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler> {
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("CollectionReachEvent")
.field("peer_id", &self.peer_id)
Expand All @@ -236,7 +238,7 @@ impl<'a, TInEvent, TOutEvent, THandler> fmt::Debug for CollectionReachEvent<'a,
}
}

impl<'a, TInEvent, TOutEvent, THandler> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler> {
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
fn drop(&mut self) {
let task_state = self.parent.tasks.remove(&self.id);
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
Expand All @@ -262,7 +264,7 @@ pub enum CollectionNodeAccept {
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReachAttemptId(TaskId);

impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandler> {
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
/// Creates a new empty collection.
#[inline]
pub fn new() -> Self {
Expand All @@ -280,9 +282,10 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
-> ReachAttemptId
where
TFut: Future<Item = (PeerId, TMuxer)> + Send + 'static,
TFut::Error: std::error::Error + Send + Sync + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr> + Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
Expand Down Expand Up @@ -369,7 +372,7 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
/// > borrowed if necessary.
pub fn poll(&mut self) -> Async<CollectionEvent<TInEvent, TOutEvent, THandler>> {
pub fn poll(&mut self) -> Async<CollectionEvent<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>> {
let item = match self.inner.poll() {
Async::Ready(item) => item,
Async::NotReady => return Async::NotReady,
Expand All @@ -378,16 +381,27 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
match item {
HandledNodesEvent::TaskClosed { id, result, handler } => {
match (self.tasks.remove(&id), result, handler) {
(Some(TaskState::Pending), Err(err), Some(handler)) => {
(Some(TaskState::Pending), Err(TaskClosedEvent::Reach(err)), Some(handler)) => {
Async::Ready(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: err,
handler,
})
},
(Some(TaskState::Pending), _, _) => {
// TODO: this variant shouldn't happen; prove this
panic!()
(Some(TaskState::Pending), Ok(()), _) => {
panic!("The API of HandledNodesTasks guarantees that a task cannot \
gracefully closed before being connected to a node, in which case \
its state should be Connected and not Pending; QED");
},
(Some(TaskState::Pending), Err(TaskClosedEvent::Node(_)), _) => {
panic!("We switch the task state to Connected once we're connected, and \
a TaskClosedEvent::Node can only happen after we're \
connected; QED");
},
(Some(TaskState::Pending), Err(TaskClosedEvent::Reach(_)), None) => {
// TODO: this could be improved in the API of HandledNodesTasks
panic!("The HandledNodesTasks is guaranteed to always return the handler \
when producing a TaskClosedEvent::Reach error");
},
(Some(TaskState::Connected(peer_id)), Ok(()), _handler) => {
debug_assert!(_handler.is_none());
Expand All @@ -397,7 +411,7 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
peer_id,
})
},
(Some(TaskState::Connected(peer_id)), Err(err), _handler) => {
(Some(TaskState::Connected(peer_id)), Err(TaskClosedEvent::Node(err)), _handler) => {
debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
Expand All @@ -406,6 +420,10 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
error: err,
})
},
(Some(TaskState::Connected(_)), Err(TaskClosedEvent::Reach(_)), _) => {
panic!("A TaskClosedEvent::Reach can only happen before we are connected \
to a node; therefore the TaskState won't be Connected; QED");
},
(None, _, _) => {
panic!("self.tasks is always kept in sync with the tasks in self.inner; \
when we add a task in self.inner we add a corresponding entry in \
Expand Down Expand Up @@ -506,11 +524,10 @@ mod tests {
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Builder;
use nodes::NodeHandlerEvent;
use std::sync::Arc;
use std::{io, sync::Arc};
use parking_lot::Mutex;
use void::Void;

type TestCollectionStream = CollectionStream<InEvent, OutEvent, Handler>;
type TestCollectionStream = CollectionStream<InEvent, OutEvent, Handler, io::Error, io::Error>;

#[test]
fn has_connection_is_false_before_a_connection_has_been_made() {
Expand All @@ -532,7 +549,7 @@ mod tests {
assert!(cs.peer_mut(&peer_id).is_none());

let handler = Handler::default();
let fut = future::ok::<_, Void>((peer_id.clone(), DummyMuxer::new()));
let fut = future::ok((peer_id.clone(), DummyMuxer::new()));
cs.add_reach_attempt(fut, handler);
assert!(cs.peer_mut(&peer_id).is_none()); // task is pending
}
Expand All @@ -546,7 +563,7 @@ mod tests {
muxer.set_inbound_connection_state(DummyConnectionState::Pending);
muxer.set_outbound_connection_state(DummyConnectionState::Opened);

let fut = future::ok::<_, Void>((peer_id, muxer));
let fut = future::ok((peer_id, muxer));
cs.add_reach_attempt(fut, Handler::default());
let mut rt = Runtime::new().unwrap();
let mut poll_count = 0;
Expand All @@ -570,7 +587,7 @@ mod tests {
fn accepting_a_node_yields_new_entry() {
let mut cs = TestCollectionStream::new();
let peer_id = PeerId::random();
let fut = future::ok::<_, Void>((peer_id.clone(), DummyMuxer::new()));
let fut = future::ok((peer_id.clone(), DummyMuxer::new()));
cs.add_reach_attempt(fut, Handler::default());

let mut rt = Runtime::new().unwrap();
Expand Down Expand Up @@ -622,7 +639,7 @@ mod tests {
muxer.set_inbound_connection_state(DummyConnectionState::Pending);
muxer.set_outbound_connection_state(DummyConnectionState::Opened);

let fut = future::ok::<_, Void>((task_peer_id.clone(), muxer));
let fut = future::ok((task_peer_id.clone(), muxer));
cs.lock().add_reach_attempt(fut, handler);

let mut rt = Builder::new().core_threads(1).build().unwrap();
Expand Down Expand Up @@ -710,7 +727,7 @@ mod tests {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer));
let task_inner_fut = future::ok((peer_id.clone(), muxer));
let mut handler = Handler::default();
handler.next_states = vec![HandlerState::Err]; // triggered when sending a NextState event

Expand Down Expand Up @@ -756,7 +773,7 @@ mod tests {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer));
let task_inner_fut = future::ok((peer_id.clone(), muxer));
let mut handler = Handler::default();
handler.next_states = vec![HandlerState::Ready(None)]; // triggered when sending a NextState event

Expand Down Expand Up @@ -802,7 +819,7 @@ mod tests {
#[test]
fn interrupting_a_pending_connection_attempt_is_ok() {
let mut cs = TestCollectionStream::new();
let fut = future::empty::<_, Void>();
let fut = future::empty();
let reach_id = cs.add_reach_attempt(fut, Handler::default());
let interrupt = cs.interrupt(reach_id);
assert!(interrupt.is_ok());
Expand All @@ -811,7 +828,7 @@ mod tests {
#[test]
fn interrupting_a_connection_attempt_twice_is_err() {
let mut cs = TestCollectionStream::new();
let fut = future::empty::<_, Void>();
let fut = future::empty();
let reach_id = cs.add_reach_attempt(fut, Handler::default());
assert!(cs.interrupt(reach_id).is_ok());
assert_matches!(cs.interrupt(reach_id), Err(InterruptError::ReachAttemptNotFound))
Expand All @@ -822,7 +839,7 @@ mod tests {
let cs = Arc::new(Mutex::new(TestCollectionStream::new()));
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
let task_inner_fut = future::ok::<_, Void>((peer_id.clone(), muxer));
let task_inner_fut = future::ok((peer_id.clone(), muxer));
let handler = Handler::default();

let reach_id = cs.lock().add_reach_attempt(task_inner_fut, handler);
Expand Down
Loading