Skip to content

Commit

Permalink
Merge branch 'master' into better-async-tls
Browse files Browse the repository at this point in the history
  • Loading branch information
twittner authored Apr 2, 2020
2 parents 5dcdccf + a203db8 commit 70affb7
Show file tree
Hide file tree
Showing 48 changed files with 727 additions and 458 deletions.
47 changes: 47 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,50 @@
# Version ???

- `libp2p-core`: Finished "identity hashing" for peer IDs migration.
[PR 1460](https://github.com/libp2p/rust-libp2p/pull/1460)
- `libp2p-core`: Remove `poll_broadcast`.
[PR 1527](https://github.com/libp2p/rust-libp2p/pull/1527)
- `libp2p-core`, `libp2p-swarm`: Report addresses of closed listeners.
[PR 1485](https://github.com/libp2p/rust-libp2p/pull/1485)
- `libp2p-core`: Support for multiple connections per peer and configurable connection limits.
See [PR #1440](https://github.com/libp2p/rust-libp2p/pull/1440),
[PR #1519](https://github.com/libp2p/rust-libp2p/pull/1519) and
[issue #912](https://github.com/libp2p/rust-libp2p/issues/912) for details.

- `libp2p-swarm`: Pass the cause of closing a listener to `inject_listener_closed`.
[PR 1517](https://github.com/libp2p/rust-libp2p/pull/1517)
- `libp2p-swarm`: Support for multiple connections per peer and configurable connection limits.
See [PR #1440](https://github.com/libp2p/rust-libp2p/pull/1440),
[PR #1519](https://github.com/libp2p/rust-libp2p/pull/1519) and
[issue #912](https://github.com/libp2p/rust-libp2p/issues/912) for details.
- `libp2p-swarm`: The `SwarmEvent` now returns more events.
[PR 1515](https://github.com/libp2p/rust-libp2p/pull/1515)
- `libp2p-swarm`: New `protocols_handler::multi` module.
[PR 1497](https://github.com/libp2p/rust-libp2p/pull/1497)
- `libp2p-swarm`: Allow configuration of outbound substreams.
[PR 1521](https://github.com/libp2p/rust-libp2p/pull/1521)

- `libp2p-kad`: Providers returned from a lookup are now deduplicated.
[PR 1528](https://github.com/libp2p/rust-libp2p/pull/1528)
- `libp2p-kad`: Allow customising the maximum packet size.
[PR 1502](https://github.com/libp2p/rust-libp2p/pull/1502)
- `libp2p-kad`: Allow customising the (libp2p) connection keep-alive timeout.
[PR 1477](https://github.com/libp2p/rust-libp2p/pull/1477)
- `libp2p-kad`: Avoid storing records that are expired upon receipt (optimisation).
[PR 1496](https://github.com/libp2p/rust-libp2p/pull/1496)
- `libp2p-kad`: Fixed potential panic on computing record expiry.
[PR 1492](https://github.com/libp2p/rust-libp2p/pull/1492)

- `libp2p-mplex`: Guard against use of underlying `Sink` upon
error or connection close.
[PR 1529](https://github.com/libp2p/rust-libp2p/pull/1529)

- `multistream-select`: Upgrade to stable futures.
[PR 1484](https://github.com/libp2p/rust-libp2p/pull/1484)

- `multihash`: Removed the crate in favour of the upstream crate.
[PR 1472](https://github.com/libp2p/rust-libp2p/pull/1472)

# Version 0.16.2 (2020-02-28)

- Fixed yamux connections not properly closing and being stuck in the `CLOSE_WAIT` state.
Expand Down
46 changes: 23 additions & 23 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p"
edition = "2018"
description = "Peer-to-peer networking library"
version = "0.16.2"
version = "0.17.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down Expand Up @@ -54,36 +54,36 @@ secp256k1 = ["libp2p-core/secp256k1", "libp2p-secio/secp256k1"]
[dependencies]
bytes = "0.5"
futures = "0.3.1"
multiaddr = { package = "parity-multiaddr", version = "0.7.2", path = "misc/multiaddr" }
multiaddr = { package = "parity-multiaddr", version = "0.8.0", path = "misc/multiaddr" }
multihash = "0.10"
lazy_static = "1.2"
libp2p-mplex = { version = "0.16.0", path = "muxers/mplex", optional = true }
libp2p-identify = { version = "0.16.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.16.2", path = "protocols/kad", optional = true }
libp2p-floodsub = { version = "0.16.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.16.0", path = "./protocols/gossipsub", optional = true }
libp2p-ping = { version = "0.16.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.16.0", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.16.0", path = "protocols/pnet", optional = true }
libp2p-core = { version = "0.16.0", path = "core" }
libp2p-core-derive = { version = "0.16.0", path = "misc/core-derive" }
libp2p-secio = { version = "0.16.1", path = "protocols/secio", default-features = false, optional = true }
libp2p-swarm = { version = "0.16.1", path = "swarm" }
libp2p-uds = { version = "0.16.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.16.2", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.16.2", path = "muxers/yamux", optional = true }
libp2p-noise = { version = "0.16.2", path = "protocols/noise", optional = true }
libp2p-mplex = { version = "0.17.0", path = "muxers/mplex", optional = true }
libp2p-identify = { version = "0.17.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.17.0", path = "protocols/kad", optional = true }
libp2p-floodsub = { version = "0.17.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.17.0", path = "./protocols/gossipsub", optional = true }
libp2p-ping = { version = "0.17.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.17.0", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.17.0", path = "protocols/pnet", optional = true }
libp2p-core = { version = "0.17.1", path = "core" }
libp2p-core-derive = { version = "0.17.0", path = "misc/core-derive" }
libp2p-secio = { version = "0.17.0", path = "protocols/secio", default-features = false, optional = true }
libp2p-swarm = { version = "0.17.0", path = "swarm" }
libp2p-uds = { version = "0.17.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.17.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.17.0", path = "muxers/yamux", optional = true }
libp2p-noise = { version = "0.17.0", path = "protocols/noise", optional = true }
parking_lot = "0.10.0"
pin-project = "0.4.6"
smallvec = "1.0"
wasm-timer = "0.2.4"

[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.16.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.16.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.16.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.16.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.16.0", path = "transports/websocket", optional = true }
libp2p-deflate = { version = "0.17.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.17.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.17.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.17.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.17.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = "1.0"
Expand Down
12 changes: 6 additions & 6 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.16.0"
version = "0.17.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -20,9 +20,9 @@ futures-timer = "3"
lazy_static = "1.2"
libsecp256k1 = { version = "0.3.1", optional = true }
log = "0.4"
multiaddr = { package = "parity-multiaddr", version = "0.7.3", path = "../misc/multiaddr" }
multiaddr = { package = "parity-multiaddr", version = "0.8.0", path = "../misc/multiaddr" }
multihash = "0.10"
multistream-select = { version = "0.7.0", path = "../misc/multistream-select" }
multistream-select = { version = "0.8.0", path = "../misc/multistream-select" }
parking_lot = "0.10.0"
pin-project = "0.4.6"
prost = "0.6.1"
Expand All @@ -40,9 +40,9 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal

[dev-dependencies]
async-std = "1.0"
libp2p-mplex = { version = "0.16.0", path = "../muxers/mplex" }
libp2p-secio = { version = "0.16.0", path = "../protocols/secio" }
libp2p-tcp = { version = "0.16.0", path = "../transports/tcp" }
libp2p-mplex = { version = "0.17.0", path = "../muxers/mplex" }
libp2p-secio = { version = "0.17.0", path = "../protocols/secio" }
libp2p-tcp = { version = "0.17.0", path = "../transports/tcp" }
quickcheck = "0.9.0"
wasm-timer = "0.2"

Expand Down
5 changes: 4 additions & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnecti

use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
use std::{fmt, pin::Pin, task::Context, task::Poll};
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use std::hash::Hash;
use substream::{Muxing, SubstreamEvent};

Expand Down Expand Up @@ -334,3 +334,6 @@ impl fmt::Display for ConnectionLimit {
write!(f, "{}/{}", self.current, self.limit)
}
}

/// A `ConnectionLimit` can represent an error if it has been exceeded.
impl Error for ConnectionLimit {}
36 changes: 1 addition & 35 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ where
/// the associated user data.
#[derive(Debug)]
struct TaskInfo<I, C> {
/// channel endpoint to send messages to the task
/// Channel endpoint to send messages to the task.
sender: mpsc::Sender<task::Command<I>>,
/// The state of the task as seen by the `Manager`.
state: TaskState<C>,
Expand Down Expand Up @@ -286,40 +286,6 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
ConnectionId(task_id)
}

/// Notifies the handlers of all managed connections of an event.
///
/// This function is "atomic", in the sense that if `Poll::Pending` is
/// returned then no event has been sent.
#[must_use]
pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()>
where
I: Clone
{
for task in self.tasks.values_mut() {
if let Poll::Pending = task.sender.poll_ready(cx) { // (*)
return Poll::Pending;
}
}

for (id, task) in self.tasks.iter_mut() {
let cmd = task::Command::NotifyHandler(event.clone());
match task.sender.start_send(cmd) {
Ok(()) => {},
Err(e) if e.is_full() => unreachable!("by (*)"),
Err(e) if e.is_disconnected() => {
// The background task ended. The manager will eventually be
// informed through an `Error` event from the task.
log::trace!("Connection dropped: {:?}", id);
},
Err(e) => {
log::error!("Unexpected error: {:?}", e);
}
}
}

Poll::Ready(())
}

/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I, C>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Expand Down
41 changes: 16 additions & 25 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,

/// Inner state of this `Task`.
state: State<F, M, H, I, O, E, C>,
state: State<F, M, H, O, E, C>,
}

impl<F, M, H, I, O, E, C> Task<F, M, H, I, O, E, C>
Expand All @@ -111,7 +111,6 @@ where
state: State::Pending {
future: Box::pin(future),
handler,
events: Vec::new()
},
}
}
Expand All @@ -133,7 +132,7 @@ where
}

/// The state associated with the `Task` of a connection.
enum State<F, M, H, I, O, E, C>
enum State<F, M, H, O, E, C>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
Expand All @@ -146,12 +145,6 @@ where
future: Pin<Box<F>>,
/// The intended handler for the established connection.
handler: H,
/// While we are dialing the future, we need to buffer the events received via
/// `Command::NotifyHandler` so that they get delivered to the `handler`
/// once the connection is established. We can't leave these in `Task::receiver`
/// because we have to detect if the connection attempt has been aborted (by
/// dropping the corresponding `sender` owned by the manager).
events: Vec<I>
},

/// The connection is established and a new event is ready to be emitted.
Expand Down Expand Up @@ -198,30 +191,29 @@ where

'poll: loop {
match std::mem::replace(&mut this.state, State::Done) {
State::Pending { mut future, handler, mut events } => {
// Process commands from the manager.
loop {
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(Command::NotifyHandler(event))) =>
events.push(event),
}
State::Pending { mut future, handler } => {
// Check if the manager aborted this task by dropping the `commands`
// channel sender side.
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
Poll::Pending => {},
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(Command::NotifyHandler(_))) => unreachable!(
"Manager does not allow sending commands to pending tasks.",
)
}
// Check if the connection succeeded.
match Future::poll(Pin::new(&mut future), cx) {
Poll::Ready(Ok((info, muxer))) => {
let mut c = Connection::new(muxer, handler.into_handler(&info));
for event in events {
c.inject_event(event)
}
this.state = State::EstablishedReady {
connection: Some(c),
connection: Some(Connection::new(
muxer,
handler.into_handler(&info),
)),
event: Event::Established { id, info }
}
}
Poll::Pending => {
this.state = State::Pending { future, handler, events };
this.state = State::Pending { future, handler };
return Poll::Pending
}
Poll::Ready(Err(error)) => {
Expand Down Expand Up @@ -338,4 +330,3 @@ where
}
}
}

20 changes: 4 additions & 16 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ where
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_pending_incoming {
if let Some(limit) = self.limits.max_incoming {
let current = self.iter_pending_incoming().count();
if current >= limit {
return Err(ConnectionLimit { limit, current })
Expand Down Expand Up @@ -330,18 +330,6 @@ where
id
}

/// Sends an event to all nodes.
///
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
/// has been sent to any node yet.
#[must_use]
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
where
TInEvent: Clone
{
self.manager.poll_broadcast(event, cx)
}

/// Adds an existing established connection to the pool.
///
/// Returns the assigned connection ID on success. An error is returned
Expand Down Expand Up @@ -846,8 +834,8 @@ where
/// The configurable limits of a connection [`Pool`].
#[derive(Debug, Clone, Default)]
pub struct PoolLimits {
pub max_pending_outgoing: Option<usize>,
pub max_pending_incoming: Option<usize>,
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
}

Expand All @@ -863,7 +851,7 @@ impl PoolLimits {
where
F: FnOnce() -> usize
{
Self::check(current, self.max_pending_outgoing)
Self::check(current, self.max_outgoing)
}

fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
Expand Down
Loading

0 comments on commit 70affb7

Please sign in to comment.