Skip to content

Commit

Permalink
core/: Remove TInEvent and TOutEvent (#2183)
Browse files Browse the repository at this point in the history
TInEvent and TOutEvent are implied through THandler and thus
superflucious. Both are removed in favor of a derivation through
THandler.
  • Loading branch information
mxinden authored Aug 11, 2021
1 parent 7391b6e commit 0085612
Show file tree
Hide file tree
Showing 24 changed files with 356 additions and 244 deletions.
11 changes: 10 additions & 1 deletion core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@

- Add `From<&PublicKey> for PeerId` (see [PR 2145]).

- Remove `TInEvent` and `TOutEvent` trait paramters on most public types.
`TInEvent` and `TOutEvent` are implied through `THandler` and thus
superflucious. Both are removed in favor of a derivation through `THandler`
(see [PR 2183]).

- Require `ConnectionHandler::{InEvent,OutEvent,Error}` to implement `Debug`
(see [PR 2183]).

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137/
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183

# 0.29.0 [2021-07-12]

Expand Down
2 changes: 1 addition & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

mod error;
mod handler;
pub(crate) mod handler;
mod listeners;
mod substream;

Expand Down
13 changes: 8 additions & 5 deletions core/src/connection/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::Multiaddr;
use std::{task::Context, task::Poll};
use std::{fmt::Debug, task::Context, task::Poll};
use super::{Connected, SubstreamEndpoint};

/// The interface of a connection handler.
Expand All @@ -30,14 +30,14 @@ pub trait ConnectionHandler {
///
/// See also [`EstablishedConnection::notify_handler`](super::EstablishedConnection::notify_handler)
/// and [`ConnectionHandler::inject_event`].
type InEvent;
type InEvent: Debug + Send + 'static;
/// The outbound type of events that the handler emits to the `Network`
/// through [`ConnectionHandler::poll`].
///
/// See also [`NetworkEvent::ConnectionEvent`](crate::network::NetworkEvent::ConnectionEvent).
type OutEvent;
type OutEvent: Debug + Send + 'static;
/// The type of errors that the handler can produce when polled by the `Network`.
type Error;
type Error: Debug + Send + 'static;
/// The type of the substream containing the data.
type Substream;
/// Information about a substream. Can be sent to the handler through a `SubstreamEndpoint`,
Expand Down Expand Up @@ -91,6 +91,10 @@ where
}
}

pub(crate) type THandlerInEvent<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;
pub(crate) type THandlerOutEvent<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent;
pub(crate) type THandlerError<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::Error;

/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
Expand Down Expand Up @@ -127,4 +131,3 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCust
}
}
}

49 changes: 21 additions & 28 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ use super::{
ConnectionHandler,
IntoConnectionHandler,
PendingConnectionError,
Substream
Substream,
handler::{
THandlerInEvent,
THandlerOutEvent,
THandlerError,
},
};
use task::{Task, TaskId};

Expand Down Expand Up @@ -88,15 +93,15 @@ impl ConnectionId {
}

