Skip to content

Commit

Permalink
Fix the polling process in handled node (#582)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka authored Oct 20, 2018
1 parent 46dd6b6 commit de26ba1
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions core/src/nodes/handled_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ where
node: Fuse<NodeStream<TMuxer, THandler::OutboundOpenInfo>>,
/// Handler that processes substreams.
handler: THandler,
/// If true, `handler` has returned `Ready(None)` and therefore shouldn't be polled again.
handler_is_done: bool,
// True, if the node is shutting down.
is_shutting_down: bool
}
Expand All @@ -158,6 +160,7 @@ where
HandledNode {
node: NodeStream::new(muxer).fuse(),
handler,
handler_is_done: false,
is_shutting_down: false
}
}
Expand Down Expand Up @@ -196,13 +199,11 @@ where
/// After this method returns, `is_shutting_down()` should return true.
pub fn shutdown(&mut self) {
self.node.get_mut().shutdown_all();
self.is_shutting_down = true;

for user_data in self.node.get_mut().cancel_outgoing() {
self.handler.inject_outbound_closed(user_data);
}

self.handler.shutdown()
self.handler.shutdown();
self.is_shutting_down = true;
}
}

Expand All @@ -216,10 +217,14 @@ where

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if self.node.is_done() && self.handler_is_done {
return Ok(Async::Ready(None));
}

let mut node_not_ready = false;

match self.node.poll()? {
Async::NotReady => (),
Async::NotReady => node_not_ready = true,
Async::Ready(Some(NodeEvent::InboundSubstream { substream })) => {
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener)
}
Expand All @@ -228,8 +233,8 @@ where
self.handler.inject_substream(substream, endpoint)
}
Async::Ready(None) => {
node_not_ready = true;
if !self.is_shutting_down {
self.is_shutting_down = true;
self.handler.shutdown()
}
}
Expand All @@ -241,7 +246,7 @@ where
}
}

match self.handler.poll()? {
match if self.handler_is_done { Async::Ready(None) } else { self.handler.poll()? } {
Async::NotReady => {
if node_not_ready {
break
Expand All @@ -261,7 +266,12 @@ where
return Ok(Async::Ready(Some(event)));
}
Async::Ready(None) => {
return Ok(Async::Ready(None))
self.handler_is_done = true;
if !self.is_shutting_down {
self.is_shutting_down = true;
self.node.get_mut().cancel_outgoing();
self.node.get_mut().shutdown_all();
}
}
}
}
Expand Down

0 comments on commit de26ba1

Please sign in to comment.