Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dp/chore/test-col…
Browse files Browse the repository at this point in the history
…lection_stream

* upstream/master:
  Use vecdeque for fair polling (libp2p#610)
  Add back the Swarm (libp2p#613)
  Remove dependency on tokio_current_thread (libp2p#606)
  Use chashmap from crates (libp2p#615)
  Fix grammar in core/nodes etc. (libp2p#608)
  Inject event by value in ProtocolsHandler (libp2p#605)
  Add a PeriodicPingHandler and a PingListenHandler (libp2p#574)
  Fix stack overflow when printing a SubstreamRef (libp2p#599)
  Add a peer id generator (libp2p#583)
  eg. -> e.g.; ie. -> i.e. via repren (libp2p#592)
  Add a IdentifyTransport (libp2p#569)
  Tests for HandledNode (libp2p#546)
  Some minor fixes reported by clippy (libp2p#600)
  Add a PeriodicIdentification protocol handler (libp2p#579)
  Add ProtocolsHandler trait (libp2p#573)
  Use paritytech/rust-secp256k1 (libp2p#598)
  Use websocket 0.21.0 (libp2p#597)
  Reexport multihash from the facade (libp2p#587)
  Add substrate to the list of projects using libp2p (libp2p#595)
  Remove spaces before semicolons (libp2p#591)
  • Loading branch information
dvdplm committed Nov 9, 2018
2 parents 0548c25 + d961e65 commit 3cb793c
Show file tree
Hide file tree
Showing 52 changed files with 2,745 additions and 654 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ secio-secp256k1 = ["libp2p-secio/secp256k1"]
bytes = "0.4"
futures = "0.1"
multiaddr = { path = "./misc/multiaddr" }
multihash = { path = "./misc/multihash" }
libp2p-mplex = { path = "./muxers/mplex" }
libp2p-identify = { path = "./protocols/identify" }
libp2p-kad = { path = "./protocols/kad" }
Expand All @@ -34,7 +35,6 @@ tokio-io = "0.1"
[target.'cfg(not(target_os = "emscripten"))'.dependencies]
libp2p-dns = { path = "./transports/dns" }
libp2p-tcp-transport = { path = "./transports/tcp" }
tokio-current-thread = "0.1"

[target.'cfg(target_os = "emscripten")'.dependencies]
stdweb = { version = "0.1.3", default-features = false }
Expand All @@ -54,6 +54,7 @@ members = [
"misc/multiaddr",
"misc/multihash",
"misc/multistream-select",
"misc/peer-id-generator",
"misc/rw-stream-sink",
"transports/dns",
"protocols/floodsub",
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ libp2p = { git = "https://github.com/libp2p/rust-libp2p" }
(open a pull request if you want your project to be added here)

- https://github.com/paritytech/polkadot
- https://github.com/paritytech/substrate
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rw-stream-sink = { path = "../misc/rw-stream-sink" }
smallvec = "0.6"
tokio-executor = "0.1.4"
tokio-io = "0.1"
tokio-timer = "0.2"
void = "1"

[dev-dependencies]
Expand Down
8 changes: 4 additions & 4 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//!
//! The main trait that this crate provides is `Transport`, which provides the `dial` and
//! `listen_on` methods and can be used to dial or listen on a multiaddress. The `swarm` crate
//! itself does not provide any concrete (ie. non-dummy, non-adapter) implementation of this trait.
//! itself does not provide any concrete (i.e. non-dummy, non-adapter) implementation of this trait.
//! It is implemented on structs that are provided by external crates, such as `TcpConfig` from
//! `tcp-transport`, `UdpConfig`, or `WebsocketConfig` (note: as of the writing of this
//! documentation, the last two structs don't exist yet).
Expand Down Expand Up @@ -132,7 +132,7 @@
//! extern crate tokio;
//!
//! use futures::{Future, Stream};
//! use libp2p_ping::{Ping, PingOutput};
//! use libp2p_ping::protocol::{Ping, PingOutput};
//! use libp2p_core::Transport;
//! use tokio::runtime::current_thread::Runtime;
//!
Expand Down Expand Up @@ -184,6 +184,7 @@ extern crate rw_stream_sink;
extern crate smallvec;
extern crate tokio_executor;
extern crate tokio_io;
extern crate tokio_timer;
extern crate void;

#[cfg(test)]
Expand All @@ -193,8 +194,6 @@ extern crate tokio;
#[cfg(test)]
extern crate tokio_codec;
#[cfg(test)]
extern crate tokio_timer;
#[cfg(test)]
#[macro_use]
extern crate assert_matches;
#[cfg(test)]
Expand All @@ -213,6 +212,7 @@ mod tests;
pub mod either;
pub mod muxing;
pub mod nodes;
pub mod topology;
pub mod transport;
pub mod upgrade;

Expand Down
3 changes: 2 additions & 1 deletion core/src/muxing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,10 @@ impl<P> fmt::Debug for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
<P::Target as StreamMuxer>::Substream: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "Substream({:?})", self)
write!(f, "Substream({:?})", self.substream)
}
}

Expand Down
32 changes: 16 additions & 16 deletions core/src/nodes/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ impl<'a, TInEvent, TOutEvent, THandler> CollectionReachEvent<'a, TInEvent, TOutE
let ret_value = if let Some(former_task_id) = former_task_id {
self.parent.inner.task(former_task_id)
.expect("whenever we receive a TaskClosed event or close a node, we remove the \
corresponding entry from self.nodes ; therefore all elements in \
self.nodes are valid tasks in the HandledNodesTasks ; qed")
corresponding entry from self.nodes; therefore all elements in \
self.nodes are valid tasks in the HandledNodesTasks; qed")
.close();
let _former_other_state = self.parent.tasks.remove(&former_task_id);
debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone())));
Expand Down Expand Up @@ -241,10 +241,10 @@ impl<'a, TInEvent, TOutEvent, THandler> Drop for CollectionReachEvent<'a, TInEve
let task_state = self.parent.tasks.remove(&self.id);
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
self.parent.inner.task(self.id)
.expect("we create the CollectionReachEvent with a valid task id ; the \
.expect("we create the CollectionReachEvent with a valid task id; the \
CollectionReachEvent mutably borrows the collection, therefore nothing \
can delete this task during the lifetime of the CollectionReachEvent ; \
therefore the task is still valid when we delete it ; qed")
can delete this task during the lifetime of the CollectionReachEvent; \
therefore the task is still valid when we delete it; qed")
.close();
}
}
Expand Down Expand Up @@ -301,16 +301,16 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
Entry::Vacant(_) => Err(()),
Entry::Occupied(entry) => {
match entry.get() {
&TaskState::Connected(_) => return Err(()),
&TaskState::Pending => (),
TaskState::Connected(_) => return Err(()),
TaskState::Pending => (),
};

entry.remove();
self.inner.task(id.0)
.expect("whenever we receive a TaskClosed event or interrupt a task, we \
remove the corresponding entry from self.tasks ; therefore all \
remove the corresponding entry from self.tasks; therefore all \
elements in self.tasks are valid tasks in the \
HandledNodesTasks ; qed")
HandledNodesTasks; qed")
.close();

Ok(())
Expand Down Expand Up @@ -397,7 +397,7 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
})
},
(Some(TaskState::Pending), _, _) => {
// TODO: this variant shouldn't happen ; prove this
// TODO: this variant shouldn't happen; prove this
panic!()
},
(Some(TaskState::Connected(peer_id)), Ok(()), _handler) => {
Expand All @@ -420,9 +420,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
})
},
(None, _, _) => {
panic!("self.tasks is always kept in sync with the tasks in self.inner ; \
panic!("self.tasks is always kept in sync with the tasks in self.inner; \
when we add a task in self.inner we add a corresponding entry in \
self.tasks, and remove the entry only when the task is closed ; \
self.tasks, and remove the entry only when the task is closed; \
qed")
},
}
Expand All @@ -440,9 +440,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
let peer_id = match self.tasks.get(&id) {
Some(TaskState::Connected(peer_id)) => peer_id.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \
received a corresponding NodeReached event from that same task ; \
received a corresponding NodeReached event from that same task; \
when we receive a NodeReached event, we ensure that the entry in \
self.tasks is switched to the Connected state ; qed"),
self.tasks is switched to the Connected state; qed"),
};

Async::Ready(CollectionEvent::NodeEvent {
Expand Down Expand Up @@ -477,8 +477,8 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> {
let old_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(old_task_id, Some(self.inner.id()));
} else {
panic!("a PeerMut can only be created if an entry is present in nodes ; an entry in \
nodes always matched a Connected entry in tasks ; qed");
panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \
nodes always matched a Connected entry in tasks; qed");
};

self.inner.close();
Expand Down
57 changes: 36 additions & 21 deletions core/src/nodes/handled_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::io::Error as IoError;

/// Handler for the substreams of a node.
// TODO: right now it is possible for a node handler to be built, then shut down right after if we
// realize we dialed the wrong peer for example ; this could be surprising and should either
// realize we dialed the wrong peer for example; this could be surprising and should either
// be documented or changed (favouring the "documented" right now)
pub trait NodeHandler {
/// Custom event that can be received from the outside.
Expand All @@ -41,6 +41,12 @@ pub trait NodeHandler {
/// Sends a new substream to the handler.
///
/// The handler is responsible for upgrading the substream to whatever protocol it wants.
///
/// # Panic
///
/// Implementations are allowed to panic in the case of dialing if the `user_data` in
/// `endpoint` doesn't correspond to what was returned earlier when polling, or is used
/// multiple times.
fn inject_substream(&mut self, substream: Self::Substream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>);

/// Indicates to the handler that the inbound part of the muxer has been closed, and that
Expand All @@ -49,6 +55,11 @@ pub trait NodeHandler {

/// Indicates to the handler that an outbound substream failed to open because the outbound
/// part of the muxer has been closed.
///
/// # Panic
///
/// Implementations are allowed to panic if `user_data` doesn't correspond to what was returned
/// earlier when polling, or is used multiple times.
fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo);

/// Injects an event coming from the outside into the handler.
Expand Down Expand Up @@ -165,6 +176,16 @@ where
}
}

/// Returns a reference to the `NodeHandler`
pub fn handler(&self) -> &THandler{
&self.handler
}

/// Returns a mutable reference to the `NodeHandler`
pub fn handler_mut(&mut self) -> &mut THandler{
&mut self.handler
}

/// Injects an event to the handler.
#[inline]
pub fn inject_event(&mut self, event: THandler::InEvent) {
Expand Down Expand Up @@ -364,10 +385,6 @@ mod tests {
}
}

fn did_see_event(handled_node: &mut TestHandledNode, event: &InEvent) -> bool {
handled_node.handler.events.contains(event)
}

// Set the state of the `Handler` after `inject_outbound_closed` is called
fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_state: HandlerState) {
handled_node.handler.next_outbound_state = Some(next_state);
Expand Down Expand Up @@ -445,7 +462,7 @@ mod tests {

let event = InEvent::Custom("banana");
handled.inject_event(event.clone());
assert!(did_see_event(&mut handled, &event));
assert_eq!(handled.handler().events, vec![event]);
}

#[test]
Expand Down Expand Up @@ -547,9 +564,7 @@ mod tests {
.handled_node();

assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => {
assert_matches!(event, OutEvent::Custom(s) => {
assert_eq!(s, "pineapple");
});
assert_matches!(event, OutEvent::Custom("pineapple"))
});
}

Expand All @@ -567,7 +582,7 @@ mod tests {
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("pear"))))
);
handled.poll().expect("poll works");
assert_eq!(handled.handler.events, vec![InEvent::OutboundClosed]);
assert_eq!(handled.handler().events, vec![InEvent::OutboundClosed]);
}

#[test]
Expand All @@ -584,8 +599,8 @@ mod tests {
// closed, `inbound_finished` is set to true.
// - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls
// `inject_inbound_close`, but that's irrelevant here)
// - back in `poll()` we call `handler.poll()` which does nothing
// because `HandlerState` is `NotReady`: loop continues
// - back in `poll()` we call `handler.poll()` which does nothing because
// `HandlerState` is `NotReady`: loop continues
// - polls the node again which now skips the inbound block because
// `inbound_finished` is true.
// - Now `poll_outbound()` is called which returns `Async::Ready(None)`
Expand All @@ -604,7 +619,7 @@ mod tests {
// – …and causes the Handler to yield Async::Ready(None)
// – which in turn makes the HandledNode to yield Async::Ready(None) as well
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
assert_eq!(handled.handler.events, vec![
assert_eq!(handled.handler().events, vec![
InEvent::InboundClosed, InEvent::OutboundClosed
]);
}
Expand All @@ -616,9 +631,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::InboundClosed]);
assert_eq!(h.handler().events, vec![InEvent::InboundClosed]);
}

#[test]
Expand All @@ -630,9 +645,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::OutboundClosed]);
assert_eq!(h.handler().events, vec![InEvent::OutboundClosed]);
}

#[test]
Expand All @@ -644,9 +659,9 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::Substream(Some(1))]);
assert_eq!(h.handler().events, vec![InEvent::Substream(Some(1))]);
}

#[test]
Expand All @@ -657,8 +672,8 @@ mod tests {
.with_handler_state(HandlerState::Err) // stop the loop
.handled_node();

assert_eq!(h.handler.events, vec![]);
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler.events, vec![InEvent::Substream(None)]);
assert_eq!(h.handler().events, vec![InEvent::Substream(None)]);
}
}
Loading

0 comments on commit 3cb793c

Please sign in to comment.