/// A connection `Manager` orchestrates the I/O of a set of connections.
pub struct Manager<I, O, H, E, HE> {
pub struct Manager<H: IntoConnectionHandler, E> {
/// The tasks of the managed connections.
///
/// Each managed connection is associated with a (background) task
/// spawned onto an executor. Each `TaskInfo` in `tasks` is linked to such a
/// background task via a channel. Closing that channel (i.e. dropping
/// the sender in the associated `TaskInfo`) stops the background task,
/// which will attempt to gracefully close the connection.
tasks: FnvHashMap<TaskId, TaskInfo<I>>,
tasks: FnvHashMap<TaskId, TaskInfo<THandlerInEvent<H>>>,

/// Next available identifier for a new connection / task.
next_task_id: TaskId,
Expand All @@ -115,13 +120,13 @@ pub struct Manager<I, O, H, E, HE> {

/// Sender distributed to managed tasks for reporting events back
/// to the manager.
events_tx: mpsc::Sender<task::Event<O, H, E, HE>>,
events_tx: mpsc::Sender<task::Event<H, E>>,

/// Receiver for events reported from managed tasks.
events_rx: mpsc::Receiver<task::Event<O, H, E, HE>>
events_rx: mpsc::Receiver<task::Event<H, E>>
}

impl<I, O, H, E, HE> fmt::Debug for Manager<I, O, H, E, HE>
impl<H: IntoConnectionHandler, E> fmt::Debug for Manager<H, E>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_map()
Expand Down Expand Up @@ -179,7 +184,7 @@ enum TaskState {

/// Events produced by the [`Manager`].
#[derive(Debug)]
pub enum Event<'a, I, O, H, TE, HE> {
pub enum Event<'a, H: IntoConnectionHandler, TE> {
/// A connection attempt has failed.
PendingConnectionError {
/// The connection ID.
Expand All @@ -206,35 +211,35 @@ pub enum Event<'a, I, O, H, TE, HE> {
connected: Connected,
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<HE>>,
error: Option<ConnectionError<THandlerError<H>>>,
},

/// A connection has been established.
ConnectionEstablished {
/// The entry associated with the new connection.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
},

/// A connection handler has produced an event.
ConnectionEvent {
/// The entry associated with the connection that produced the event.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
/// The produced event.
event: O
event: THandlerOutEvent<H>
},

/// A connection to a node has changed its address.
AddressChange {
/// The entry associated with the connection that changed address.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
/// The former [`ConnectedPoint`].
old_endpoint: ConnectedPoint,
/// The new [`ConnectedPoint`].
new_endpoint: ConnectedPoint,
},
}

impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
/// Creates a new connection manager.
pub fn new(config: ManagerConfig) -> Self {
let (tx, rx) = mpsc::channel(config.task_event_buffer_size);
Expand All @@ -255,19 +260,13 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
/// processing the node's events.
pub fn add_pending<F, M>(&mut self, future: F, handler: H) -> ConnectionId
where
I: Send + 'static,
O: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
F: Future<Output = ConnectResult<M, TE>> + Send + 'static,
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
OutEvent = O,
Error = HE
> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
{
Expand All @@ -293,15 +292,9 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
OutEvent = O,
Error = HE
> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
I: Send + 'static,
O: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
{
Expand All @@ -313,7 +306,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
sender: tx, state: TaskState::Established(info)
});

let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _, _, _>>> =
let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _>>> =
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));

if let Some(executor) = &mut self.executor {
Expand All @@ -326,7 +319,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
}

/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I>> {
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, THandlerInEvent<H>>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Some(Entry::new(task))
} else {
Expand All @@ -340,7 +333,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
}

/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE>> {
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, H, TE>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}

Expand Down
45 changes: 26 additions & 19 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ use crate::{
IntoConnectionHandler,
PendingConnectionError,
Substream,
handler::{
THandlerInEvent,
THandlerOutEvent,
THandlerError,
},
},
};
use futures::{prelude::*, channel::mpsc, stream};
Expand All @@ -53,23 +58,23 @@ pub enum Command<T> {

/// Events that a task can emit to its manager.
#[derive(Debug)]
pub enum Event<T, H, TE, HE> {
pub enum Event<H: IntoConnectionHandler, TE> {
/// A connection to a node has succeeded.
Established { id: TaskId, info: Connected },
/// A pending connection failed.
Failed { id: TaskId, error: PendingConnectionError<TE>, handler: H },
/// A node we are connected to has changed its address.
AddressChange { id: TaskId, new_address: Multiaddr },
/// Notify the manager of an event from the connection.
Notify { id: TaskId, event: T },
Notify { id: TaskId, event: THandlerOutEvent<H> },
/// A connection closed, possibly due to an error.
///
/// If `error` is `None`, the connection has completed
/// an active orderly close.
Closed { id: TaskId, error: Option<ConnectionError<HE>> }
Closed { id: TaskId, error: Option<ConnectionError<THandlerError<H>>> }
}

impl<T, H, TE, HE> Event<T, H, TE, HE> {
impl<H: IntoConnectionHandler, TE> Event<H, TE> {
pub fn id(&self) -> &TaskId {
match self {
Event::Established { id, .. } => id,
Expand All @@ -82,7 +87,7 @@ impl<T, H, TE, HE> Event<T, H, TE, HE> {
}

/// A `Task` is a [`Future`] that handles a single connection.
pub struct Task<F, M, H, I, O, E>
pub struct Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
Expand All @@ -92,16 +97,16 @@ where
id: TaskId,

/// Sender to emit events to the manager of this task.
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
events: mpsc::Sender<Event<H, E>>,

/// Receiver for commands sent by the manager of this task.
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,
commands: stream::Fuse<mpsc::Receiver<Command<THandlerInEvent<H>>>>,

/// Inner state of this `Task`.
state: State<F, M, H, O, E>,
state: State<F, M, H, E>,
}

impl<F, M, H, I, O, E> Task<F, M, H, I, O, E>
impl<F, M, H, E> Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
Expand All @@ -110,8 +115,8 @@ where
/// Create a new task to connect and handle some node.
pub fn pending(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
commands: mpsc::Receiver<Command<I>>,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
future: F,
handler: H
) -> Self {
Expand All @@ -129,8 +134,8 @@ where
/// Create a task for an existing node we are already connected to.
pub fn established(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
commands: mpsc::Receiver<Command<I>>,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
connection: Connection<M, H::Handler>
) -> Self {
Task {
Expand All @@ -143,7 +148,7 @@ where
}

/// The state associated with the `Task` of a connection.
enum State<F, M, H, O, E>
enum State<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
Expand All @@ -165,33 +170,35 @@ where
/// is polled for new events in this state, otherwise the event
/// must be sent to the `Manager` before the connection can be
/// polled again.
event: Option<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>
event: Option<Event<H, E>>,
},

/// The connection is closing (active close).
Closing(Close<M>),

/// The task is terminating with a final event for the `Manager`.
Terminating(Event<O, H, E, <H::Handler as ConnectionHandler>::Error>),
Terminating(Event<H, E>),

/// The task has finished.
Done
}

impl<F, M, H, I, O, E> Unpin for Task<F, M, H, I, O, E>
impl<F, M, H, E> Unpin for Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
}

impl<F, M, H, I, O, E> Future for Task<F, M, H, I, O, E>
impl<F, M, H, E> Future for Task<F, M, H, E>
where
M: StreamMuxer,
F: Future<Output = ConnectResult<M, E>>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
H::Handler: ConnectionHandler<
Substream = Substream<M>,
> + Send + 'static,
{
type Output = ();

Expand Down
Loading

0 comments on commit 0085612

Please sign in to comment.