Skip to content

Commit

Permalink
Merge branch 'master' into lexnv/limits-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lexnv authored Nov 13, 2024
2 parents 1217365 + e7cb35e commit a5951cc
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 70 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub mod yamux;

mod bandwidth;
mod multistream_select;
mod utils;

#[cfg(test)]
mod mock;
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
protocol::libp2p::kademlia::{futures_stream::FuturesStream, query::QueryId},
substream::Substream,
PeerId,
protocol::libp2p::kademlia::query::QueryId, substream::Substream,
utils::futures_stream::FuturesStream, PeerId,
};

use bytes::{Bytes, BytesMut};
Expand Down
1 change: 0 additions & 1 deletion src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ const PARALLELISM_FACTOR: usize = 3;
mod bucket;
mod config;
mod executor;
mod futures_stream;
mod handle;
mod message;
mod query;
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
use crate::{
protocol::libp2p::kademlia::{
config::{DEFAULT_PROVIDER_REFRESH_INTERVAL, DEFAULT_PROVIDER_TTL},
futures_stream::FuturesStream,
record::{ContentProvider, Key, ProviderRecord, Record},
types::Key as KademliaKey,
},
utils::futures_stream::FuturesStream,
PeerId,
};

Expand Down
245 changes: 212 additions & 33 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,27 +170,29 @@ impl Stream for TransportContext {
type Item = (SupportedTransport, TransportEvent);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let len = match self.transports.len() {
0 => return Poll::Ready(None),
len => len,
};
let start_index = self.index;

loop {
let index = self.index % len;
self.index += 1;
if self.transports.is_empty() {
// Terminate if we don't have any transports installed.
return Poll::Ready(None);
}

let (key, stream) = self.transports.get_index_mut(index).expect("transport to exist");
let len = self.transports.len();
self.index = (self.index + 1) % len;
for index in 0..len {
let current = (self.index + index) % len;
let (key, stream) = self.transports.get_index_mut(current).expect("transport to exist");
match stream.poll_next_unpin(cx) {
Poll::Pending => {}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(event)) => return Poll::Ready(Some((*key, event))),
}

if self.index == start_index + len {
break Poll::Pending;
Poll::Ready(None) => {
return Poll::Ready(None);
}
Poll::Ready(Some(event)) => {
let event = Some((*key, event));
return Poll::Ready(event);
}
}
}

Poll::Pending
}
}

