Skip to content

Commit

Permalink
kad: Reutilize substreams instead of askig yamux for new ones
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Dec 17, 2024
1 parent a248900 commit fcbdb00
Showing 1 changed file with 35 additions and 13 deletions.
48 changes: 35 additions & 13 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ pub(crate) struct Kademlia {
/// Pending outbound substreams.
pending_substreams: HashMap<SubstreamId, PeerId>,

/// Established substreams of connected peers.
established_substreams: HashMap<PeerId, Substream>,

/// Pending dials.
pending_dials: HashMap<PeerId, Vec<PeerAction>>,

Expand Down Expand Up @@ -205,6 +208,8 @@ impl Kademlia {
pending_dials: HashMap::new(),
executor: QueryExecutor::new(),
pending_substreams: HashMap::new(),
established_substreams: HashMap::new(),

update_mode: config.update_mode,
validation_mode: config.validation_mode,
record_ttl: config.record_ttl,
Expand Down Expand Up @@ -245,7 +250,7 @@ impl Kademlia {
context.add_pending_action(substream_id, action);
}
Err(error) => {
tracing::debug!(
tracing::warn!(
target: LOG_TARGET,
?peer,
?action,
Expand Down Expand Up @@ -326,7 +331,7 @@ impl Kademlia {
"pending action doesn't exist for peer, closing substream",
);

let _ = substream.close().await;
self.established_substreams.insert(peer, substream);
return Ok(());
}
Some(PeerAction::SendFindNode(query)) => {
Expand All @@ -347,11 +352,11 @@ impl Kademlia {
}
// query finished while the substream was being opened
None => {
let _ = substream.close().await;
self.established_substreams.insert(peer, substream);
}
action => {
tracing::warn!(target: LOG_TARGET, ?query, ?peer, ?action, "unexpected action for `FIND_NODE`");
let _ = substream.close().await;
self.established_substreams.insert(peer, substream);
debug_assert!(false);
}
}
Expand Down Expand Up @@ -744,15 +749,32 @@ impl Kademlia {
/// Handle next query action.
async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
match action {
QueryAction::SendMessage { query, peer, .. } => {
if self
.open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query))
.is_err()
{
// Announce the error to the query engine.
self.engine.register_response_failure(query, peer);
QueryAction::SendMessage {
query,
peer,
message,
} => {
if let Some(substream) = self.established_substreams.remove(&peer) {
tracing::trace!(
target: LOG_TARGET,
?peer,
query = ?query,
"start sending message to peer",
);

self.executor.send_request_read_response(peer, Some(query), message, substream);

Ok(())
} else {
if self
.open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query))
.is_err()
{
// Announce the error to the query engine.
self.engine.register_response_failure(query, peer);
}
Ok(())
}
Ok(())
}
QueryAction::FindNodeQuerySucceeded {
target,
Expand Down Expand Up @@ -943,7 +965,7 @@ impl Kademlia {
query = ?query_id,
"message sent to peer",
);
let _ = substream.close().await;
self.established_substreams.insert(peer, substream);
}
QueryResult::ReadSuccess { substream, message } => {
tracing::trace!(target: LOG_TARGET,
Expand Down

0 comments on commit fcbdb00

Please sign in to comment.