From 104832058e337d5d90063f5ba4a53df33e6003fa Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 22 Aug 2024 15:15:19 +0000 Subject: [PATCH 01/26] Introduce `ADD_PROVIDER` query --- src/protocol/libp2p/kademlia/handle.rs | 36 ++++++- src/protocol/libp2p/kademlia/mod.rs | 113 +++++++++++++++++++--- src/protocol/libp2p/kademlia/query/mod.rs | 18 ++-- 3 files changed, 145 insertions(+), 22 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 5d3b4630..f1b4c218 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -130,6 +130,18 @@ pub(crate) enum KademliaCommand { query_id: QueryId, }, + /// Register as a content provider for `key`. + StartProviding { + /// Provided key. + key: RecordKey, + + /// Our external addresses to publish. + public_addresses: Vec, + + /// Query ID for the query. + query_id: QueryId, + }, + /// Store record locally. StoreRecord { // Record. @@ -175,7 +187,8 @@ pub enum KademliaEvent { }, /// `PUT_VALUE` query succeeded. - PutRecordSucess { + // TODO: this is never emitted. Implement + add `AddProviderSuccess`. + PutRecordSuccess { /// Query ID. query_id: QueryId, @@ -299,6 +312,27 @@ impl KademliaHandle { query_id } + /// Register as a content provider on the DHT. + /// + /// Register the local peer ID & its `public_addresses` as a provider for a given `key`. + pub async fn start_providing( + &mut self, + key: RecordKey, + public_addresses: Vec, + ) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx + .send(KademliaCommand::StartProviding { + key, + public_addresses, + query_id, + }) + .await; + + query_id + } + /// Store the record in the local store. Used in combination with /// [`IncomingRecordValidationMode::Manual`]. pub async fn store_record(&mut self, record: Record) { diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index b59a1fcf..c0f809e5 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -791,7 +791,11 @@ impl Kademlia { event = self.service.next() => match event { Some(TransportEvent::ConnectionEstablished { peer, .. 
}) => { if let Err(error) = self.on_connection_established(peer) { - tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection"); + tracing::debug!( + target: LOG_TARGET, + ?error, + "failed to handle established connection", + ); } } Some(TransportEvent::ConnectionClosed { peer }) => { @@ -801,7 +805,10 @@ impl Kademlia { match direction { Direction::Inbound => self.on_inbound_substream(peer, substream).await, Direction::Outbound(substream_id) => { - if let Err(error) = self.on_outbound_substream(peer, substream_id, substream).await { + if let Err(error) = self + .on_outbound_substream(peer, substream_id, substream) + .await + { tracing::debug!( target: LOG_TARGET, ?peer, @@ -816,7 +823,8 @@ impl Kademlia { Some(TransportEvent::SubstreamOpenFailure { substream, error }) => { self.on_substream_open_failure(substream, error).await; } - Some(TransportEvent::DialFailure { peer, address }) => self.on_dial_failure(peer, address), + Some(TransportEvent::DialFailure { peer, address }) => + self.on_dial_failure(peer, address), None => return Err(Error::EssentialTaskClosed), }, context = self.executor.next() => { @@ -824,14 +832,32 @@ impl Kademlia { match result { QueryResult::SendSuccess { substream } => { - tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer"); + tracing::trace!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + "message sent to peer", + ); let _ = substream.close().await; } QueryResult::ReadSuccess { substream, message } => { - tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer"); + tracing::trace!(target: LOG_TARGET, + ?peer, + query = ?query_id, + "message read from peer", + ); - if let Err(error) = self.on_message_received(peer, query_id, message, substream).await { - tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message"); + if let Err(error) = self.on_message_received( + peer, + query_id, + message, + substream + ).await { + tracing::debug!(target: LOG_TARGET, + ?peer, + ?error, + "failed to process message", + ); } } QueryResult::SubstreamClosed | QueryResult::Timeout => { @@ -850,22 +876,36 @@ impl Kademlia { command = self.cmd_rx.recv() => { match command { Some(KademliaCommand::FindNode { peer, query_id }) => { - tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query"); + tracing::debug!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + "starting `FIND_NODE` query", + ); self.engine.start_find_node( query_id, peer, - self.routing_table.closest(Key::from(peer), self.replication_factor).into() + self.routing_table + .closest(Key::from(peer), self.replication_factor) + .into() ); } Some(KademliaCommand::PutRecord { mut record, query_id }) => { - tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT"); + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + key = ?record.key, + "store record to DHT", + ); // For `PUT_VALUE` requests originating locally we are always the publisher. record.publisher = Some(self.local_key.clone().into_preimage()); // Make sure TTL is set. 
- record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = record + .expires + .or_else(|| Some(Instant::now() + self.record_ttl)); let key = Key::new(record.key.clone()); @@ -877,11 +917,23 @@ impl Kademlia { self.routing_table.closest(key, self.replication_factor).into(), ); } - Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => { - tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers"); + Some(KademliaCommand::PutRecordToPeers { + mut record, + query_id, + peers, + update_local_store, + }) => { + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + key = ?record.key, + "store record to DHT to specified peers", + ); // Make sure TTL is set. - record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = record + .expires + .or_else(|| Some(Instant::now() + self.record_ttl)); if update_local_store { self.store.put(record.clone()); @@ -895,7 +947,8 @@ impl Kademlia { match self.routing_table.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => Some(entry.clone()), - KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()), + KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => + Some(entry.clone()), _ => None, } }).collect(); @@ -906,6 +959,36 @@ impl Kademlia { peers, ); } + Some(KademliaCommand::StartProviding { + key, + public_addresses, + query_id + }) => { + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + ?key, + ?public_addresses, + "register as content provider" + ); + + let provider = ProviderRecord { + key: key.clone(), + provider: self.service.local_peer_id, + addresses: public_addresses, + expires: Instant::now() + self.provider_ttl, + }; + + self.store.put_provider(provider); + + self.engine.start_add_provider( + query_id, + provider, + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); + } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index f29af805..da293556 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -25,7 +25,7 @@ use crate::{ find_node::{FindNodeConfig, FindNodeContext}, get_record::{GetRecordConfig, GetRecordContext}, }, - record::{Key as RecordKey, Record}, + record::{Key as RecordKey, ProviderRecord, Record}, types::{KademliaPeer, Key}, PeerRecord, Quorum, }, @@ -45,8 +45,6 @@ mod get_record; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; -// TODO: store record key instead of the actual record - /// Type representing a query ID. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct QueryId(pub usize); @@ -56,7 +54,7 @@ pub struct QueryId(pub usize); enum QueryType { /// `FIND_NODE` query. FindNode { - /// Context for the `FIND_NODE` query + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -65,7 +63,7 @@ enum QueryType { /// Record that needs to be stored. record: Record, - /// Context for the `FIND_NODE` query + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -83,6 +81,15 @@ enum QueryType { /// Context for the `GET_VALUE` query. context: GetRecordContext, }, + + /// `ADD_PROVIDER` query. + AddProvider { + /// Provider record that need to be stored. 
+ provider: ProviderRecord, + + /// Context for the `FIND_NODE` query. + context: FindNodeConfig, + }, } /// Query action. @@ -131,7 +138,6 @@ pub enum QueryAction { records: Vec, }, - // TODO: remove /// Query succeeded. QuerySucceeded { /// ID of the query that succeeded. From 778078b52790e82d6674e73a07cc32be5353dd9f Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 23 Aug 2024 15:33:22 +0000 Subject: [PATCH 02/26] Execute `ADD_PROVIDER` query --- src/protocol/libp2p/kademlia/message.rs | 7 ++- src/protocol/libp2p/kademlia/mod.rs | 60 ++++++++++++++++++++-- src/protocol/libp2p/kademlia/query/mod.rs | 61 ++++++++++++++++++++++- 3 files changed, 120 insertions(+), 8 deletions(-) diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index 4f53fbc1..bba2b285 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -172,8 +172,7 @@ impl KademliaMessage { } /// Create `ADD_PROVIDER` message with `provider`. - #[allow(unused)] - pub fn add_provider(provider: ProviderRecord) -> Vec { + pub fn add_provider(provider: ProviderRecord) -> Bytes { let peer = KademliaPeer::new( provider.provider, provider.addresses, @@ -187,10 +186,10 @@ impl KademliaMessage { ..Default::default() }; - let mut buf = Vec::with_capacity(message.encoded_len()); + let mut buf = BytesMut::with_capacity(message.encoded_len()); message.encode(&mut buf).expect("Vec to provide needed capacity"); - buf + buf.freeze() } /// Create `GET_PROVIDERS` request for `key`. diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index c0f809e5..fc6373cd 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -83,13 +83,16 @@ mod schema { } /// Peer action. -#[derive(Debug)] +#[derive(Debug, Clone)] enum PeerAction { /// Send `FIND_NODE` message to peer. SendFindNode(QueryId), /// Send `PUT_VALUE` message to peer. SendPutValue(Bytes), + + /// Send `ADD_PROVIDER` message to peer. + SendAddProvider(Bytes), } /// Peer context. 
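A rough usage sketch of the handle API introduced above (not part of the patch; `announce_content` and its arguments are illustrative placeholders). At this point in the series `start_providing` returns the `QueryId` directly; a later commit in the series changes the handle methods to return `Result<QueryId, ()>`.

// Illustrative sketch only: register the local peer as a content provider for `key`.
// The handle stores the provider record locally and asks the Kademlia task to start
// an `ADD_PROVIDER` query towards the peers closest to `key`.
async fn announce_content(
    kad_handle: &mut KademliaHandle,
    key: RecordKey,
    public_addresses: Vec<Multiaddr>,
) -> QueryId {
    kad_handle.start_providing(key, public_addresses).await
}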
@@ -335,7 +338,12 @@ impl Kademlia { } } Some(PeerAction::SendPutValue(message)) => { - tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` response"); + tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message"); + + self.executor.send_message(peer, message, substream); + } + Some(PeerAction::SendAddProvider(message)) => { + tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message"); self.executor.send_message(peer, message, substream); } @@ -755,6 +763,52 @@ impl Kademlia { Ok(()) } + QueryAction::AddProviderToFoundNodes { provider, peers } => { + tracing::trace!( + target: LOG_TARGET, + provided_key = ?provider.key, + num_peers = ?peers.len(), + "add provider record to found peers", + ); + + let provided_key = provider.key.clone(); + let message = KademliaMessage::add_provider(provider); + let peer_action = PeerAction::SendAddProvider(message); + + for peer in peers { + match self.service.open_substream(peer.peer) { + Ok(substream_id) => { + self.pending_substreams.insert(substream_id, peer.peer); + self.peers + .entry(peer.peer) + .or_default() + .pending_actions + .insert(substream_id, peer_action.clone()); + } + Err(_) => match self.service.dial(&peer.peer) { + Ok(_) => match self.pending_dials.entry(peer.peer) { + Entry::Occupied(entry) => { + entry.into_mut().push(peer_action.clone()); + } + Entry::Vacant(entry) => { + entry.insert(vec![peer_action.clone()]); + } + }, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?provided_key, + ?error, + "failed to dial peer", + ) + } + }, + } + } + + Ok(()) + } QueryAction::GetRecordQueryDone { query_id, records } => { let _ = self .event_tx @@ -979,7 +1033,7 @@ impl Kademlia { expires: Instant::now() + self.provider_ttl, }; - self.store.put_provider(provider); + self.store.put_provider(provider.clone()); self.engine.start_add_provider( query_id, diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index da293556..34f6e84e 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -88,7 +88,7 @@ enum QueryType { provider: ProviderRecord, /// Context for the `FIND_NODE` query. - context: FindNodeConfig, + context: FindNodeContext, }, } @@ -129,6 +129,15 @@ pub enum QueryAction { peers: Vec, }, + /// Add the provider record to nodes closest to the target key. + AddProviderToFoundNodes { + /// Provider record. + provider: ProviderRecord, + + /// Peers for whom the `ADD_PROVIDER` must be sent to. + peers: Vec, + }, + /// `GET_VALUE` query succeeded. GetRecordQueryDone { /// Query ID. @@ -314,6 +323,41 @@ impl QueryEngine { query_id } + /// Start `ADD_PROVIDER` query. + pub fn start_add_provider( + &mut self, + query_id: QueryId, + provider: ProviderRecord, + candidates: VecDeque, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + ?provider, + num_peers = ?candidates.len(), + "start `ADD_PROVIDER` query", + ); + + let target = Key::new(provider.key.clone()); + let config = FindNodeConfig { + local_peer_id: self.local_peer_id, + replication_factor: self.replication_factor, + parallelism_factor: self.parallelism_factor, + query: query_id, + target, + }; + + self.queries.insert( + query_id, + QueryType::AddProvider { + provider, + context: FindNodeContext::new(config, candidates), + }, + ); + + query_id + } + /// Register response failure from a queried peer. 
pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure"); @@ -334,6 +378,9 @@ impl QueryEngine { Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } + Some(QueryType::AddProvider { context, .. }) => { + context.register_response_failure(peer); + } } } @@ -369,6 +416,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::AddProvider { context, .. }) => match message { + KademliaMessage::FindNode { peers, .. } => { + context.register_response(peer, peers); + } + _ => unreachable!(), + }, } } @@ -385,6 +438,7 @@ impl QueryEngine { Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer), Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), + Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer), } } @@ -409,6 +463,10 @@ impl QueryEngine { query_id: context.config.query, records: context.found_records(), }, + QueryType::AddProvider { provider, context } => QueryAction::AddProviderToFoundNodes { + provider, + peers: context.responses.into_values().collect::>(), + }, } } @@ -428,6 +486,7 @@ impl QueryEngine { QueryType::PutRecord { context, .. } => context.next_action(), QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), + QueryType::AddProvider { context, .. } => context.next_action(), }; match action { From 323a26f58067a7f25c2b56442a5b9cbf9244a4e1 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 27 Aug 2024 14:15:30 +0000 Subject: [PATCH 03/26] Introduce local providers in `MemoryStore` --- src/protocol/libp2p/kademlia/store.rs | 140 +++++++++++++++++--------- 1 file changed, 91 insertions(+), 49 deletions(-) diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index fced9372..8d255913 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -21,8 +21,12 @@ //! Memory store implementation for Kademlia. #![allow(unused)] -use crate::protocol::libp2p::kademlia::record::{Key, ProviderRecord, Record}; +use crate::{ + protocol::libp2p::kademlia::record::{Key, ProviderRecord, Record}, + PeerId, +}; +use futures::{future::BoxFuture, stream::FuturesUnordered}; use std::{ collections::{hash_map::Entry, HashMap}, num::NonZeroUsize, @@ -36,30 +40,42 @@ pub enum MemoryStoreEvent {} /// Memory store. pub struct MemoryStore { + /// Local peer ID. Used to track local providers. + local_peer_id: PeerId, + /// Configuration. + config: MemoryStoreConfig, /// Records. records: HashMap, /// Provider records. provider_keys: HashMap>, - /// Configuration. - config: MemoryStoreConfig, + /// Local providers. + local_providers: HashMap, + /// Futures to signal it's time to republish a local provider. + pending_provider_republish: FuturesUnordered>, } impl MemoryStore { /// Create new [`MemoryStore`]. - pub fn new() -> Self { + pub fn new(local_peer_id: PeerId) -> Self { Self { + local_peer_id, + config: MemoryStoreConfig::default(), records: HashMap::new(), provider_keys: HashMap::new(), - config: MemoryStoreConfig::default(), + local_providers: HashMap::new(), + pending_provider_republish: FuturesUnordered::new(), } } /// Create new [`MemoryStore`] with the provided configuration. 
- pub fn with_config(config: MemoryStoreConfig) -> Self { + pub fn with_config(local_peer_id: PeerId, config: MemoryStoreConfig) -> Self { Self { + local_peer_id, + config, records: HashMap::new(), provider_keys: HashMap::new(), - config, + local_providers: HashMap::new(), + pending_provider_republish: FuturesUnordered::new(), } } @@ -263,7 +279,7 @@ mod tests { #[test] fn put_get_record() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record = Record::new(key.clone(), vec![4, 5, 6]); @@ -273,11 +289,14 @@ mod tests { #[test] fn max_records() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_records: 1, - max_record_size_bytes: 1024, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_records: 1, + max_record_size_bytes: 1024, + ..Default::default() + }, + ); let key1 = Key::from(vec![1, 2, 3]); let key2 = Key::from(vec![4, 5, 6]); @@ -293,7 +312,7 @@ mod tests { #[test] fn expired_record_removed() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record = Record { key: key.clone(), @@ -310,7 +329,7 @@ mod tests { #[test] fn new_record_overwrites() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record1 = Record { key: key.clone(), @@ -334,11 +353,14 @@ mod tests { #[test] fn max_record_size() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_records: 1024, - max_record_size_bytes: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_records: 1024, + max_record_size_bytes: 2, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let record = Record::new(key.clone(), vec![4, 5]); @@ -352,7 +374,7 @@ mod tests { #[test] fn put_get_provider() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), provider: PeerId::random(), @@ -366,7 +388,7 @@ mod tests { #[test] fn multiple_providers_per_key() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let provider1 = ProviderRecord { key: key.clone(), @@ -392,7 +414,7 @@ mod tests { #[test] fn providers_sorted_by_distance() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let providers = (0..10) .map(|_| ProviderRecord { @@ -418,10 +440,13 @@ mod tests { #[test] fn max_providers_per_key() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..20) .map(|_| ProviderRecord { @@ -440,10 +465,13 @@ mod tests { #[test] fn closest_providers_kept() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..20) .map(|_| 
ProviderRecord { @@ -470,10 +498,13 @@ mod tests { #[test] fn furthest_provider_discarded() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..11) .map(|_| ProviderRecord { @@ -503,10 +534,13 @@ mod tests { #[test] fn update_provider_in_place() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let peer_ids = (0..10).map(|_| PeerId::random()).collect::>(); let peer_id0 = peer_ids[0]; @@ -558,7 +592,7 @@ mod tests { #[test] fn provider_record_expires() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), provider: PeerId::random(), @@ -575,7 +609,7 @@ mod tests { #[test] fn individual_provider_record_expires() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let provider1 = ProviderRecord { key: key.clone(), @@ -600,10 +634,13 @@ mod tests { #[test] fn max_addresses_per_provider() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_provider_addresses: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_provider_addresses: 2, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), @@ -628,10 +665,13 @@ mod tests { #[test] fn max_provider_keys() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_provider_keys: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_provider_keys: 2, + ..Default::default() + }, + ); let provider1 = ProviderRecord { key: Key::from(vec![1, 2, 3]), @@ -660,4 +700,6 @@ mod tests { assert_eq!(store.get_providers(&provider2.key), vec![provider2]); assert_eq!(store.get_providers(&provider3.key), vec![]); } + + // TODO: test local providers. } From 8946809921642715a36baa7ea0c87b91a271ef99 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 28 Aug 2024 08:03:29 +0000 Subject: [PATCH 04/26] Add provider refresh interval to Kademlia config --- src/protocol/libp2p/kademlia/config.rs | 26 ++++++++++++++++++++++++-- src/protocol/libp2p/kademlia/mod.rs | 13 +++++++++++-- src/protocol/libp2p/kademlia/store.rs | 10 +++++++++- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/protocol/libp2p/kademlia/config.rs b/src/protocol/libp2p/kademlia/config.rs index 8a02a3e3..c9e79050 100644 --- a/src/protocol/libp2p/kademlia/config.rs +++ b/src/protocol/libp2p/kademlia/config.rs @@ -39,6 +39,9 @@ const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60); /// Default provider record TTL. const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60); +/// Default provider republish interval. +pub(super) const DEFAULT_PROVIDER_REFRESH_INTERVAL: Duration = Duration::from_secs(22 * 60 * 60); + /// Protocol name. 
const PROTOCOL_NAME: &str = "/ipfs/kad/1.0.0"; @@ -74,6 +77,9 @@ pub struct Config { /// Provider record TTL. pub(super) provider_ttl: Duration, + /// Provider republish interval. + pub(super) provider_refresh_interval: Duration, + /// TX channel for sending events to `KademliaHandle`. pub(super) event_tx: Sender, @@ -90,6 +96,7 @@ impl Config { validation_mode: IncomingRecordValidationMode, record_ttl: Duration, provider_ttl: Duration, + provider_refresh_interval: Duration, ) -> (Self, KademliaHandle) { let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE); let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE); @@ -106,6 +113,7 @@ impl Config { validation_mode, record_ttl, provider_ttl, + provider_refresh_interval, codec: ProtocolCodec::UnsignedVarint(None), replication_factor, known_peers, @@ -126,6 +134,7 @@ impl Config { IncomingRecordValidationMode::Automatic, DEFAULT_TTL, DEFAULT_PROVIDER_TTL, + DEFAULT_PROVIDER_REFRESH_INTERVAL, ) } } @@ -151,8 +160,11 @@ pub struct ConfigBuilder { /// Default TTL for the records. pub(super) record_ttl: Duration, - /// Default TTL for the provider records. + /// TTL for the provider records. pub(super) provider_ttl: Duration, + + /// Republish interval for the provider records. + pub(super) provider_refresh_interval: Duration, } impl Default for ConfigBuilder { @@ -172,6 +184,7 @@ impl ConfigBuilder { validation_mode: IncomingRecordValidationMode::Automatic, record_ttl: DEFAULT_TTL, provider_ttl: DEFAULT_PROVIDER_TTL, + provider_refresh_interval: DEFAULT_PROVIDER_REFRESH_INTERVAL, } } @@ -224,7 +237,7 @@ impl ConfigBuilder { self } - /// Set default TTL for the provider records. Recommended value is 2 * (refresh interval) + 20%. + /// Set TTL for the provider records. Recommended value is 2 * (refresh interval) + 10%. /// /// If unspecified, the default TTL is 48 hours. pub fn with_provider_record_ttl(mut self, provider_record_ttl: Duration) -> Self { @@ -232,6 +245,14 @@ impl ConfigBuilder { self } + /// Set the refresh (republish) interval for provider records. + /// + /// If unspecified, the default interval is 22 hours. + pub fn with_provider_refresh_interval(mut self, provider_refresh_interval: Duration) -> Self { + self.provider_refresh_interval = provider_refresh_interval; + self + } + /// Build Kademlia [`Config`]. 
pub fn build(self) -> (Config, KademliaHandle) { Config::new( @@ -242,6 +263,7 @@ impl ConfigBuilder { self.validation_mode, self.record_ttl, self.provider_ttl, + self.provider_refresh_interval, ) } } diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index fc6373cd..4a8f163a 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -31,7 +31,7 @@ use crate::{ query::{QueryAction, QueryEngine}, record::ProviderRecord, routing_table::RoutingTable, - store::MemoryStore, + store::{MemoryStore, MemoryStoreConfig}, types::{ConnectionType, KademliaPeer, Key}, }, Direction, TransportEvent, TransportService, @@ -181,12 +181,20 @@ impl Kademlia { service.add_known_address(&peer, addresses.into_iter()); } + let store = MemoryStore::with_config( + local_peer_id, + MemoryStoreConfig { + provider_refresh_interval: config.provider_refresh_interval, + ..Default::default() + }, + ); + Self { service, routing_table, peers: HashMap::new(), cmd_rx: config.cmd_rx, - store: MemoryStore::new(), + store, event_tx: config.event_tx, local_key, pending_dials: HashMap::new(), @@ -1156,6 +1164,7 @@ mod tests { validation_mode: IncomingRecordValidationMode::Automatic, record_ttl: Duration::from_secs(36 * 60 * 60), provider_ttl: Duration::from_secs(48 * 60 * 60), + provider_refresh_interval: Duration::from_secs(22 * 60 * 60), event_tx, cmd_rx, }; diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index 8d255913..96321340 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -22,7 +22,10 @@ #![allow(unused)] use crate::{ - protocol::libp2p::kademlia::record::{Key, ProviderRecord, Record}, + protocol::libp2p::kademlia::{ + config::DEFAULT_PROVIDER_REFRESH_INTERVAL, + record::{Key, ProviderRecord, Record}, + }, PeerId, }; @@ -30,6 +33,7 @@ use futures::{future::BoxFuture, stream::FuturesUnordered}; use std::{ collections::{hash_map::Entry, HashMap}, num::NonZeroUsize, + time::Duration, }; /// Logging target for the file. @@ -254,6 +258,9 @@ pub struct MemoryStoreConfig { /// Maximum number of providers per key. Only providers with peer IDs closest to the key are /// kept. pub max_providers_per_key: usize, + + /// Local providers republish interval. + pub provider_refresh_interval: Duration, } impl Default for MemoryStoreConfig { @@ -264,6 +271,7 @@ impl Default for MemoryStoreConfig { max_provider_keys: 1024, max_provider_addresses: 30, max_providers_per_key: 20, + provider_refresh_interval: DEFAULT_PROVIDER_REFRESH_INTERVAL, } } } From d02aceee693793404779e1f9ad6fc692e04aa430 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 28 Aug 2024 13:28:09 +0000 Subject: [PATCH 05/26] Move `FuturesStream` to a separate file --- src/protocol/libp2p/kademlia/executor.rs | 58 ++------------- .../libp2p/kademlia/futures_stream.rs | 74 +++++++++++++++++++ src/protocol/libp2p/kademlia/mod.rs | 1 + 3 files changed, 82 insertions(+), 51 deletions(-) create mode 100644 src/protocol/libp2p/kademlia/futures_stream.rs diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index 9b8bd8ce..5d695346 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -18,15 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. 
-use crate::{protocol::libp2p::kademlia::query::QueryId, substream::Substream, PeerId}; +use crate::{ + protocol::libp2p::kademlia::{futures_stream::FuturesStream, query::QueryId}, + substream::Substream, + PeerId, +}; use bytes::{Bytes, BytesMut}; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{future::BoxFuture, Stream, StreamExt}; use std::{ - future::Future, pin::Pin, - task::{Context, Poll, Waker}, + task::{Context, Poll}, time::Duration, }; @@ -71,53 +74,6 @@ pub struct QueryContext { pub result: QueryResult, } -/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. -#[derive(Default)] -pub struct FuturesStream { - futures: FuturesUnordered, - waker: Option, -} - -impl FuturesStream { - /// Create new [`FuturesStream`]. - pub fn new() -> Self { - Self { - futures: FuturesUnordered::new(), - waker: None, - } - } - - /// Push a future for processing. - pub fn push(&mut self, future: F) { - self.futures.push(future); - - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } -} - -impl Stream for FuturesStream { - type Item = ::Output; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else { - // We must save the current waker to wake up the task when new futures are inserted. - // - // Otherwise, simply returning `Poll::Pending` here would cause the task to never be - // woken up again. - // - // We were previously relying on some other task from the `loop tokio::select!` to - // finish. - self.waker = Some(cx.waker().clone()); - - return Poll::Pending; - }; - - Poll::Ready(Some(result)) - } -} - /// Query executor. pub struct QueryExecutor { /// Pending futures. diff --git a/src/protocol/libp2p/kademlia/futures_stream.rs b/src/protocol/libp2p/kademlia/futures_stream.rs new file mode 100644 index 00000000..1f208f88 --- /dev/null +++ b/src/protocol/libp2p/kademlia/futures_stream.rs @@ -0,0 +1,74 @@ +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{stream::FuturesUnordered, Stream, StreamExt}; + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll, Waker}, +}; + +/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. +#[derive(Default)] +pub struct FuturesStream { + futures: FuturesUnordered, + waker: Option, +} + +impl FuturesStream { + /// Create new [`FuturesStream`]. 
+ pub fn new() -> Self { + Self { + futures: FuturesUnordered::new(), + waker: None, + } + } + + /// Push a future for processing. + pub fn push(&mut self, future: F) { + self.futures.push(future); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } +} + +impl Stream for FuturesStream { + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else { + // We must save the current waker to wake up the task when new futures are inserted. + // + // Otherwise, simply returning `Poll::Pending` here would cause the task to never be + // woken up again. + // + // We were previously relying on some other task from the `loop tokio::select!` to + // finish. + self.waker = Some(cx.waker().clone()); + + return Poll::Pending; + }; + + Poll::Ready(Some(result)) + } +} diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 4a8f163a..63f9fcac 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -68,6 +68,7 @@ const PARALLELISM_FACTOR: usize = 3; mod bucket; mod config; mod executor; +mod futures_stream; mod handle; mod message; mod query; From 5432c9b0a0bd7d1db4e526de934f9cbbccb6ef3c Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 28 Aug 2024 14:07:31 +0000 Subject: [PATCH 06/26] Refresh providers: dry-run without network queries --- .../libp2p/kademlia/futures_stream.rs | 2 + src/protocol/libp2p/kademlia/mod.rs | 29 ++++++++- src/protocol/libp2p/kademlia/store.rs | 65 ++++++++++++++----- 3 files changed, 79 insertions(+), 17 deletions(-) diff --git a/src/protocol/libp2p/kademlia/futures_stream.rs b/src/protocol/libp2p/kademlia/futures_stream.rs index 1f208f88..9c7d8039 100644 --- a/src/protocol/libp2p/kademlia/futures_stream.rs +++ b/src/protocol/libp2p/kademlia/futures_stream.rs @@ -27,6 +27,8 @@ use std::{ }; /// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. +/// The [`Stream`] implemented by [`FuturesStream`] never terminates and can be +/// polled when contains no futures. #[derive(Default)] pub struct FuturesStream { futures: FuturesUnordered, diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 63f9fcac..f89ed728 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -31,7 +31,7 @@ use crate::{ query::{QueryAction, QueryEngine}, record::ProviderRecord, routing_table::RoutingTable, - store::{MemoryStore, MemoryStoreConfig}, + store::{MemoryStore, MemoryStoreAction, MemoryStoreConfig}, types::{ConnectionType, KademliaPeer, Key}, }, Direction, TransportEvent, TransportService, @@ -935,7 +935,7 @@ impl Kademlia { self.disconnect_peer(peer, query_id).await; } } - } + }, command = self.cmd_rx.recv() => { match command { Some(KademliaCommand::FindNode { peer, query_id }) => { @@ -1107,6 +1107,31 @@ impl Kademlia { None => return Err(Error::EssentialTaskClosed), } }, + action = self.store.next_action() => match action { + Some(MemoryStoreAction::RefreshProvider { mut provider }) => { + tracing::trace!( + target: LOG_TARGET, + key = ?provider.key, + "republishing local provider", + ); + + // Make sure to roll expiration time. 
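+                        // (Rolling it forward matters because the refresh fires every
+                        // `provider_refresh_interval` (22 hours by default), i.e. roughly half of
+                        // `provider_ttl` (48 hours by default), so the republished record never
+                        // reaches its old expiration time.)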
+ provider.expires = Instant::now() + self.provider_ttl; + + self.store.put_provider(provider.clone()); + + + todo!("obtain a query ID and start query"); + // self.engine.start_add_provider( + // query_id, + // provider, + // self.routing_table + // .closest(Key::new(provider.key), self.replication_factor) + // .into(), + // ); + } + None => {} + } } } } diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index 96321340..e5410f94 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -24,12 +24,13 @@ use crate::{ protocol::libp2p::kademlia::{ config::DEFAULT_PROVIDER_REFRESH_INTERVAL, + futures_stream::FuturesStream, record::{Key, ProviderRecord, Record}, }, PeerId, }; -use futures::{future::BoxFuture, stream::FuturesUnordered}; +use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use std::{ collections::{hash_map::Entry, HashMap}, num::NonZeroUsize, @@ -40,7 +41,9 @@ use std::{ const LOG_TARGET: &str = "litep2p::ipfs::kademlia::store"; /// Memory store events. -pub enum MemoryStoreEvent {} +pub enum MemoryStoreAction { + RefreshProvider { provider: ProviderRecord }, +} /// Memory store. pub struct MemoryStore { @@ -55,7 +58,7 @@ pub struct MemoryStore { /// Local providers. local_providers: HashMap, /// Futures to signal it's time to republish a local provider. - pending_provider_republish: FuturesUnordered>, + pending_provider_refresh: FuturesStream>, } impl MemoryStore { @@ -67,7 +70,7 @@ impl MemoryStore { records: HashMap::new(), provider_keys: HashMap::new(), local_providers: HashMap::new(), - pending_provider_republish: FuturesUnordered::new(), + pending_provider_refresh: FuturesStream::new(), } } @@ -79,7 +82,7 @@ impl MemoryStore { records: HashMap::new(), provider_keys: HashMap::new(), local_providers: HashMap::new(), - pending_provider_republish: FuturesUnordered::new(), + pending_provider_refresh: FuturesStream::new(), } } @@ -204,9 +207,7 @@ impl MemoryStore { match provider_position { Ok(i) => { // Update the provider in place. - providers[i] = provider_record; - - true + providers[i] = provider_record.clone(); } Err(i) => { // `Err(i)` contains the insertion point. @@ -220,25 +221,59 @@ impl MemoryStore { existing `max_providers_per_key`", ); - false + return false; } else { if providers.len() == usize::from(self.config.max_providers_per_key) { providers.pop(); } - providers.insert(i, provider_record); - - true + providers.insert(i, provider_record.clone()); } } } + + if provider_record.provider == self.local_peer_id { + // We must make sure to refresh the local provider. + let key = provider_record.key.clone(); + let refresh_interval = self.config.provider_refresh_interval; + self.local_providers.insert(key.clone(), provider_record); + self.pending_provider_refresh.push(Box::pin(async move { + tokio::time::sleep(refresh_interval).await; + key + })); + } + + true } } } - /// Poll next event from the store. - async fn next_event() -> Option { - None + /// Poll next action from the store. + pub async fn next_action(&mut self) -> Option { + // [`FuturesStream`] never terminates, so `map()` below is always triggered. 
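+        // (If the stream yielded `None` once it ran out of futures, the `tokio::select!`
+        // arm in `Kademlia` polling `next_action()` would complete immediately in a busy
+        // loop; instead the empty stream parks itself by stashing the waker.)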
+ self.pending_provider_refresh + .next() + .await + .map(|key| { + if let Some(provider) = self.local_providers.get(&key).cloned() { + tracing::trace!( + target: LOG_TARGET, + ?key, + "refresh provider" + ); + + Some(MemoryStoreAction::RefreshProvider { provider }) + } else { + tracing::trace!( + target: LOG_TARGET, + ?key, + "it's time to refresh a provider, but we do not provide this key anymore", + ); + + None + } + }) + .flatten() } } From feb493df9c08176a5a544f835ee1436f81c78296 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 29 Aug 2024 13:58:53 +0000 Subject: [PATCH 07/26] Remove `try_get_record()` and other `try_...()` non-async methods --- src/protocol/libp2p/kademlia/handle.rs | 64 -------------------------- 1 file changed, 64 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index f1b4c218..64c80a61 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -338,70 +338,6 @@ impl KademliaHandle { pub async fn store_record(&mut self, record: Record) { let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await; } - - /// Try to add known peer and if the channel is clogged, return an error. - pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec) -> Result<(), ()> { - self.cmd_tx - .try_send(KademliaCommand::AddKnownPeer { peer, addresses }) - .map_err(|_| ()) - } - - /// Try to initiate `FIND_NODE` query and if the channel is clogged, return an error. - pub fn try_find_node(&mut self, peer: PeerId) -> Result { - let query_id = self.next_query_id(); - self.cmd_tx - .try_send(KademliaCommand::FindNode { peer, query_id }) - .map(|_| query_id) - .map_err(|_| ()) - } - - /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error. - pub fn try_put_record(&mut self, record: Record) -> Result { - let query_id = self.next_query_id(); - self.cmd_tx - .try_send(KademliaCommand::PutRecord { record, query_id }) - .map(|_| query_id) - .map_err(|_| ()) - } - - /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged, - /// return an error. - pub fn try_put_record_to_peers( - &mut self, - record: Record, - peers: Vec, - update_local_store: bool, - ) -> Result { - let query_id = self.next_query_id(); - self.cmd_tx - .try_send(KademliaCommand::PutRecordToPeers { - record, - query_id, - peers, - update_local_store, - }) - .map(|_| query_id) - .map_err(|_| ()) - } - - /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error. - pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { - let query_id = self.next_query_id(); - self.cmd_tx - .try_send(KademliaCommand::GetRecord { - key, - quorum, - query_id, - }) - .map(|_| query_id) - .map_err(|_| ()) - } - - /// Try to store the record in the local store, and if the channel is clogged, return an error. - /// Used in combination with [`IncomingRecordValidationMode::Manual`]. 
- pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> { - self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ()) - } } impl Stream for KademliaHandle { From 982c73dcf0b02ba03febe4e892d69411bf41a207 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 29 Aug 2024 13:59:44 +0000 Subject: [PATCH 08/26] Move query ID generation from `KademliaHandle` to `Kademlia` --- src/protocol/libp2p/kademlia/handle.rs | 113 +++++++++++++------------ src/protocol/libp2p/kademlia/mod.rs | 37 ++++++-- 2 files changed, 89 insertions(+), 61 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 64c80a61..7168b127 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -25,7 +25,10 @@ use crate::{ use futures::Stream; use multiaddr::Multiaddr; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{ + mpsc::{Receiver, Sender}, + oneshot, +}; use std::{ num::NonZeroUsize, @@ -88,8 +91,8 @@ pub(crate) enum KademliaCommand { /// Peer ID. peer: PeerId, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Store record to DHT. @@ -97,8 +100,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Store record to DHT to the given peers. @@ -108,8 +111,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, /// Use the following peers for the put request. peers: Vec, @@ -126,8 +129,8 @@ pub(crate) enum KademliaCommand { /// [`Quorum`] for the query. quorum: Quorum, - /// Query ID for the query. - query_id: QueryId, + /// Query ID callback. + query_id_tx: oneshot::Sender, }, /// Register as a content provider for `key`. @@ -139,7 +142,7 @@ pub(crate) enum KademliaCommand { public_addresses: Vec, /// Query ID for the query. - query_id: QueryId, + query_id_tx: oneshot::Sender, }, /// Store record locally. @@ -232,27 +235,12 @@ pub struct KademliaHandle { /// RX channel for receiving events from `Kademlia`. event_rx: Receiver, - - /// Next query ID. - next_query_id: usize, } impl KademliaHandle { /// Create new [`KademliaHandle`]. pub(super) fn new(cmd_tx: Sender, event_rx: Receiver) -> Self { - Self { - cmd_tx, - event_rx, - next_query_id: 0usize, - } - } - - /// Allocate next query ID. - fn next_query_id(&mut self) -> QueryId { - let query_id = self.next_query_id; - self.next_query_id += 1; - - QueryId(query_id) + Self { cmd_tx, event_rx } } /// Add known peer. @@ -261,19 +249,32 @@ impl KademliaHandle { } /// Send `FIND_NODE` query to known peers. - pub async fn find_node(&mut self, peer: PeerId) -> QueryId { - let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await; - - query_id + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + pub async fn find_node(&mut self, peer: PeerId) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx + .send(KademliaCommand::FindNode { peer, query_id_tx }) + .await + .map_err(|_| ())?; + + query_id_rx.await.map_err(|_| ()) } /// Store record to DHT. 
- pub async fn put_record(&mut self, record: Record) -> QueryId { - let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await; + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + pub async fn put_record(&mut self, record: Record) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx + .send(KademliaCommand::PutRecord { + record, + query_id_tx, + }) + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Store record to DHT to the given peers. @@ -282,34 +283,34 @@ impl KademliaHandle { record: Record, peers: Vec, update_local_store: bool, - ) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + ) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::PutRecordToPeers { record, - query_id, + query_id_tx, peers, update_local_store, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Get record from DHT. - pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::GetRecord { key, quorum, - query_id, + query_id_tx, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Register as a content provider on the DHT. @@ -319,18 +320,18 @@ impl KademliaHandle { &mut self, key: RecordKey, public_addresses: Vec, - ) -> QueryId { - let query_id = self.next_query_id(); - let _ = self - .cmd_tx + ) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx .send(KademliaCommand::StartProviding { key, public_addresses, - query_id, + query_id_tx, }) - .await; + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Store the record in the local store. Used in combination with diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index f89ed728..7698bde0 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -166,6 +166,9 @@ pub(crate) struct Kademlia { /// Query executor. executor: QueryExecutor, + + /// Next query ID. 
+ next_query_id: usize, } impl Kademlia { @@ -207,6 +210,7 @@ impl Kademlia { provider_ttl: config.provider_ttl, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), + next_query_id: 0usize, } } @@ -938,7 +942,10 @@ impl Kademlia { }, command = self.cmd_rx.recv() => { match command { - Some(KademliaCommand::FindNode { peer, query_id }) => { + Some(KademliaCommand::FindNode { peer, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, ?peer, @@ -954,7 +961,10 @@ impl Kademlia { .into() ); } - Some(KademliaCommand::PutRecord { mut record, query_id }) => { + Some(KademliaCommand::PutRecord { mut record, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -982,10 +992,13 @@ impl Kademlia { } Some(KademliaCommand::PutRecordToPeers { mut record, - query_id, + query_id_tx, peers, update_local_store, }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1025,8 +1038,11 @@ impl Kademlia { Some(KademliaCommand::StartProviding { key, public_addresses, - query_id + query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1052,7 +1068,10 @@ impl Kademlia { .into(), ); } - Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { + Some(KademliaCommand::GetRecord { key, quorum, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); match (self.store.get(&key), quorum) { @@ -1135,6 +1154,14 @@ impl Kademlia { } } } + + /// Allocate next query ID. 
+ fn next_query_id(&mut self) -> QueryId { + let query_id = self.next_query_id; + self.next_query_id += 1; + + QueryId(query_id) + } } #[cfg(test)] From 68c7a87b29d0a5225dd06f0006ce1115023ed63f Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 29 Aug 2024 14:02:55 +0000 Subject: [PATCH 09/26] Republish providers --- src/protocol/libp2p/kademlia/mod.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 7698bde0..c6d306a1 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1139,15 +1139,15 @@ impl Kademlia { self.store.put_provider(provider.clone()); - - todo!("obtain a query ID and start query"); - // self.engine.start_add_provider( - // query_id, - // provider, - // self.routing_table - // .closest(Key::new(provider.key), self.replication_factor) - // .into(), - // ); + let key = provider.key.clone(); + let query_id = self.next_query_id(); + self.engine.start_add_provider( + query_id, + provider, + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); } None => {} } From 5e97484bfc2f54b2477c563aea4b193480d7f90e Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 30 Aug 2024 14:48:13 +0000 Subject: [PATCH 10/26] Use getter `TransportService::local_peer_id()` instead of accessing directly --- src/protocol/libp2p/kademlia/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index a6135dbc..25976bdd 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1031,7 +1031,7 @@ impl Kademlia { let provider = ProviderRecord { key: key.clone(), - provider: self.service.local_peer_id, + provider: self.service.local_peer_id(), addresses: public_addresses, expires: Instant::now() + self.provider_ttl, }; From 7caf29002bd085e52b1bfbd6d2cb96ac11dadad8 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 30 Aug 2024 14:48:13 +0000 Subject: [PATCH 11/26] Use getter `TransportService::local_peer_id()` instead of accessing directly --- src/protocol/libp2p/kademlia/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index edd7905a..973527ad 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1056,7 +1056,7 @@ impl Kademlia { let provider = ProviderRecord { key: key.clone(), - provider: self.service.local_peer_id, + provider: self.service.local_peer_id(), addresses: public_addresses, expires: Instant::now() + self.provider_ttl, }; From 60b8fb6104670c6a4354ee006111ef43a20b38c0 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 27 Aug 2024 12:10:23 +0000 Subject: [PATCH 12/26] Introduce `GET_PROVIDERS` query --- src/protocol/libp2p/kademlia/handle.rs | 30 ++++++++++++++++++++++++- src/protocol/libp2p/kademlia/message.rs | 1 - src/protocol/libp2p/kademlia/mod.rs | 11 ++++++++- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 7168b127..a5ff839e 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. 
use crate::{ - protocol::libp2p::kademlia::{PeerRecord, QueryId, Record, RecordKey}, + protocol::libp2p::kademlia::{KademliaPeer, PeerRecord, QueryId, Record, RecordKey}, PeerId, }; @@ -133,6 +133,15 @@ pub(crate) enum KademliaCommand { query_id_tx: oneshot::Sender, }, + /// Get providers from DHT. + GetProviders { + /// Provided key. + key: RecordKey, + + /// Query ID for the query. + query_id: QueryId, + }, + /// Register as a content provider for `key`. StartProviding { /// Provided key. @@ -189,6 +198,17 @@ pub enum KademliaEvent { records: RecordsType, }, + /// `GET_PROVIDERS` query succeeded. + GetProvidersSuccess { + /// Query ID. + query_id: QueryId, + + /// Found providers with cached addresses. + // TODO: return only `max_providers_per_key` providers from the peers closest to the + // provided key. + providers: Vec, + }, + /// `PUT_VALUE` query succeeded. // TODO: this is never emitted. Implement + add `AddProviderSuccess`. PutRecordSuccess { @@ -334,6 +354,14 @@ impl KademliaHandle { query_id_rx.await.map_err(|_| ()) } + /// Get providers from DHT. + pub async fn get_providers(&mut self, key: RecordKey) -> QueryId { + let query_id = self.next_query_id(); + let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await; + + query_id + } + /// Store the record in the local store. Used in combination with /// [`IncomingRecordValidationMode::Manual`]. pub async fn store_record(&mut self, record: Record) { diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index bba2b285..76d80be4 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -193,7 +193,6 @@ impl KademliaMessage { } /// Create `GET_PROVIDERS` request for `key`. - #[allow(unused)] pub fn get_providers_request(key: RecordKey) -> Vec { let message = schema::kademlia::Message { key: key.to_vec(), diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 973527ad..d0de62d8 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1088,7 +1088,7 @@ impl Kademlia { self.engine.start_get_record( query_id, key.clone(), - self.routing_table.closest(Key::new(key.clone()), self.replication_factor).into(), + self.routing_table.closest(Key::new(key), self.replication_factor).into(), quorum, if record.is_some() { 1 } else { 0 }, ); @@ -1096,6 +1096,15 @@ impl Kademlia { } } + Some(KademliaCommand::GetProviders { key, query_id}) => { + tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT"); + + self.engine.start_get_providers( + query_id, + key.clone(), + self.routing_table.closest(Key::new(key), self.replication_factor), + ); + } Some(KademliaCommand::AddKnownPeer { peer, addresses }) => { tracing::trace!( target: LOG_TARGET, From d340c35c61e24843f934852e193bee6b3e146763 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 30 Aug 2024 14:43:42 +0000 Subject: [PATCH 13/26] Update `get_providers()` to receive query ID via oneshot channel --- src/protocol/libp2p/kademlia/handle.rs | 20 +++++++++++++++----- src/protocol/libp2p/kademlia/mod.rs | 15 +++++++++------ 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index a5ff839e..2b31db6d 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -139,7 +139,7 @@ pub(crate) enum KademliaCommand { key: RecordKey, /// Query ID for the query. 
- query_id: QueryId, + query_id_tx: oneshot::Sender, }, /// Register as a content provider for `key`. @@ -298,6 +298,8 @@ impl KademliaHandle { } /// Store record to DHT to the given peers. + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. pub async fn put_record_to_peers( &mut self, record: Record, @@ -319,6 +321,8 @@ impl KademliaHandle { } /// Get record from DHT. + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { let (query_id_tx, query_id_rx) = oneshot::channel(); self.cmd_tx @@ -336,6 +340,7 @@ impl KademliaHandle { /// Register as a content provider on the DHT. /// /// Register the local peer ID & its `public_addresses` as a provider for a given `key`. + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. pub async fn start_providing( &mut self, key: RecordKey, @@ -355,11 +360,16 @@ impl KademliaHandle { } /// Get providers from DHT. - pub async fn get_providers(&mut self, key: RecordKey) -> QueryId { - let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await; + /// + /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + pub async fn get_providers(&mut self, key: RecordKey) -> Result { + let (query_id_tx, query_id_rx) = oneshot::channel(); + self.cmd_tx + .send(KademliaCommand::GetProviders { key, query_id_tx }) + .await + .map_err(|_| ())?; - query_id + query_id_rx.await.map_err(|_| ()) } /// Store the record in the local store. Used in combination with diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index d0de62d8..16ae2bc5 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -1096,14 +1096,17 @@ impl Kademlia { } } - Some(KademliaCommand::GetProviders { key, query_id}) => { + Some(KademliaCommand::GetProviders { key, query_id_tx }) => { + let query_id = self.next_query_id(); + let _ = query_id_tx.send(query_id); + tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT"); - self.engine.start_get_providers( - query_id, - key.clone(), - self.routing_table.closest(Key::new(key), self.replication_factor), - ); + // self.engine.start_get_providers( + // query_id, + // key.clone(), + // self.routing_table.closest(Key::new(key), self.replication_factor), + // ); } Some(KademliaCommand::AddKnownPeer { peer, addresses }) => { tracing::trace!( From 10b96db74a4e4a555a52559e04292f1c741039ec Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 30 Aug 2024 15:09:24 +0000 Subject: [PATCH 14/26] Make lines fit into 100 characters --- src/protocol/libp2p/kademlia/mod.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 16ae2bc5..cc7d87e1 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -975,7 +975,8 @@ impl Kademlia { "store record to DHT", ); - // For `PUT_VALUE` requests originating locally we are always the publisher. + // For `PUT_VALUE` requests originating locally we are always the + // publisher. record.publisher = Some(self.local_key.clone().into_preimage()); // Make sure TTL is set. 
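Side note on the query-ID handoff introduced in the previous patch: it follows a common tokio command pattern, sketched below in a minimal, self-contained form. The `Command` struct and function names here are illustrative only and are not taken from the crate; the handle sends a command carrying a `oneshot::Sender`, and the background task allocates the ID and reports it back before starting the query.

use tokio::sync::{mpsc, oneshot};

/// Illustrative command type: the caller supplies the channel on which the
/// background task reports the allocated query ID.
struct Command {
    key: Vec<u8>,
    query_id_tx: oneshot::Sender<usize>,
}

/// Send a command and wait for the query ID allocated by the background task.
/// Returns `Err(())` only if the background task has shut down.
async fn request_query_id(cmd_tx: &mpsc::Sender<Command>, key: Vec<u8>) -> Result<usize, ()> {
    let (query_id_tx, query_id_rx) = oneshot::channel();
    cmd_tx.send(Command { key, query_id_tx }).await.map_err(|_| ())?;
    query_id_rx.await.map_err(|_| ())
}
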
@@ -1081,14 +1082,19 @@ impl Kademlia { (Some(record), Quorum::One) => { let _ = self .event_tx - .send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) }) + .send(KademliaEvent::GetRecordSuccess { + query_id, + records: RecordsType::LocalStore(record.clone()), + }) .await; } (record, _) => { self.engine.start_get_record( query_id, key.clone(), - self.routing_table.closest(Key::new(key), self.replication_factor).into(), + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), quorum, if record.is_some() { 1 } else { 0 }, ); @@ -1105,7 +1111,9 @@ impl Kademlia { // self.engine.start_get_providers( // query_id, // key.clone(), - // self.routing_table.closest(Key::new(key), self.replication_factor), + // self.routing_table + // .closest(Key::new(key), self.replication_factor) + // .into(), // ); } Some(KademliaCommand::AddKnownPeer { peer, addresses }) => { @@ -1121,7 +1129,10 @@ impl Kademlia { addresses.clone(), self.peers .get(&peer) - .map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected), + .map_or( + ConnectionType::NotConnected, + |_| ConnectionType::Connected, + ), ); self.service.add_known_address(&peer, addresses.into_iter()); @@ -1134,7 +1145,8 @@ impl Kademlia { ); // Make sure TTL is set. - record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = + record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); self.store.put(record); } From 6146540bfd4d6bbfb873155c46a8ddc53efefbae Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 2 Sep 2024 12:22:09 +0000 Subject: [PATCH 15/26] Introduce `GetProvidersContext` & `GetProvidersConfig` --- src/protocol/libp2p/kademlia/message.rs | 10 +- .../libp2p/kademlia/query/get_providers.rs | 242 ++++++++++++++++++ .../libp2p/kademlia/query/get_record.rs | 34 ++- 3 files changed, 276 insertions(+), 10 deletions(-) create mode 100644 src/protocol/libp2p/kademlia/query/get_providers.rs diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index 76d80be4..edf5b473 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -187,13 +187,13 @@ impl KademliaMessage { }; let mut buf = BytesMut::with_capacity(message.encoded_len()); - message.encode(&mut buf).expect("Vec to provide needed capacity"); + message.encode(&mut buf).expect("BytesMut to provide needed capacity"); buf.freeze() } /// Create `GET_PROVIDERS` request for `key`. - pub fn get_providers_request(key: RecordKey) -> Vec { + pub fn get_providers_request(key: RecordKey) -> Bytes { let message = schema::kademlia::Message { key: key.to_vec(), cluster_level_raw: 10, @@ -201,10 +201,10 @@ impl KademliaMessage { ..Default::default() }; - let mut buf = Vec::with_capacity(message.encoded_len()); - message.encode(&mut buf).expect("Vec to provide needed capacity"); + let mut buf = BytesMut::with_capacity(message.encoded_len()); + message.encode(&mut buf).expect("BytesMut to provide needed capacity"); - buf + buf.freeze() } /// Create `GET_PROVIDERS` response. 
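Before the new query context added below, a quick illustration of how the two `GET_PROVIDERS` message helpers touched above fit together. This is only a sketch written as if it lived inside the `kademlia` module, under the assumption that `providers` holds `ProviderRecord`s and `closer_peers` holds `KademliaPeer`s; it is not part of the patch.

// Hypothetical serving-side helper, for illustration only.
fn answer_get_providers(
    key: RecordKey,
    providers: Vec<ProviderRecord>,
    closer_peers: &[KademliaPeer],
) {
    // What a querying node sends out for `key` (now returned as a cheaply cloneable `Bytes`).
    let _request = KademliaMessage::get_providers_request(key.clone());

    // What the serving node replies with: the provider records it knows for `key`
    // plus the peers it considers closest to `key`.
    let _response = KademliaMessage::get_providers_response(key, providers, closer_peers);
}
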
diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs new file mode 100644 index 00000000..51d1aaa6 --- /dev/null +++ b/src/protocol/libp2p/kademlia/query/get_providers.rs @@ -0,0 +1,242 @@ +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; + +use crate::{ + protocol::libp2p::kademlia::{ + message::KademliaMessage, + query::{QueryAction, QueryId}, + record::{Key as RecordKey, ProviderRecord}, + types::{Distance, KademliaPeer, Key}, + }, + PeerId, +}; + +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; + +/// Logging target for the file. +const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::get_record"; + +/// The configuration needed to instantiate a new [`GetProvidersContext`]. +#[derive(Debug)] +pub struct GetProvidersConfig { + /// Local peer ID. + pub local_peer_id: PeerId, + + /// Parallelism factor. + pub parallelism_factor: usize, + + /// Query ID. + pub query: QueryId, + + /// Target key. + pub target: Key, +} + +#[derive(Debug)] +pub struct GetProvidersContext { + /// Query immutable config. + pub config: GetProvidersConfig, + + /// Cached Kademlia message to send. + kad_message: Bytes, + + /// Peers from whom the `QueryEngine` is waiting to hear a response. + pub pending: HashMap, + + /// Queried candidates. + /// + /// These are the peers for whom the query has already been sent + /// and who have either returned their closest peers or failed to answer. + pub queried: HashSet, + + /// Candidates. + pub candidates: BTreeMap, + + /// Found providers. + // TODO: deduplicate. + pub found_providers: Vec, +} + +impl GetProvidersContext { + /// Create new [`GetProvidersContext`]. + pub fn new(config: GetProvidersConfig, in_peers: VecDeque) -> Self { + let mut candidates = BTreeMap::new(); + + for candidate in &in_peers { + let distance = config.target.distance(&candidate.key); + candidates.insert(distance, candidate.clone()); + } + + let kad_message = + KademliaMessage::get_providers_request(config.target.clone().into_preimage()); + + Self { + config, + kad_message, + candidates, + pending: HashMap::new(), + queried: HashSet::new(), + found_providers: Vec::new(), + } + } + + /// Get the found providers. + pub fn found_providers(self) -> Vec { + self.found_providers + } + + /// Register response failure for `peer`. 
+ pub fn register_response_failure(&mut self, peer: PeerId) { + let Some(peer) = self.pending.remove(&peer) else { + tracing::debug!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetProvidersContext`: pending peer doesn't exist", + ); + return; + }; + + self.queried.insert(peer.peer); + } + + /// Register `GET_PROVIDERS` response from `peer`. + pub fn register_response( + &mut self, + peer: PeerId, + providers: impl IntoIterator, + closer_peers: impl IntoIterator, + ) { + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetProvidersContext`: received response from peer", + ); + + let Some(peer) = self.pending.remove(&peer) else { + tracing::debug!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetProvidersContext`: received response from peer but didn't expect it", + ); + return; + }; + + self.found_providers.extend(providers.into_iter()); + + // Add the queried peer to `queried` and all new peers which haven't been + // queried to `candidates` + self.queried.insert(peer.peer); + + let to_query_candidate = closer_peers.into_iter().filter_map(|peer| { + // Peer already produced a response. + if self.queried.contains(&peer.peer) { + return None; + } + + // Peer was queried, awaiting response. + if self.pending.contains_key(&peer.peer) { + return None; + } + + // Local node. + if self.config.local_peer_id == peer.peer { + return None; + } + + Some(peer) + }); + + for candidate in to_query_candidate { + let distance = self.config.target.distance(&candidate.key); + self.candidates.insert(distance, candidate); + } + } + + /// Get next action for `peer`. + // TODO: remove this and store the next action to `PeerAction` + pub fn next_peer_action(&mut self, peer: &PeerId) -> Option { + self.pending.contains_key(peer).then_some(QueryAction::SendMessage { + query: self.config.query, + peer: *peer, + message: self.kad_message.clone(), + }) + } + + /// Schedule next peer for outbound `GET_VALUE` query. + fn schedule_next_peer(&mut self) -> Option { + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + "`GetProvidersContext`: get next peer", + ); + + let (_, candidate) = self.candidates.pop_first()?; + let peer = candidate.peer; + + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetProvidersContext`: current candidate", + ); + self.pending.insert(candidate.peer, candidate); + + Some(QueryAction::SendMessage { + query: self.config.query, + peer, + message: self.kad_message.clone(), + }) + } + + /// Check if the query cannot make any progress. + /// + /// Returns true when there are no pending responses and no candidates to query. + fn is_done(&self) -> bool { + self.pending.is_empty() && self.candidates.is_empty() + } + + /// Get next action for a `GET_PROVIDERS` query. + pub fn next_action(&mut self) -> Option { + if self.is_done() { + // If we cannot make progress, return the final result. + // A query failed when we are not able to find any providers. + if self.found_providers.is_empty() { + Some(QueryAction::QueryFailed { + query: self.config.query, + }) + } else { + Some(QueryAction::QuerySucceeded { + query: self.config.query, + }) + } + } else if self.pending.len() == self.config.parallelism_factor { + // At this point, we either have pending responses or candidates to query; and we need + // more records. Ensure we do not exceed the parallelism factor. 
+ None + } else { + self.schedule_next_peer() + } + } +} diff --git a/src/protocol/libp2p/kademlia/query/get_record.rs b/src/protocol/libp2p/kademlia/query/get_record.rs index 722ee101..12ea8293 100644 --- a/src/protocol/libp2p/kademlia/query/get_record.rs +++ b/src/protocol/libp2p/kademlia/query/get_record.rs @@ -135,7 +135,12 @@ impl GetRecordContext { /// Register response failure for `peer`. pub fn register_response_failure(&mut self, peer: PeerId) { let Some(peer) = self.pending.remove(&peer) else { - tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist"); + tracing::debug!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetRecordContext`: pending peer doesn't exist", + ); return; }; @@ -149,10 +154,20 @@ impl GetRecordContext { record: Option, peers: Vec, ) { - tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer"); + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetRecordContext`: received response from peer", + ); let Some(peer) = self.pending.remove(&peer) else { - tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it"); + tracing::debug!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetRecordContext`: received response from peer but didn't expect it", + ); return; }; @@ -206,12 +221,21 @@ impl GetRecordContext { /// Schedule next peer for outbound `GET_VALUE` query. fn schedule_next_peer(&mut self) -> Option { - tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer"); + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + "`GetRecordContext`: get next peer", + ); let (_, candidate) = self.candidates.pop_first()?; let peer = candidate.peer; - tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate"); + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + ?peer, + "`GetRecordContext`: current candidate", + ); self.pending.insert(candidate.peer, candidate); Some(QueryAction::SendMessage { From e1674196125c89b9514252b0e26be491293a727b Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 2 Sep 2024 17:02:30 +0000 Subject: [PATCH 16/26] Implement `GET_PROVIDERS` query --- src/protocol/libp2p/kademlia/message.rs | 4 -- src/protocol/libp2p/kademlia/mod.rs | 34 +++++---- .../libp2p/kademlia/query/get_providers.rs | 8 +-- src/protocol/libp2p/kademlia/query/mod.rs | 69 +++++++++++++++++++ 4 files changed, 95 insertions(+), 20 deletions(-) diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index edf5b473..f8f4965f 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -209,12 +209,9 @@ impl KademliaMessage { /// Create `GET_PROVIDERS` response. 
pub fn get_providers_response( - key: RecordKey, providers: Vec, closer_peers: &[KademliaPeer], ) -> Vec { - debug_assert!(providers.iter().all(|p| p.key == key)); - let provider_peers = providers .into_iter() .map(|p| { @@ -228,7 +225,6 @@ impl KademliaMessage { .collect(); let message = schema::kademlia::Message { - key: key.to_vec(), cluster_level_raw: 10, r#type: schema::kademlia::MessageType::GetProviders.into(), closer_peers: closer_peers.iter().map(Into::into).collect(), diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index cc7d87e1..fd251efd 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -588,11 +588,8 @@ impl Kademlia { .routing_table .closest(Key::from(key.to_vec()), self.replication_factor); - let message = KademliaMessage::get_providers_response( - key.clone(), - providers, - &closer_peers, - ); + let message = + KademliaMessage::get_providers_response(providers, &closer_peers); self.executor.send_message(peer, message.into(), substream); } (None, None) => tracing::debug!( @@ -835,6 +832,19 @@ impl Kademlia { .await; Ok(()) } + QueryAction::GetProvidersQueryDone { + query_id, + providers, + } => { + let _ = self + .event_tx + .send(KademliaEvent::GetProvidersSuccess { + query_id, + providers, + }) + .await; + Ok(()) + } QueryAction::QueryFailed { query } => { tracing::debug!(target: LOG_TARGET, ?query, "query failed"); @@ -1108,13 +1118,13 @@ impl Kademlia { tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT"); - // self.engine.start_get_providers( - // query_id, - // key.clone(), - // self.routing_table - // .closest(Key::new(key), self.replication_factor) - // .into(), - // ); + self.engine.start_get_providers( + query_id, + key.clone(), + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); } Some(KademliaCommand::AddKnownPeer { peer, addresses }) => { tracing::trace!( diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs index 51d1aaa6..7e76c11f 100644 --- a/src/protocol/libp2p/kademlia/query/get_providers.rs +++ b/src/protocol/libp2p/kademlia/query/get_providers.rs @@ -24,7 +24,7 @@ use crate::{ protocol::libp2p::kademlia::{ message::KademliaMessage, query::{QueryAction, QueryId}, - record::{Key as RecordKey, ProviderRecord}, + record::Key as RecordKey, types::{Distance, KademliaPeer, Key}, }, PeerId, @@ -73,7 +73,7 @@ pub struct GetProvidersContext { /// Found providers. // TODO: deduplicate. - pub found_providers: Vec, + pub found_providers: Vec, } impl GetProvidersContext { @@ -100,7 +100,7 @@ impl GetProvidersContext { } /// Get the found providers. 
- pub fn found_providers(self) -> Vec { + pub fn found_providers(self) -> Vec { self.found_providers } @@ -123,7 +123,7 @@ impl GetProvidersContext { pub fn register_response( &mut self, peer: PeerId, - providers: impl IntoIterator, + providers: impl IntoIterator, closer_peers: impl IntoIterator, ) { tracing::trace!( diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 34f6e84e..8a247e45 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -23,6 +23,7 @@ use crate::{ message::KademliaMessage, query::{ find_node::{FindNodeConfig, FindNodeContext}, + get_providers::{GetProvidersConfig, GetProvidersContext}, get_record::{GetRecordConfig, GetRecordContext}, }, record::{Key as RecordKey, ProviderRecord, Record}, @@ -40,6 +41,7 @@ use self::find_many_nodes::FindManyNodesContext; mod find_many_nodes; mod find_node; +mod get_providers; mod get_record; /// Logging target for the file. @@ -90,6 +92,12 @@ enum QueryType { /// Context for the `FIND_NODE` query. context: FindNodeContext, }, + + /// `GET_PROVIDERS` query. + GetProviders { + /// Context for the `GET_PROVIDERS` query. + context: GetProvidersContext, + }, } /// Query action. @@ -147,6 +155,15 @@ pub enum QueryAction { records: Vec, }, + /// `GET_PROVIDERS` query succeeded. + GetProvidersQueryDone { + /// Query ID. + query_id: QueryId, + + /// Found providers. + providers: Vec, + }, + /// Query succeeded. QuerySucceeded { /// ID of the query that succeeded. @@ -358,6 +375,39 @@ impl QueryEngine { query_id } + /// Start `GET_PROVIDERS` query. + pub fn start_get_providers( + &mut self, + query_id: QueryId, + key: RecordKey, + candidates: VecDeque, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + ?key, + num_peers = ?candidates.len(), + "start `GET_PROVIDERS` query", + ); + + let target = Key::new(key); + let config = GetProvidersConfig { + local_peer_id: self.local_peer_id, + parallelism_factor: self.parallelism_factor, + query: query_id, + target, + }; + + self.queries.insert( + query_id, + QueryType::GetProviders { + context: GetProvidersContext::new(config, candidates), + }, + ); + + query_id + } + /// Register response failure from a queried peer. pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure"); @@ -381,6 +431,9 @@ impl QueryEngine { Some(QueryType::AddProvider { context, .. }) => { context.register_response_failure(peer); } + Some(QueryType::GetProviders { context }) => { + context.register_response_failure(peer); + } } } @@ -422,6 +475,16 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::GetProviders { context }) => match message { + KademliaMessage::GetProviders { + key: _, + providers, + peers, + } => { + context.register_response(peer, providers, peers); + } + _ => unreachable!(), + }, } } @@ -439,6 +502,7 @@ impl QueryEngine { Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), Some(QueryType::AddProvider { context, .. 
}) => context.next_peer_action(peer), + Some(QueryType::GetProviders { context }) => context.next_peer_action(peer), } } @@ -467,6 +531,10 @@ impl QueryEngine { provider, peers: context.responses.into_values().collect::>(), }, + QueryType::GetProviders { context } => QueryAction::GetProvidersQueryDone { + query_id: context.config.query, + providers: context.found_providers(), + }, } } @@ -487,6 +555,7 @@ impl QueryEngine { QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), QueryType::AddProvider { context, .. } => context.next_action(), + QueryType::GetProviders { context } => context.next_action(), }; match action { From eb6eee8b2c366bbc0d54bc2f3a4a9a5ea6f188da Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 3 Sep 2024 13:44:23 +0000 Subject: [PATCH 17/26] Merge returned provider addresses and deduplicate records --- src/protocol/libp2p/kademlia/handle.rs | 5 ++- .../libp2p/kademlia/query/get_providers.rs | 31 +++++++++++++++++-- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 2b31db6d..918f31ec 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -203,9 +203,8 @@ pub enum KademliaEvent { /// Query ID. query_id: QueryId, - /// Found providers with cached addresses. - // TODO: return only `max_providers_per_key` providers from the peers closest to the - // provided key. + /// Found providers with cached addresses. Returned providers are sorted by distane to the + /// provided key. providers: Vec, }, diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs index 7e76c11f..4d02b4ec 100644 --- a/src/protocol/libp2p/kademlia/query/get_providers.rs +++ b/src/protocol/libp2p/kademlia/query/get_providers.rs @@ -25,8 +25,9 @@ use crate::{ message::KademliaMessage, query::{QueryAction, QueryId}, record::Key as RecordKey, - types::{Distance, KademliaPeer, Key}, + types::{ConnectionType, Distance, KademliaPeer, Key}, }, + types::multiaddr::Multiaddr, PeerId, }; @@ -72,7 +73,6 @@ pub struct GetProvidersContext { pub candidates: BTreeMap, /// Found providers. - // TODO: deduplicate. pub found_providers: Vec, } @@ -101,7 +101,32 @@ impl GetProvidersContext { /// Get the found providers. pub fn found_providers(self) -> Vec { - self.found_providers + // Merge addresses of different provider records of the same peer. + let mut providers = HashMap::>::new(); + self.found_providers.into_iter().for_each(|provider| { + providers + .entry(provider.peer) + .or_default() + .extend(provider.addresses.into_iter()) + }); + + // Convert into `Vec` + let mut providers = providers + .into_iter() + .map(|(peer, addresses)| KademliaPeer { + key: Key::from(peer.clone()), + peer, + addresses: addresses.into_iter().collect(), + connection: ConnectionType::CanConnect, + }) + .collect::>(); + + // Sort by the provider distance to the target key. + providers.sort_unstable_by(|p1, p2| { + p1.key.distance(&self.config.target).cmp(&p2.key.distance(&self.config.target)) + }); + + providers } /// Register response failure for `peer`. 
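The merge-and-sort step added to `found_providers()` above is easiest to see in isolation. Below is a minimal, self-contained sketch of the same idea, with peer IDs and the target key simplified to integers and addresses to strings; the real code uses `PeerId`, `Multiaddr` and the XOR `Distance` of the Kademlia keyspace.

use std::collections::HashMap;

/// Merge provider records for the same peer and sort the result by distance to `target`.
fn merge_and_sort_providers(
    records: Vec<(u64, Vec<String>)>, // (peer id, cached addresses)
    target: u64,                      // target key, simplified to an integer
) -> Vec<(u64, Vec<String>)> {
    // Merge addresses of different provider records of the same peer.
    let mut merged: HashMap<u64, Vec<String>> = HashMap::new();
    for (peer, addresses) in records {
        merged.entry(peer).or_default().extend(addresses);
    }

    // Sort by XOR distance to the target key, as Kademlia does.
    let mut providers: Vec<(u64, Vec<String>)> = merged.into_iter().collect();
    providers.sort_unstable_by_key(|(peer, _)| *peer ^ target);
    providers
}
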
From 974eac524f595a34fde50e6a5cc6fc3924f1be87 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 3 Sep 2024 15:11:45 +0000 Subject: [PATCH 18/26] minor: fix log target --- src/protocol/libp2p/kademlia/query/get_providers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs index 4d02b4ec..523cab13 100644 --- a/src/protocol/libp2p/kademlia/query/get_providers.rs +++ b/src/protocol/libp2p/kademlia/query/get_providers.rs @@ -34,7 +34,7 @@ use crate::{ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; /// Logging target for the file. -const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::get_record"; +const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::get_providers"; /// The configuration needed to instantiate a new [`GetProvidersContext`]. #[derive(Debug)] From e895a44a2fa2aec28d8f3b0adb7baa15bc7fe20a Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 4 Sep 2024 07:51:11 +0000 Subject: [PATCH 19/26] Revert "Remove `try_get_record()` and other `try_...()` non-async methods" This reverts commit feb493df9c08176a5a544f835ee1436f81c78296. --- src/protocol/libp2p/kademlia/handle.rs | 64 ++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 7168b127..bf1581f9 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -339,6 +339,70 @@ impl KademliaHandle { pub async fn store_record(&mut self, record: Record) { let _ = self.cmd_tx.send(KademliaCommand::StoreRecord { record }).await; } + + /// Try to add known peer and if the channel is clogged, return an error. + pub fn try_add_known_peer(&self, peer: PeerId, addresses: Vec) -> Result<(), ()> { + self.cmd_tx + .try_send(KademliaCommand::AddKnownPeer { peer, addresses }) + .map_err(|_| ()) + } + + /// Try to initiate `FIND_NODE` query and if the channel is clogged, return an error. + pub fn try_find_node(&mut self, peer: PeerId) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::FindNode { peer, query_id }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error. + pub fn try_put_record(&mut self, record: Record) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::PutRecord { record, query_id }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged, + /// return an error. + pub fn try_put_record_to_peers( + &mut self, + record: Record, + peers: Vec, + update_local_store: bool, + ) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::PutRecordToPeers { + record, + query_id, + peers, + update_local_store, + }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error. + pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::GetRecord { + key, + quorum, + query_id, + }) + .map(|_| query_id) + .map_err(|_| ()) + } + + /// Try to store the record in the local store, and if the channel is clogged, return an error. + /// Used in combination with [`IncomingRecordValidationMode::Manual`]. 
+ pub fn try_store_record(&mut self, record: Record) -> Result<(), ()> { + self.cmd_tx.try_send(KademliaCommand::StoreRecord { record }).map_err(|_| ()) + } } impl Stream for KademliaHandle { From ce18594d3c3466649ed6a3dacf32f1cfb5fb53c4 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 4 Sep 2024 08:01:24 +0000 Subject: [PATCH 20/26] Revert "Move query ID generation from `KademliaHandle` to `Kademlia`" This reverts commit 982c73dcf0b02ba03febe4e892d69411bf41a207. --- src/protocol/libp2p/kademlia/handle.rs | 111 ++++++++++++------------- src/protocol/libp2p/kademlia/mod.rs | 37 ++------- 2 files changed, 60 insertions(+), 88 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index bf1581f9..f1b4c218 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -25,10 +25,7 @@ use crate::{ use futures::Stream; use multiaddr::Multiaddr; -use tokio::sync::{ - mpsc::{Receiver, Sender}, - oneshot, -}; +use tokio::sync::mpsc::{Receiver, Sender}; use std::{ num::NonZeroUsize, @@ -91,8 +88,8 @@ pub(crate) enum KademliaCommand { /// Peer ID. peer: PeerId, - /// Query ID callback. - query_id_tx: oneshot::Sender, + /// Query ID for the query. + query_id: QueryId, }, /// Store record to DHT. @@ -100,8 +97,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID callback. - query_id_tx: oneshot::Sender, + /// Query ID for the query. + query_id: QueryId, }, /// Store record to DHT to the given peers. @@ -111,8 +108,8 @@ pub(crate) enum KademliaCommand { /// Record. record: Record, - /// Query ID callback. - query_id_tx: oneshot::Sender, + /// Query ID for the query. + query_id: QueryId, /// Use the following peers for the put request. peers: Vec, @@ -129,8 +126,8 @@ pub(crate) enum KademliaCommand { /// [`Quorum`] for the query. quorum: Quorum, - /// Query ID callback. - query_id_tx: oneshot::Sender, + /// Query ID for the query. + query_id: QueryId, }, /// Register as a content provider for `key`. @@ -142,7 +139,7 @@ pub(crate) enum KademliaCommand { public_addresses: Vec, /// Query ID for the query. - query_id_tx: oneshot::Sender, + query_id: QueryId, }, /// Store record locally. @@ -235,12 +232,27 @@ pub struct KademliaHandle { /// RX channel for receiving events from `Kademlia`. event_rx: Receiver, + + /// Next query ID. + next_query_id: usize, } impl KademliaHandle { /// Create new [`KademliaHandle`]. pub(super) fn new(cmd_tx: Sender, event_rx: Receiver) -> Self { - Self { cmd_tx, event_rx } + Self { + cmd_tx, + event_rx, + next_query_id: 0usize, + } + } + + /// Allocate next query ID. + fn next_query_id(&mut self) -> QueryId { + let query_id = self.next_query_id; + self.next_query_id += 1; + + QueryId(query_id) } /// Add known peer. @@ -249,32 +261,19 @@ impl KademliaHandle { } /// Send `FIND_NODE` query to known peers. - /// - /// Returns [`Err`] only if [`super::Kademlia`] is terminating. - pub async fn find_node(&mut self, peer: PeerId) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx - .send(KademliaCommand::FindNode { peer, query_id_tx }) - .await - .map_err(|_| ())?; + pub async fn find_node(&mut self, peer: PeerId) -> QueryId { + let query_id = self.next_query_id(); + let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Store record to DHT. - /// - /// Returns [`Err`] only if [`super::Kademlia`] is terminating. 
- pub async fn put_record(&mut self, record: Record) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx - .send(KademliaCommand::PutRecord { - record, - query_id_tx, - }) - .await - .map_err(|_| ())?; + pub async fn put_record(&mut self, record: Record) -> QueryId { + let query_id = self.next_query_id(); + let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Store record to DHT to the given peers. @@ -283,34 +282,34 @@ impl KademliaHandle { record: Record, peers: Vec, update_local_store: bool, - ) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx + ) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx .send(KademliaCommand::PutRecordToPeers { record, - query_id_tx, + query_id, peers, update_local_store, }) - .await - .map_err(|_| ())?; + .await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Get record from DHT. - pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx + pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx .send(KademliaCommand::GetRecord { key, quorum, - query_id_tx, + query_id, }) - .await - .map_err(|_| ())?; + .await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Register as a content provider on the DHT. @@ -320,18 +319,18 @@ impl KademliaHandle { &mut self, key: RecordKey, public_addresses: Vec, - ) -> Result { - let (query_id_tx, query_id_rx) = oneshot::channel(); - self.cmd_tx + ) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx .send(KademliaCommand::StartProviding { key, public_addresses, - query_id_tx, + query_id, }) - .await - .map_err(|_| ())?; + .await; - query_id_rx.await.map_err(|_| ()) + query_id } /// Store the record in the local store. Used in combination with diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 973527ad..01a66d7f 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -166,9 +166,6 @@ pub(crate) struct Kademlia { /// Query executor. executor: QueryExecutor, - - /// Next query ID. 
- next_query_id: usize, } impl Kademlia { @@ -210,7 +207,6 @@ impl Kademlia { provider_ttl: config.provider_ttl, replication_factor: config.replication_factor, engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR), - next_query_id: 0usize, } } @@ -945,10 +941,7 @@ impl Kademlia { }, command = self.cmd_rx.recv() => { match command { - Some(KademliaCommand::FindNode { peer, query_id_tx }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - + Some(KademliaCommand::FindNode { peer, query_id }) => { tracing::debug!( target: LOG_TARGET, ?peer, @@ -964,10 +957,7 @@ impl Kademlia { .into() ); } - Some(KademliaCommand::PutRecord { mut record, query_id_tx }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - + Some(KademliaCommand::PutRecord { mut record, query_id }) => { tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -995,13 +985,10 @@ impl Kademlia { } Some(KademliaCommand::PutRecordToPeers { mut record, - query_id_tx, + query_id, peers, update_local_store, }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1041,11 +1028,8 @@ impl Kademlia { Some(KademliaCommand::StartProviding { key, public_addresses, - query_id_tx + query_id }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1071,10 +1055,7 @@ impl Kademlia { .into(), ); } - Some(KademliaCommand::GetRecord { key, quorum, query_id_tx }) => { - let query_id = self.next_query_id(); - let _ = query_id_tx.send(query_id); - + Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); match (self.store.get(&key), quorum) { @@ -1157,14 +1138,6 @@ impl Kademlia { } } } - - /// Allocate next query ID. - fn next_query_id(&mut self) -> QueryId { - let query_id = self.next_query_id; - self.next_query_id += 1; - - QueryId(query_id) - } } #[cfg(test)] From 0fcd621b3eb1baba90d8e169dd0efd3384df32a0 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 4 Sep 2024 08:16:59 +0000 Subject: [PATCH 21/26] Use `AtomicUsize` to generate `QueryId` in both `KademliaHandle` and `Kademlia` --- src/protocol/libp2p/kademlia/config.rs | 13 +++++++++++-- src/protocol/libp2p/kademlia/handle.rs | 17 ++++++++++++----- src/protocol/libp2p/kademlia/mod.rs | 17 +++++++++++++++++ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/protocol/libp2p/kademlia/config.rs b/src/protocol/libp2p/kademlia/config.rs index c9e79050..f4fb5f29 100644 --- a/src/protocol/libp2p/kademlia/config.rs +++ b/src/protocol/libp2p/kademlia/config.rs @@ -31,7 +31,11 @@ use crate::{ use multiaddr::Multiaddr; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::HashMap, + sync::{atomic::AtomicUsize, Arc}, + time::Duration, +}; /// Default TTL for the records. const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60); @@ -85,6 +89,9 @@ pub struct Config { /// RX channel for receiving commands from `KademliaHandle`. pub(super) cmd_rx: Receiver, + + /// Next query ID counter shared with the handle. 
+ pub(super) next_query_id: Arc, } impl Config { @@ -100,6 +107,7 @@ impl Config { ) -> (Self, KademliaHandle) { let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE); let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE); + let next_query_id = Arc::new(AtomicUsize::new(0usize)); // if no protocol names were provided, use the default protocol if protocol_names.is_empty() { @@ -119,8 +127,9 @@ impl Config { known_peers, cmd_rx, event_tx, + next_query_id: next_query_id.clone(), }, - KademliaHandle::new(cmd_tx, event_rx), + KademliaHandle::new(cmd_tx, event_rx, next_query_id), ) } diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index f1b4c218..bf6ee78c 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -30,6 +30,10 @@ use tokio::sync::mpsc::{Receiver, Sender}; use std::{ num::NonZeroUsize, pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, task::{Context, Poll}, }; @@ -234,23 +238,26 @@ pub struct KademliaHandle { event_rx: Receiver, /// Next query ID. - next_query_id: usize, + next_query_id: Arc, } impl KademliaHandle { /// Create new [`KademliaHandle`]. - pub(super) fn new(cmd_tx: Sender, event_rx: Receiver) -> Self { + pub(super) fn new( + cmd_tx: Sender, + event_rx: Receiver, + next_query_id: Arc, + ) -> Self { Self { cmd_tx, event_rx, - next_query_id: 0usize, + next_query_id, } } /// Allocate next query ID. fn next_query_id(&mut self) -> QueryId { - let query_id = self.next_query_id; - self.next_query_id += 1; + let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed); QueryId(query_id) } diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 01a66d7f..c25bff5b 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -48,6 +48,10 @@ use tokio::sync::mpsc::{Receiver, Sender}; use std::{ collections::{hash_map::Entry, HashMap}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::{Duration, Instant}, }; @@ -134,6 +138,9 @@ pub(crate) struct Kademlia { /// RX channel for receiving commands from `KademliaHandle`. cmd_rx: Receiver, + /// Next query ID. + next_query_id: Arc, + /// Routing table. routing_table: RoutingTable, @@ -195,6 +202,7 @@ impl Kademlia { routing_table, peers: HashMap::new(), cmd_rx: config.cmd_rx, + next_query_id: config.next_query_id, store, event_tx: config.event_tx, local_key, @@ -210,6 +218,13 @@ impl Kademlia { } } + /// Allocate next query ID. + fn next_query_id(&mut self) -> QueryId { + let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed); + + QueryId(query_id) + } + /// Connection established to remote peer. 
fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> { tracing::trace!(target: LOG_TARGET, ?peer, "connection established"); @@ -1183,6 +1198,7 @@ mod tests { ); let (event_tx, event_rx) = channel(64); let (_cmd_tx, cmd_rx) = channel(64); + let next_query_id = Arc::new(AtomicUsize::new(0usize)); let config = Config { protocol_names: vec![ProtocolName::from("/kad/1")], @@ -1196,6 +1212,7 @@ mod tests { provider_refresh_interval: Duration::from_secs(22 * 60 * 60), event_tx, cmd_rx, + next_query_id, }; ( From 57f17ce3890d4bda0484fc359efb884d05beb74d Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 5 Sep 2024 13:49:11 +0000 Subject: [PATCH 22/26] Use `open_substream_or_dial()` to add provider records to peers --- src/protocol/libp2p/kademlia/mod.rs | 41 +++++++++-------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 25976bdd..3d00c7f8 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -776,37 +776,20 @@ impl Kademlia { let provided_key = provider.key.clone(); let message = KademliaMessage::add_provider(provider); - let peer_action = PeerAction::SendAddProvider(message); for peer in peers { - match self.service.open_substream(peer.peer) { - Ok(substream_id) => { - self.pending_substreams.insert(substream_id, peer.peer); - self.peers - .entry(peer.peer) - .or_default() - .pending_actions - .insert(substream_id, peer_action.clone()); - } - Err(_) => match self.service.dial(&peer.peer) { - Ok(_) => match self.pending_dials.entry(peer.peer) { - Entry::Occupied(entry) => { - entry.into_mut().push(peer_action.clone()); - } - Entry::Vacant(entry) => { - entry.insert(vec![peer_action.clone()]); - } - }, - Err(error) => { - tracing::debug!( - target: LOG_TARGET, - ?peer, - ?provided_key, - ?error, - "failed to dial peer", - ) - } - }, + if let Err(error) = self.open_substream_or_dial( + peer.peer, + PeerAction::SendAddProvider(message.clone()), + None, + ) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?provided_key, + ?error, + "failed to add provider record to peer", + ) } } From 48619591293e02faca116221ee3732234dd87aaa Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 9 Sep 2024 16:51:03 +0000 Subject: [PATCH 23/26] Fix refresh when we are the only provider --- src/protocol/libp2p/kademlia/store.rs | 41 ++++++++++++++++++--------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index e5410f94..74bf7b1d 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -171,6 +171,17 @@ impl MemoryStore { /// /// Returns `true` if the provider was added, `false` otherwise. pub fn put_provider(&mut self, provider_record: ProviderRecord) -> bool { + // Helper to schedule local provider refresh. + let mut schedule_local_provider_refresh = |provider_record: ProviderRecord| { + let key = provider_record.key.clone(); + let refresh_interval = self.config.provider_refresh_interval; + self.local_providers.insert(key.clone(), provider_record); + self.pending_provider_refresh.push(Box::pin(async move { + tokio::time::sleep(refresh_interval).await; + key + })); + }; + // Make sure we have no more than `max_provider_addresses`. 
let provider_record = { let mut record = provider_record; @@ -183,6 +194,10 @@ impl MemoryStore { match self.provider_keys.entry(provider_record.key.clone()) { Entry::Vacant(entry) => if can_insert_new_key { + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } + entry.insert(vec![provider_record]); true @@ -206,8 +221,13 @@ impl MemoryStore { match provider_position { Ok(i) => { + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } // Update the provider in place. providers[i] = provider_record.clone(); + + true } Err(i) => { // `Err(i)` contains the insertion point. @@ -221,29 +241,22 @@ impl MemoryStore { existing `max_providers_per_key`", ); - return false; + false } else { if providers.len() == usize::from(self.config.max_providers_per_key) { providers.pop(); } + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } + providers.insert(i, provider_record.clone()); + + true } } } - - if provider_record.provider == self.local_peer_id { - // We must make sure to refresh the local provider. - let key = provider_record.key.clone(); - let refresh_interval = self.config.provider_refresh_interval; - self.local_providers.insert(key.clone(), provider_record); - self.pending_provider_refresh.push(Box::pin(async move { - tokio::time::sleep(refresh_interval).await; - key - })); - } - - true } } } From 03846c7619a0017a4ca24cd047177580d8686e7c Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 12 Sep 2024 08:32:53 +0000 Subject: [PATCH 24/26] Address review suggestions --- src/protocol/libp2p/kademlia/query/get_providers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs index 523cab13..d679265c 100644 --- a/src/protocol/libp2p/kademlia/query/get_providers.rs +++ b/src/protocol/libp2p/kademlia/query/get_providers.rs @@ -117,7 +117,7 @@ impl GetProvidersContext { key: Key::from(peer.clone()), peer, addresses: addresses.into_iter().collect(), - connection: ConnectionType::CanConnect, + connection: ConnectionType::NotConnected, }) .collect::>(); From ffef26590c452c751ddc1963eaf1fdd4d31e4872 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 30 Sep 2024 13:25:33 +0000 Subject: [PATCH 25/26] minor: fmt --- src/protocol/libp2p/kademlia/handle.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 866aeaae..95dc95e8 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -369,9 +369,7 @@ impl KademliaHandle { /// Returns [`Err`] only if [`super::Kademlia`] is terminating. 
pub async fn get_providers(&mut self, key: RecordKey) -> QueryId { let query_id = self.next_query_id(); - let _ = self.cmd_tx - .send(KademliaCommand::GetProviders { key, query_id }) - .await; + let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await; query_id } From c17a251ee0c03b634f467a9f4886c39abb6caa52 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 30 Sep 2024 13:32:55 +0000 Subject: [PATCH 26/26] minor: fix docs --- src/protocol/libp2p/kademlia/handle.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 95dc95e8..656c403d 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -304,7 +304,7 @@ impl KademliaHandle { /// Store record to DHT to the given peers. /// - /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + /// Returns [`Err`] only if `Kademlia` is terminating. pub async fn put_record_to_peers( &mut self, record: Record, @@ -327,7 +327,7 @@ impl KademliaHandle { /// Get record from DHT. /// - /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + /// Returns [`Err`] only if `Kademlia` is terminating. pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { let query_id = self.next_query_id(); let _ = self @@ -345,7 +345,7 @@ impl KademliaHandle { /// Register as a content provider on the DHT. /// /// Register the local peer ID & its `public_addresses` as a provider for a given `key`. - /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + /// Returns [`Err`] only if `Kademlia` is terminating. pub async fn start_providing( &mut self, key: RecordKey, @@ -366,7 +366,7 @@ impl KademliaHandle { /// Get providers from DHT. /// - /// Returns [`Err`] only if [`super::Kademlia`] is terminating. + /// Returns [`Err`] only if `Kademlia` is terminating. pub async fn get_providers(&mut self, key: RecordKey) -> QueryId { let query_id = self.next_query_id(); let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await;
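
Taken together, the series leaves the handle with `start_providing()` and `get_providers()` returning a `QueryId` directly. A hedged usage sketch, not part of the patches: `handle` is assumed to be a `KademliaHandle` created via the protocol `Config`, and `key`/`our_addresses` to be supplied by the application.

use futures::StreamExt;

async fn provide_and_lookup(
    mut handle: KademliaHandle,
    key: RecordKey,
    our_addresses: Vec<Multiaddr>,
) {
    // Announce the local peer as a provider for `key`; the record is stored locally
    // and republished by the background task on the provider refresh interval.
    let _provide_query = handle.start_providing(key.clone(), our_addresses).await;

    // Ask the DHT who provides `key`.
    let lookup_query = handle.get_providers(key).await;

    // `KademliaHandle` implements `Stream<Item = KademliaEvent>`; wait for the result.
    while let Some(event) = handle.next().await {
        if let KademliaEvent::GetProvidersSuccess { query_id, providers } = event {
            if query_id == lookup_query {
                println!("found {} provider(s)", providers.len());
                break;
            }
        }
    }
}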