diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 64c80a61..7168b127 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -25,7 +25,10 @@ use crate::{ use futures::Stream; use multiaddr::Multiaddr; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{ + mpsc::{Receiver, Sender}, + oneshot, +}; use std::{ num::NonZeroUsize, @@ -88,8 +91,8 @@ pub(crate) enum KademliaCommand { /// Peer ID. peer: PeerId, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Store record to DHT. @@ -97,8 +100,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Store record to DHT to the given peers. @@ -108,8 +111,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, /// Use the following peers for the put request. peers: Vec, @@ -126,8 +129,8 @@ pub(crate) enum KademliaCommand { /// [`Quorum`] for the query. quorum: Quorum, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Register as a content provider for `key`. @@ -139,7 +142,7 @@ pub(crate) enum KademliaCommand { public_addresses: Vec, /// Query ID for the query. - query_id: QueryId, + query_id_tx: oneshot::Sender, }, /// Store record locally. @@ -232,27 +235,12 @@ pub struct KademliaHandle { /// RX channel for receiving events from `Kademlia`. event_rx: Receiver, - - /// Next query ID. - next_query_id: usize, } impl KademliaHandle { /// Create new [`KademliaHandle`]. pub(super) fn new(cmd_tx: Sender, event_rx: Receiver) -> Self { - Self { - cmd_tx, - event_rx, - next_query_id: 0usize, - } - } - - /// Allocate next query ID. - fn next_query_id(&mut self) -> QueryId { - let query_id = self.next_query_id; - self.next_query_id += 1; - - QueryId(query_id) + Self { cmd_tx, event_rx } } /// Add known peer. @@ -261,19 +249,32 @@ impl KademliaHandle { } /// Send `FIND_NODE` query to known peers. - pub async fn find_node(&mut self, peer: PeerId) -> QueryId { - let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await; - - query_id + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + pub async fn find_node(&mut self, peer: PeerId) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx + .send(KademliaCommand::FindNode { peer, query_id_tx }) + .await + .map_err(|_| ())?; + + query_id_rx.await.map_err(|_| ()) } /// Store record to DHT. - pub async fn put_record(&mut self, record: Record) -> QueryId { - let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await; + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + pub async fn put_record(&mut self, record: Record) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx + .send(KademliaCommand::PutRecord { + record, + query_id_tx, + }) + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Store record to DHT to the given peers. @@ -282,34 +283,34 @@ impl KademliaHandle { record: Record, peers: Vec, update_local_store: bool, - ) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + ) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::PutRecordToPeers { record, - query_id, + query_id_tx, peers, update_local_store, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Get record from DHT. - pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::GetRecord { key, quorum, - query_id, + query_id_tx, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Register as a content provider on the DHT. @@ -319,18 +320,18 @@ impl KademliaHandle { &mut self, key: RecordKey, public_addresses: Vec, - ) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + ) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::StartProviding { key, public_addresses, - query_id, + query_id_tx, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Store the record in the local store. Used in combination with diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index f89ed728..7698bde0 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -166,6 +166,9 @@ pub(crate) struct Kademlia { /// Query executor. executor: QueryExecutor, + + /// Next query ID. + next_query_id: usize, } impl Kademlia { @@ -207,6 +210,7 @@ impl Kademlia { provider_ttl: config.provider_ttl, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), + next_query_id: 0usize, } } @@ -938,7 +942,10 @@ impl Kademlia { }, command = self.cmd_rx.recv() => { match command { - Some(KademliaCommand::FindNode { peer, query_id }) => { + Some(KademliaCommand::FindNode { peer, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, ?peer, @@ -954,7 +961,10 @@ impl Kademlia { .into() ); } - Some(KademliaCommand::PutRecord { mut record, query_id }) => { + Some(KademliaCommand::PutRecord { mut record, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -982,10 +992,13 @@ impl Kademlia { } Some(KademliaCommand::PutRecordToPeers { mut record, - query_id, + query_id_tx, peers, update_local_store, }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1025,8 +1038,11 @@ impl Kademlia { Some(KademliaCommand::StartProviding { key, public_addresses, - query_id + query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1052,7 +1068,10 @@ impl Kademlia { .into(), ); } - Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { + Some(KademliaCommand::GetRecord { key, quorum, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); match (self.store.get(&key), quorum) { @@ -1135,6 +1154,14 @@ impl Kademlia { } } } + + /// Allocate next query ID. + fn next_query_id(&mut self) -> QueryId { + let query_id = self.next_query_id; + self.next_query_id += 1; + + QueryId(query_id) + } } #[cfg(test)]