Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review from Thomas Eizinger #97

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions transport/src/bootnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn main() -> anyhow::Result<()> {
kademlia: kad::Behaviour::with_config(
local_peer_id,
MemoryStore::new(local_peer_id),
Default::default(),
Default::default(), // Same here, this is the default IPFS network.
),
relay: relay::Behaviour::new(local_peer_id, Default::default()),
gossipsub: gossipsub::Behaviour::new(
Expand Down Expand Up @@ -104,7 +104,7 @@ async fn main() -> anyhow::Result<()> {
swarm.add_external_address(public_addr);
}

swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); // This is unnecessary if you are setting an external address.

// Connect to other boot nodes
for BootNode { peer_id, address } in
Expand All @@ -113,15 +113,23 @@ async fn main() -> anyhow::Result<()> {
log::info!("Connecting to boot node {peer_id} at {address}");
swarm.behaviour_mut().autonat.add_server(peer_id, Some(address.clone()));
swarm.behaviour_mut().kademlia.add_address(&peer_id, address.clone());

// By default, the `Swarm` will consult all behaviours for addresses for a peer.
// If you specify `.addresses` here, it won't.
// Unless you want to be specific about _just_ using that one address, I'd leave it off and just specify the peer ID.
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved
swarm.dial(DialOpts::peer_id(peer_id).addresses(vec![address]).build())?;
}

// As mentioned elsewhere, bootstrap is now included automatically.
if swarm.behaviour_mut().kademlia.bootstrap().is_err() {
log::warn!("No peers connected. Cannot bootstrap kademlia.")
}

let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;

// Have you looked at the implementation? :)
// This statically returns `false` so you can also just use a `loop`!
while !swarm.is_terminated() {
let event = tokio::select! {
event = swarm.select_next_some() => event,
Expand Down
2 changes: 1 addition & 1 deletion transport/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1);
pub struct TaskManager {
shutdown_timeout: Duration,
cancel_token: CancellationToken,
tasks: Vec<JoinHandle<()>>,
tasks: Vec<JoinHandle<()>>, // This can technically grow unbounded.
}

impl Default for TaskManager {
Expand Down
109 changes: 104 additions & 5 deletions transport/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,22 @@ where
kademlia: kad::Behaviour<MemoryStore>,
relay: RelayClient,
dcutr: dcutr::Behaviour,
// I am not sure I fully understand your use of the `request_response` behaviour.
// You seem to be ignoring the association of requests and responses because all responses get dumped into a single stream.
// If all your messages are just events (i.e. don't have a response), I would suggest to don't send responses at all.
// Instead, have both sides just send each other "requests".
// That should simplify your event-handling.
//
// You may also want to look into `libp2p-stream` if you need generic stream-handling and not messages.
//
// Another thing to consider is that currently, you aren't really making use of the protocol-based multiplexing capabilities.
// I am not sure where the requirement for just sending messages comes but it might be worthwhile to consider to send them over multiple protocols.
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved
request: request_response::Behaviour<MessageCodec<T>>,
gossipsub: gossipsub::Behaviour,
ping: ping::Behaviour,

// AutoNATv1 is somewhat broken because it may accidentially hole-punch for you and thus report an invalid NAT status.
// v2 is in the works. Progress is a bit slow because there were some disruptions but I am confident it will ship eventually.
autonat: autonat::Behaviour,
}

Expand Down Expand Up @@ -197,6 +210,7 @@ impl Default for P2PTransportBuilder {
}
}

// More of a nit but I'd expect the functions of a "builder" to return `Self` so it can be chained.
impl P2PTransportBuilder {
pub fn new() -> Self {
let keypair = Keypair::generate_ed25519();
Expand Down Expand Up @@ -295,7 +309,7 @@ impl P2PTransportBuilder {
kademlia: kad::Behaviour::with_config(
local_peer_id,
MemoryStore::new(local_peer_id),
Default::default(),
Default::default(), // With the default config, you are running on the IPFS DHT. You probably want to change this to your own protocol string.
),
dcutr: dcutr::Behaviour::new(local_peer_id),
request: request_response::Behaviour::new(
Expand Down Expand Up @@ -420,6 +434,10 @@ impl P2PTransportBuilder {
for addr in self.listen_addrs {
swarm.listen_on(addr)?;
}

// Why block on waiting for listening?
// In general, `Swarm` will always do many things concurrently.
// I would not recommend to interact with `Swarm` in a linear fashion.
Self::wait_for_listening(
&mut swarm,
#[cfg(feature = "metrics")]
Expand All @@ -438,8 +456,18 @@ impl P2PTransportBuilder {
log::info!("Connecting to boot node {peer_id} at {address}");
swarm.behaviour_mut().autonat.add_server(peer_id, Some(address.clone()));
swarm.behaviour_mut().kademlia.add_address(&peer_id, address.clone());
swarm.dial(DialOpts::peer_id(peer_id).addresses(vec![address]).build())?;
let opts = DialOpts::peer_id(peer_id).addresses(vec![address]).build();

let _ = opts.connection_id();

swarm.dial(opts)?;
}

// You don't really need to wait for the connection before you proceed.
// But if you really wanted to, you should identify it by `ConnectionId`.
// You can grab the connection it from the `DialOpts`, see above.
// Then, wait for an connection established or failed event with this connection ID.
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved

Self::wait_for_first_connection(
&mut swarm,
#[cfg(feature = "metrics")]
Expand All @@ -450,9 +478,18 @@ impl P2PTransportBuilder {

// Connect to relay and listen for relayed connections
if self.relay {
// Why only connect to a single relay?
// Reservations are pretty cheap (only a single file-descriptor for the open connection)
//
// You may also be interested in https://github.com/libp2p/rust-libp2p/issues/4651.
let addr = relay_addr.ok_or(Error::NoRelay)?;
log::info!("Connecting to relay {addr}");

// By issuing a `listen_on` with `/p2p-circuit`, we will automatically dial the relay if we don't have a connection to it yet.
// So you don't need to dial the relay here, calling `listen_on` is enough.
swarm.dial(addr.clone())?;

// Why wait for identify?
Self::wait_for_identify(
&mut swarm,
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -604,6 +641,7 @@ impl<T: MsgContent> P2PTransportHandle<T> {
&self,
peer_id: PeerId,
) -> impl Future<Output = Result<bool, P2PTransportError>> {
// I've had good experienes with <https://docs.rs/bmrng/latest/bmrng/> for this sort of stuff.
let dial_sender = self.dial_sender.clone();
let (tx, rx) = oneshot::channel();
let result_sender = DialResultSender(tx);
Expand All @@ -619,18 +657,33 @@ impl<T: MsgContent> P2PTransportHandle<T> {
}
}

// Overall, what you are doing here is good practise:
// 1. An eventloop that coordinates the swarm and incoming commands
// 2. A handle for interacting with that from other places.
//
// As I mentioned elsewhere, I'd look into pushing some of the logic here into `NetworkBehaviour`s.
// It is not super urgent but it will make long-term maintenance easier because it is easier to test to means you can likely remove some of the state from this eventloop.
// Building your logic based on reacting to events also makes clean-ups to avoid memory-leaks.
struct P2PTransport<T: MsgContent> {
inbound_msg_sender: mpsc::Sender<Message<T>>,
outbound_msg_receiver: mpsc::Receiver<Message<T>>,
subscription_receiver: mpsc::Receiver<Subscription>,
dial_receiver: DialReceiver,

// I'd suggest renaming this, it is a bit confusing with `ongoing_dials`.
pending_dials: HashMap<PeerId, Vec<DialResultSender>>,

// I'd suggest unifying the state related to one connection into a struct / enum and keying that under `ConnectionId`.
// That way, it is much easier to correctly clean-up state upon a failed connection because you don't need to check several hash maps.
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved
ongoing_dials: HashMap<ConnectionId, DialResultSender>,

ongoing_queries: BiHashMap<PeerId, QueryId>,
pending_messages: HashMap<PeerId, Vec<T>>,
pending_messages: HashMap<PeerId, Vec<T>>, // I'd recommend a timeout on how long you are willing to buffer messages. Otherwise this can be a memory-leak (and bad UX for the original sender).

// Some of this state may be easier to deal with if you create your own `NetworkBehaviour` and wrap `gossipsub` with it.
subscribed_topics: HashMap<TopicHash, (String, bool)>, // hash -> (topic, allow_unordered)
sequence_numbers: HashMap<(TopicHash, PeerId), u64>, // FIXME: Potential memory leak
active_connections: HashMap<PeerId, VecDeque<ConnectionId>>,
active_connections: HashMap<PeerId, VecDeque<ConnectionId>>, // HashMap<ConnectionId, PeerId> would be the better data structure here.
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved
swarm: Swarm<Behaviour<T>>,
bootstrap: bool,
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -670,13 +723,29 @@ impl<T: MsgContent> P2PTransport<T> {
log::info!("P2PTransport starting");
let mut bootstrap_timer = IntervalStream::new(interval(BOOTSTRAP_INTERVAL)).fuse();
loop {
// Personally, I am not a fan of `tokio::select` because it:
// a) forces an additional syntax
// b) has very nuanced behaviour in regards to cancellation
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved
//
// It isn't actually very difficult to write your own `poll`-based eventloop.
// You can just do what `futures::future::select` does: Call various poll APIs in a row,
// always hopping to the next one when the first one returns `Pending`.
// That way, you are enforcing syntactically that nothing `.await`s
// in the eventloop (see comment below).
// Blocking the eventloop will stall your application and should be avoided.
tokio::select! {
_ = cancel_token.cancelled() => break,

// There is now a built-in functionality to automatically bootstrap:
// https://github.com/libp2p/rust-libp2p/commit/6a916174b0d3c16d2cfcebe5b168690d41484d10
_ = bootstrap_timer.select_next_some() => {
if !self.bootstrap() {
break
}
},
// I would advise to _not_ block this event loop, i.e. don't call `.await` in here.
// Quickly scanning the code suggests that it is actually unnecessary.
// You may want to activate the following clippy lint: https://rust-lang.github.io/rust-clippy/master/#/unused_async
event = self.swarm.select_next_some() => self.handle_swarm_event(event).await.unwrap_or_else(|e| {
log::error!("Error handling swarm event: {e}")
}),
Expand Down Expand Up @@ -716,6 +785,10 @@ impl<T: MsgContent> P2PTransport<T> {
}

fn peer_addrs(&mut self, peer_id: PeerId) -> Vec<Multiaddr> {
// You aren't really supposed to call any APIs of `NetworkBehaviour` yourself.
// Some `NetworkBehaviour`s for example will change their state as part of these.
// There is no need to collect these addresses yourself.
// If you issue a dial to a certain `PeerId`, all known addresses will automatically be tried.
self.swarm
.behaviour_mut()
.handle_pending_outbound_connection(
Expand Down Expand Up @@ -758,6 +831,7 @@ impl<T: MsgContent> P2PTransport<T> {
}
e.insert((topic.to_string(), allow_unordered));
}

#[cfg(feature = "metrics")]
SUBSCRIBED_TOPICS.inc();
}
Expand Down Expand Up @@ -975,6 +1049,8 @@ impl<T: MsgContent> P2PTransport<T> {
identify::Event::Received { peer_id, info } => (peer_id, info.listen_addrs),
_ => return Ok(()),
};
// If you are keen for some contributions, this functionality would be reasonably easy to do
// directly in kademlia itself. See https://github.com/libp2p/rust-libp2p/issues/5313.
let kademlia = &mut self.swarm.behaviour_mut().kademlia;
listen_addrs.into_iter().filter(addr_is_reachable).for_each(|addr| {
kademlia.add_address(&peer_id, addr);
Expand Down Expand Up @@ -1013,13 +1089,30 @@ impl<T: MsgContent> P2PTransport<T> {
};

// Query reached the peer that was looked for. Send all pending messages.
// Instead of keeping this state out here, I would suggest writing your own `NetworkBehaviour` that
// wraps `libp2p-request-response`.
// You can then implement a function that buffers a message (with a timeout?) and only sends it
// once you have a connection to that peer. This is quite easy to detect by listening for the
// various events that a `NetworkBehaviour` gets called with.
//
// This way, the functionality of sending a message to a peer is de-coupled from the kademlia queries
// and it will be sent, regardless of how the connection is established which should be more resilient.
// You can also easily unit test that using https://docs.rs/libp2p-swarm-test/latest/libp2p_swarm_test/.
//
// If you don't want to just dial every peer that you discover, you can still keep a list of "peers we want to dial" within that new behaviour.
// You can then query this list in here as part of handling the kademlia event and only dial the peers you are interested in.
//
// In general, I'd recommend to attempt writing your networking logic as `NetworkBehaviour`.
// It will modularize your codebase and allow you to unit-test them.
// A lot of logic can be expressed by just reacting to the various events that are given to each `NetworkBehaviour`.
if peers.contains(&peer_id) {
self.ongoing_queries.remove_by_right(&query_id);
#[cfg(feature = "metrics")]
ONGOING_QUERIES.dec();
self.peer_found(peer_id);
}
// Query finished and the peer wasn't found. Drop all pending messages and dial requests.
// I think the below can be completely dropped in favor of a timeout within above said behaviour.
else if finished {
self.ongoing_queries.remove_by_right(&query_id);
#[cfg(feature = "metrics")]
Expand All @@ -1042,7 +1135,7 @@ impl<T: MsgContent> P2PTransport<T> {
fn peer_found(&mut self, peer_id: PeerId) {
log::debug!("Peer found: {peer_id}");
self.pending_dials.remove(&peer_id).into_iter().flatten().for_each(|rs| {
self.dial_peer(peer_id, rs);
self.dial_peer(peer_id, rs); // Attempting to send a message using `request-response` will already attempt to the dial the peer.
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "metrics")]
PENDING_DIALS.dec();
});
Expand Down Expand Up @@ -1076,6 +1169,10 @@ impl<T: MsgContent> P2PTransport<T> {
self.pending_dials.entry(peer_id).or_default().push(result_sender);
#[cfg(feature = "metrics")]
PENDING_DIALS.inc();
// I would suggest tying this sort of logic to events returned from either the `Swarm` or a behaviour.
// For example, `libp2p-request-response` has a `DialFailure` variant that you could hook into.
// https://docs.rs/libp2p-request-response/latest/libp2p_request_response/enum.Event.html#variant.OutboundFailure
// If you wanted to specifically react to `NoAddresses` then that could also be something you can propose changing upstream.
self.lookup_peer(peer_id);
}
Err(e) => {
Expand Down Expand Up @@ -1117,6 +1214,8 @@ impl<T: MsgContent> P2PTransport<T> {

let peer_conns = self.active_connections.entry(peer_id).or_default();
peer_conns.push_front(connection_id);

// Is there a reason you cannot use https://docs.rs/libp2p-connection-limits/latest/libp2p_connection_limits/struct.ConnectionLimits.html#method.with_max_established_per_peer?
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved
if peer_conns.len() > MAX_CONNS_PER_PEER as usize {
log::debug!("Connection limit reached for {peer_id}");
let conn_to_close = peer_conns.back().expect("not empty");
Expand Down
4 changes: 2 additions & 2 deletions transport/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub async fn get_keypair(path: Option<PathBuf>) -> anyhow::Result<Keypair> {
Err(_) => {
log::info!("Generating new key and saving into {}", path.display());
let keypair = ed25519::Keypair::generate();
tokio::fs::write(&path, keypair.to_bytes()).await?;
tokio::fs::write(&path, keypair.to_bytes()).await?; // `Keypair` has a function "to_protobuf_encoding" that uses a defined encoding to write the key. You could use that here instead of depending on the bytes representation of `ed25519`.
Ok(keypair.into())
}
}
Expand All @@ -35,7 +35,7 @@ pub fn addr_is_reachable(addr: &Multiaddr) -> bool {
Some(Protocol::Ip4(addr)) => {
!(addr.is_loopback() || addr.is_link_local())
// We need to allow private addresses for testing in local environment
&&(!addr.is_private() || std::env::var("PRIVATE_NETWORK").is_ok())
&&(!addr.is_private() || std::env::var("PRIVATE_NETWORK").is_ok()) // Side-effects like these are best passed as parameters.
}
Some(Protocol::Ip6(addr)) => !addr.is_loopback(),
Some(Protocol::Dns(_)) => true,
Expand Down