Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dp/chore/test-raw…
Browse files Browse the repository at this point in the history
…_swarm

* upstream/master:
  Add a IdentifyTransport (libp2p#569)
  Tests for HandledNode (libp2p#546)
  Some minor fixes reported by clippy (libp2p#600)
  • Loading branch information
dvdplm committed Nov 2, 2018
2 parents 1fb4677 + 4225d26 commit 83fcc90
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 34 deletions.
4 changes: 2 additions & 2 deletions core/src/nodes/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
Entry::Vacant(_) => Err(()),
Entry::Occupied(entry) => {
match entry.get() {
&TaskState::Connected(_) => return Err(()),
&TaskState::Pending => (),
TaskState::Connected(_) => return Err(()),
TaskState::Pending => (),
};

entry.remove();
Expand Down
44 changes: 24 additions & 20 deletions core/src/nodes/handled_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ where
}
}

/// Returns a reference to the `NodeHandler`
pub fn handler(&self) -> &THandler{
&self.handler
}

/// Returns a mutable reference to the `NodeHandler`
pub fn handler_mut(&mut self) -> &mut THandler{
&mut self.handler
}

/// Injects an event to the handler.
#[inline]
pub fn inject_event(&mut self, event: THandler::InEvent) {
Expand Down Expand Up @@ -375,10 +385,6 @@ mod tests {
}
}

fn did_see_event(handled_node: &mut TestHandledNode, event: &InEvent) -> bool {
handled_node.handler.events.contains(event)
}

// Set the state of the `Handler` after `inject_outbound_closed` is called
fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_state: HandlerState) {
handled_node.handler.next_outbound_state = Some(next_state);
Expand Down Expand Up @@ -456,7 +462,7 @@ mod tests {

let event = InEvent::Custom("banana");
handled.inject_event(event.clone());
assert!(did_see_event(&mut handled, &event));
assert_eq!(handled.handler().events, vec![event]);
}

#[test]
Expand Down Expand Up @@ -558,9 +564,7 @@ mod tests {
.handled_node();

assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => {
assert_matches!(event, OutEvent::Custom(s) => {
assert_eq!(s, "pineapple");
});
assert_matches!(event, OutEvent::Custom("pineapple"))
});
}

Expand All @@ -578,7 +582,7 @@ mod tests {
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("pear"))))
);
handled.poll().expect("poll works");
assert_eq!(handled.handler.events, vec![InEvent::OutboundClosed]);
assert_eq!(handled.handler().events, vec![InEvent::OutboundClosed]);
}

#[test]
Expand All @@ -595,8 +599,8 @@ mod tests {
// closed, `inbound_finished` is set to true.
// - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls
// `inject_inbound_close`, but that's irrelevant here)
// - back in `poll()` we call `handler.poll()` which does nothing
// because `HandlerState` is `NotReady`: loop continues
// - back in `poll()` we call `handler.poll()` which does nothing because
// `HandlerState` is `NotReady`: loop continues
// - polls the node again which now skips the inbound block because
// `inbound_finished` is true.
// - Now `poll_outbound()` is called which returns `Async::Ready(None)`
Expand All @@ -615,7 +619,7 @@ mod tests {
// – …and causes the Handler to yield Async::Ready(None)
// – which in turn makes the HandledNode to yield Async::Ready(None) as well
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
assert_eq!(handled.handler.events, vec![
assert_eq!(handled.handler().events, vec![
InEvent::InboundClosed, InEvent::OutboundClosed
]);
}
Expand All @@ -627,9 +631,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::InboundClosed]);
assert_eq!(h.handler().events, vec![InEvent::InboundClosed]);
}

#[test]
Expand All @@ -641,9 +645,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::OutboundClosed]);
assert_eq!(h.handler().events, vec![InEvent::OutboundClosed]);
}

#[test]
Expand All @@ -655,9 +659,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::Substream(Some(1))]);
assert_eq!(h.handler().events, vec![InEvent::Substream(Some(1))]);
}

#[test]
Expand All @@ -668,8 +672,8 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::Substream(None)]);
assert_eq!(h.handler().events, vec![InEvent::Substream(None)]);
}
}
8 changes: 3 additions & 5 deletions core/src/nodes/handled_node_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<TInEvent, TOutEvent, THandler> HandledNodesTasks<TInEvent, TOutEvent, THand
/// Returns `None` if the task id is invalid.
#[inline]
pub fn task(&mut self, id: TaskId) -> Option<Task<TInEvent>> {
match self.tasks.entry(id.clone()) {
match self.tasks.entry(id) {
Entry::Occupied(inner) => Some(Task { inner }),
Entry::Vacant(_) => None,
}
Expand Down Expand Up @@ -404,8 +404,7 @@ where
println!("[NodeTask, poll] NodeTaskInner::Future: AsyncReady(Some) – injecting event");
node.inject_event(event);
}
if let Err(e) = self.events_tx.unbounded_send((event, self.id)) {
println!("[NodeTask, poll] NodeTaskInner::Future: AsyncReady(Some) – Error sending NodeReached={:?}", e);
if self.events_tx.unbounded_send((event, self.id)).is_err() {
node.shutdown();
}
self.inner = NodeTaskInner::Node(node);
Expand Down Expand Up @@ -468,8 +467,7 @@ where
// The only possible event here is a NodeHandlerEvent::Custom(event)
println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Async::Ready(Some) –– sending a NodeEvent on events_tx and keep looping");
let event = InToExtMessage::NodeEvent(event);
if let Err(e) = self.events_tx.unbounded_send((event, self.id)) {
println!("[NodeTask, poll] NodeTaskInner::Node; polled handled node; Async::Ready(Some); error sending on events_tx channel={:?}. Shutting down the node.", e);
if self.events_tx.unbounded_send((event, self.id)).is_err() {
node.shutdown();
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/nodes/raw_swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ impl<'a, TInEvent, TOutEvent, THandler> PeerPendingConnect<'a, TInEvent, TOutEve
#[inline]
pub fn interrupt(self) {
let attempt = self.attempt.remove();
if let Err(_) = self.active_nodes.interrupt(attempt.id) {
if self.active_nodes.interrupt(attempt.id).is_err() {
// TODO: improve proof or remove; this is too complicated right now
panic!("We retreived this attempt.id from out_reach_attempts. We insert in \
out_reach_attempts only at the same time as we call add_reach_attempt. \
Expand Down
12 changes: 6 additions & 6 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ where

let (listening_stream, new_addr) = match self.transport.listen_on(addr) {
Ok((l, new_addr)) => (l, new_addr),
Err((trans, addr)) => {
Err((transport, addr)) => {
let builder = AndThen {
transport: trans,
upgrade: upgrade,
transport,
upgrade,
};

return Err((builder, addr));
Expand Down Expand Up @@ -96,10 +96,10 @@ where

let dialed_fut = match self.transport.dial(addr.clone()) {
Ok(f) => f,
Err((trans, addr)) => {
Err((transport, addr)) => {
let builder = AndThen {
transport: trans,
upgrade: upgrade,
transport,
upgrade,
};

return Err((builder, addr));
Expand Down
Loading

0 comments on commit 83fcc90

Please sign in to comment.