Expand Down Expand Up @@ -1001,29 +1003,60 @@ impl TransportManager {
pub async fn next(&mut self) -> Option<TransportEvent> {
loop {
tokio::select! {
event = self.event_rx.recv() => match event? {
TransportManagerEvent::ConnectionClosed {
peer,
connection: connection_id,
} => match self.on_connection_closed(peer, connection_id) {
None => {}
Some(event) => return Some(event),
}
event = self.event_rx.recv() => {
let Some(event) = event else {
tracing::error!(
target: LOG_TARGET,
"Installed protocols terminated, ignore if the node is stopping"
);

return None;
};

match event {
TransportManagerEvent::ConnectionClosed {
peer,
connection: connection_id,
} => if let Some(event) = self.on_connection_closed(peer, connection_id) {
return Some(event);
}
};
},
command = self.cmd_rx.recv() => match command? {
InnerTransportManagerCommand::DialPeer { peer } => {
if let Err(error) = self.dial(peer).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer")

command = self.cmd_rx.recv() =>{
let Some(command) = command else {
tracing::error!(
target: LOG_TARGET,
"User command terminated, ignore if the node is stopping"
);

return None;
};

match command {
InnerTransportManagerCommand::DialPeer { peer } => {
if let Err(error) = self.dial(peer).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to dial peer")
}
}
}
InnerTransportManagerCommand::DialAddress { address } => {
if let Err(error) = self.dial_address(address).await {
tracing::debug!(target: LOG_TARGET, ?error, "failed to dial peer")
InnerTransportManagerCommand::DialAddress { address } => {
if let Err(error) = self.dial_address(address).await {
tracing::debug!(target: LOG_TARGET, ?error, "failed to dial peer")
}
}
}
},

event = self.transports.next() => {
let (transport, event) = event?;
let Some((transport, event)) = event else {
tracing::error!(
target: LOG_TARGET,
"Installed transports terminated, ignore if the node is stopping"
);

return None;
};


match event {
TransportEvent::DialFailure { connection_id, address, error } => {
Expand Down Expand Up @@ -1318,6 +1351,152 @@ mod tests {
(dial_address, connection_id)
}

struct MockTransport {
rx: tokio::sync::mpsc::Receiver<TransportEvent>,
}

impl MockTransport {
fn new(rx: tokio::sync::mpsc::Receiver<TransportEvent>) -> Self {
Self { rx }
}
}

impl Transport for MockTransport {
fn dial(&mut self, _connection_id: ConnectionId, _address: Multiaddr) -> crate::Result<()> {
Ok(())
}

fn accept(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Ok(())
}

fn accept_pending(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Ok(())
}

fn reject_pending(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Ok(())
}

fn reject(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Ok(())
}

fn open(
&mut self,
_connection_id: ConnectionId,
_addresses: Vec<Multiaddr>,
) -> crate::Result<()> {
Ok(())
}

fn negotiate(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Ok(())
}

fn cancel(&mut self, _connection_id: ConnectionId) {}
}
impl Stream for MockTransport {
type Item = TransportEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx.poll_recv(cx)
}
}

#[tokio::test]
#[cfg(feature = "websocket")]
async fn transport_events() {
let mut transports = TransportContext::new();

let (tx_tcp, rx) = tokio::sync::mpsc::channel(8);
let transport = MockTransport::new(rx);
transports.register_transport(SupportedTransport::Tcp, Box::new(transport));

let (tx_ws, rx) = tokio::sync::mpsc::channel(8);
let transport = MockTransport::new(rx);
transports.register_transport(SupportedTransport::WebSocket, Box::new(transport));

assert_eq!(transports.index, 0);
assert_eq!(transports.transports.len(), 2);
// No items.
futures::future::poll_fn(|cx| match transports.poll_next_unpin(cx) {
std::task::Poll::Ready(_) => panic!("didn't expect event from `TransportService`"),
std::task::Poll::Pending => std::task::Poll::Ready(()),
})
.await;
assert_eq!(transports.index, 1);

// Websocket events.
tx_ws
.send(TransportEvent::PendingInboundConnection {
connection_id: ConnectionId::from(1),
})
.await
.expect("chanel to be open");

let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
.await
.expect("expected event");
assert_eq!(event.0, SupportedTransport::WebSocket);
assert!(std::matches!(
event.1,
TransportEvent::PendingInboundConnection { .. }
));
assert_eq!(transports.index, 0);

// TCP events.
tx_tcp
.send(TransportEvent::PendingInboundConnection {
connection_id: ConnectionId::from(2),
})
.await
.expect("chanel to be open");

let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
.await
.expect("expected event");
assert_eq!(event.0, SupportedTransport::Tcp);
assert!(std::matches!(
event.1,
TransportEvent::PendingInboundConnection { .. }
));
assert_eq!(transports.index, 1);

// Both transports produce events.
tx_ws
.send(TransportEvent::PendingInboundConnection {
connection_id: ConnectionId::from(3),
})
.await
.expect("chanel to be open");
tx_tcp
.send(TransportEvent::PendingInboundConnection {
connection_id: ConnectionId::from(4),
})
.await
.expect("chanel to be open");

let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
.await
.expect("expected event");
assert_eq!(event.0, SupportedTransport::Tcp);
assert!(std::matches!(
event.1,
TransportEvent::PendingInboundConnection { .. }
));
assert_eq!(transports.index, 0);

let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
.await
.expect("expected event");
assert_eq!(event.0, SupportedTransport::WebSocket);
assert!(std::matches!(
event.1,
TransportEvent::PendingInboundConnection { .. }
));
assert_eq!(transports.index, 1);
}

#[test]
#[should_panic]
#[cfg(debug_assertions)]
Expand Down
Loading

0 comments on commit a5951cc

Please sign in to comment.