feat(swarm): deprecate SwarmBuilder in favor of configuring Swarm #3189

Closed
10 changes: 5 additions & 5 deletions protocols/dcutr/examples/dcutr.rs
@@ -29,11 +29,12 @@ use libp2p::dns::DnsConfig;
 use libp2p::identify;
 use libp2p::noise;
 use libp2p::relay::v2::client::{self, Client};
-use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent};
+use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
 use libp2p::tcp;
 use libp2p::Transport;
 use libp2p::{dcutr, ping};
 use libp2p::{identity, PeerId};
+use libp2p_swarm::Swarm;
 use log::info;
 use std::convert::TryInto;
 use std::error::Error;
@@ -156,11 +157,10 @@ fn main() -> Result<(), Box<dyn Error>> {
     };
 
     let mut swarm = match ThreadPool::new() {
-        Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp),
-        Err(_) => SwarmBuilder::without_executor(transport, behaviour, local_peer_id),
+        Ok(tp) => Swarm::with_executor(transport, behaviour, local_peer_id, tp),
+        Err(_) => Swarm::without_executor(transport, behaviour, local_peer_id),
     }
-    .dial_concurrency_factor(10_u8.try_into().unwrap())
-    .build();
+    .dial_concurrency_factor(10_u8.try_into().unwrap());
 
     swarm
         .listen_on(
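The gist of the example change: the deprecated `SwarmBuilder` round-trip through `.build()` is replaced by constructing the `Swarm` directly and configuring it in place. Side by side, as a condensed sketch of the diff above (transport, behaviour and peer-ID setup elided):

```rust
// Before: build the swarm through the (now deprecated) builder.
let mut swarm = SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp)
    .dial_concurrency_factor(10_u8.try_into().unwrap())
    .build();

// After: construct the Swarm directly; the setter chains on the Swarm itself.
let mut swarm = Swarm::with_executor(transport, behaviour, local_peer_id, tp)
    .dial_concurrency_factor(10_u8.try_into().unwrap());
```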
175 changes: 40 additions & 135 deletions swarm/src/connection/pool.rs
@@ -46,7 +46,7 @@ use std::{
     collections::{hash_map, HashMap},
     convert::TryFrom as _,
     fmt,
-    num::{NonZeroU8, NonZeroUsize},
+    num::NonZeroU8,
     pin::Pin,
     task::Context,
     task::Poll,
@@ -56,29 +56,6 @@ use void::Void;
 mod concurrent_dial;
 mod task;
 
-enum ExecSwitch {
-    Executor(Box<dyn Executor + Send>),
-    LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
-}
-
-impl ExecSwitch {
-    fn advance_local(&mut self, cx: &mut Context) {
-        match self {
-            ExecSwitch::Executor(_) => {}
-            ExecSwitch::LocalSpawn(local) => {
-                while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
-            }
-        }
-    }
-
-    fn spawn(&mut self, task: BoxFuture<'static, ()>) {
-        match self {
-            Self::Executor(executor) => executor.exec(task),
-            Self::LocalSpawn(local) => local.push(task),
-        }
-    }
-}
-
 /// A connection `Pool` manages a set of connections for each peer.
 pub struct Pool<THandler, TTrans>
 where
@@ -88,7 +65,7 @@ where
     local_id: PeerId,
 
     /// The connection counter(s).
-    counters: ConnectionCounters,
+    pub(crate) counters: ConnectionCounters,
 
     /// The managed connections of each peer that are currently considered established.
     established: FnvHashMap<
@@ -106,25 +83,25 @@
     next_connection_id: ConnectionId,
 
     /// Size of the task command buffer (per task).
-    task_command_buffer_size: usize,
+    pub(crate) task_command_buffer_size: usize,
 
     /// Number of addresses concurrently dialed for a single outbound connection attempt.
-    dial_concurrency_factor: NonZeroU8,
+    pub(crate) dial_concurrency_factor: NonZeroU8,
 
     /// The configured override for substream protocol upgrades, if any.
-    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
+    pub(crate) substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
 
     /// The maximum number of inbound streams concurrently negotiating on a connection.
     ///
     /// See [`Connection::max_negotiating_inbound_streams`].
-    max_negotiating_inbound_streams: usize,
+    pub(crate) max_negotiating_inbound_streams: usize,
 
     /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured.
-    task_event_buffer_size: usize,
+    pub(crate) task_event_buffer_size: usize,
 
     /// The executor to use for running connection tasks. Can either be a global executor
     /// or a local queue.
-    executor: ExecSwitch,
+    pub(crate) executor: ExecSwitch,
 
     /// Sender distributed to pending tasks for reporting events back
     /// to the pool.
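Making these fields `pub(crate)` is what lets setters on `Swarm` replace `PoolConfig`: instead of funnelling options through a config struct at construction time, the swarm can write to the pool directly. A hedged sketch of what such a setter can look like on the `Swarm` side (the `pool` field name and the method body are assumptions for illustration; only the chainable `dial_concurrency_factor` call is attested by the dcutr example above):

```rust
impl<TBehaviour: NetworkBehaviour> Swarm<TBehaviour> {
    /// Illustrative sketch: write through to the pool's crate-visible field.
    pub fn dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
        self.pool.dial_concurrency_factor = factor;
        self
    }
}
```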
@@ -316,23 +293,20 @@
     TTrans: Transport,
 {
     /// Creates a new empty `Pool`.
-    pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
+    pub fn new(local_id: PeerId, executor: ExecSwitch) -> Self {
         let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
-        let executor = match config.executor {
-            Some(exec) => ExecSwitch::Executor(exec),
-            None => ExecSwitch::LocalSpawn(Default::default()),
-        };
 
         Pool {
             local_id,
-            counters: ConnectionCounters::new(limits),
+            counters: ConnectionCounters::new(ConnectionLimits::default()),
             established: Default::default(),
             pending: Default::default(),
             next_connection_id: ConnectionId::new(0),
-            task_command_buffer_size: config.task_command_buffer_size,
-            dial_concurrency_factor: config.dial_concurrency_factor,
-            substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
-            max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
-            task_event_buffer_size: config.task_event_buffer_size,
+            task_command_buffer_size: 32,
+            dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
+            substream_upgrade_protocol_override: None,
+            max_negotiating_inbound_streams: 128,
+            task_event_buffer_size: 7,
             executor,
             pending_connection_events_tx,
             pending_connection_events_rx,
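`Pool::new` now takes the executor choice as a plain `ExecSwitch` argument and starts from hard-coded defaults (the former `PoolConfig` defaults: command buffer 32, event buffer 7, dial concurrency 8, 128 negotiating inbound streams). A sketch of the two call shapes this enables, inferred from the new signature rather than quoted from the PR:

```rust
// With a user-supplied executor: connection tasks are handed off to it.
let pool = Pool::new(local_peer_id, ExecSwitch::Executor(Box::new(executor)));

// Without one: tasks accumulate in a local FuturesUnordered that the
// pool itself drives via `advance_local` while being polled.
let pool = Pool::new(local_peer_id, ExecSwitch::LocalSpawn(Default::default()));
```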
@@ -858,7 +832,7 @@
 #[derive(Debug, Clone)]
 pub struct ConnectionCounters {
     /// The effective connection limits.
-    limits: ConnectionLimits,
+    pub(crate) limits: ConnectionLimits,
     /// The current number of incoming connections.
     pending_incoming: u32,
     /// The current number of outgoing connections.
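The same trick applies to the limits: with `ConnectionCounters::limits` now crate-visible, `Pool::new` can start from `ConnectionLimits::default()` (as above) and the swarm can assign the real limits after construction, along the lines of this hypothetical call site:

```rust
pool.counters.limits = limits; // direct field access is what the pub(crate) change enables
```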
@@ -1073,98 +1047,6 @@ impl ConnectionLimits {
     }
 }
 
-/// Configuration options when creating a [`Pool`].
-///
-/// The default configuration specifies no dedicated task executor, a
-/// task command buffer size of 32, and a task event buffer size of 7.
-pub struct PoolConfig {
-    /// Executor to use to spawn tasks.
-    pub executor: Option<Box<dyn Executor + Send>>,
-
-    /// Size of the task command buffer (per task).
-    pub task_command_buffer_size: usize,
-
-    /// Size of the pending connection task event buffer and the established connection task event
-    /// buffer.
-    pub task_event_buffer_size: usize,
-
-    /// Number of addresses concurrently dialed for a single outbound connection attempt.
-    pub dial_concurrency_factor: NonZeroU8,
-
-    /// The configured override for substream protocol upgrades, if any.
-    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
-
-    /// The maximum number of inbound streams concurrently negotiating on a connection.
-    ///
-    /// See [`Connection::max_negotiating_inbound_streams`].
-    max_negotiating_inbound_streams: usize,
-}
-
-impl PoolConfig {
-    pub fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
-        Self {
-            executor,
-            task_command_buffer_size: 32,
-            task_event_buffer_size: 7,
-            dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"),
-            substream_upgrade_protocol_override: None,
-            max_negotiating_inbound_streams: 128,
-        }
-    }
-
-    /// Configures the executor to use for spawning connection background tasks.
-    pub fn with_executor(mut self, executor: Box<dyn Executor + Send>) -> Self {
-        self.executor = Some(executor);
-        self
-    }
-
-    /// Sets the maximum number of events sent to a connection's background task
-    /// that may be buffered, if the task cannot keep up with their consumption and
-    /// delivery to the connection handler.
-    ///
-    /// When the buffer for a particular connection is full, `notify_handler` will no
-    /// longer be able to deliver events to the associated [`Connection`](super::Connection),
-    /// thus exerting back-pressure on the connection and peer API.
-    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
-        self.task_command_buffer_size = n.get() - 1;
-        self
-    }
-
-    /// Sets the maximum number of buffered connection events (beyond a guaranteed
-    /// buffer of 1 event per connection).
-    ///
-    /// When the buffer is full, the background tasks of all connections will stall.
-    /// In this way, the consumers of network events exert back-pressure on
-    /// the network connection I/O.
-    pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
-        self.task_event_buffer_size = n;
-        self
-    }
-
-    /// Number of addresses concurrently dialed for a single outbound connection attempt.
-    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
-        self.dial_concurrency_factor = factor;
-        self
-    }
-
-    /// Configures an override for the substream upgrade protocol to use.
-    pub fn with_substream_upgrade_protocol_override(
-        mut self,
-        v: libp2p_core::upgrade::Version,
-    ) -> Self {
-        self.substream_upgrade_protocol_override = Some(v);
-        self
-    }
-
-    /// The maximum number of inbound streams concurrently negotiating on a connection.
-    ///
-    /// See [`Connection::max_negotiating_inbound_streams`].
-    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
-        self.max_negotiating_inbound_streams = v;
-        self
-    }
-}
-
 trait EntryExt<'a, K, V> {
     fn expect_occupied(self, msg: &'static str) -> hash_map::OccupiedEntry<'a, K, V>;
 }
@@ -1178,6 +1060,29 @@ impl<'a, K: 'a, V: 'a> EntryExt<'a, K, V> for hash_map::Entry<'a, K, V> {
     }
 }
 
+pub enum ExecSwitch {
+    Executor(Box<dyn Executor + Send>),
+    LocalSpawn(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>),
+}
+
+impl ExecSwitch {
+    fn advance_local(&mut self, cx: &mut Context) {
+        match self {
+            ExecSwitch::Executor(_) => {}
+            ExecSwitch::LocalSpawn(local) => {
+                while let Poll::Ready(Some(())) = local.poll_next_unpin(cx) {}
+            }
+        }
+    }
+
+    fn spawn(&mut self, task: BoxFuture<'static, ()>) {
+        match self {
+            Self::Executor(executor) => executor.exec(task),
+            Self::LocalSpawn(local) => local.push(task),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
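One detail worth spelling out in the relocated (and now `pub`) `ExecSwitch`: in the `LocalSpawn` case nothing else drives the spawned connection futures, so the pool must poll them itself, and `advance_local` drains every task that is ready on the current poll. A standalone model of that pattern (illustrative, not the crate's code):

```rust
use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
use std::task::{Context, Poll};

struct LocalQueue {
    tasks: FuturesUnordered<BoxFuture<'static, ()>>,
}

impl LocalQueue {
    /// Poll queued tasks until none are ready; pending tasks register
    /// their wakers with `cx` and are retried on the next poll.
    fn advance(&mut self, cx: &mut Context<'_>) {
        while let Poll::Ready(Some(())) = self.tasks.poll_next_unpin(cx) {}
    }
}
```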