Skip to content

Commit

Permalink
kad: Providers part 7: better types and public API, public addresses …
Browse files Browse the repository at this point in the history
…& known providers (#246)

This PR introduces the following changes:
1. Introduces a better type `ContentProvider` without a burden of
`KademliaPeer`.
2. Simplifies public litep2p content providers API.
3. Uses `PublicAddresses` API when advertising local providers.
4. Adds locally known providers to discovered providers when performing
`GET_PROVIDERS` request.
5. Emits `IncomingProvider` event when remote node registers as a
provider via `ADD_PROVIDER` request.
  • Loading branch information
dmitry-markin authored Sep 30, 2024
1 parent e9f4f97 commit 1a0325c
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 224 deletions.
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::{
const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60);

/// Default provider record TTL.
const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60);
pub(super) const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60);

/// Default provider republish interval.
pub(super) const DEFAULT_PROVIDER_REFRESH_INTERVAL: Duration = Duration::from_secs(22 * 60 * 60);
Expand Down
34 changes: 16 additions & 18 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
protocol::libp2p::kademlia::{KademliaPeer, PeerRecord, QueryId, Record, RecordKey},
protocol::libp2p::kademlia::{ContentProvider, PeerRecord, QueryId, Record, RecordKey},
PeerId,
};

Expand Down Expand Up @@ -148,9 +148,6 @@ pub(crate) enum KademliaCommand {
/// Provided key.
key: RecordKey,

/// Our external addresses to publish.
public_addresses: Vec<Multiaddr>,

/// Query ID for the query.
query_id: QueryId,
},
Expand Down Expand Up @@ -210,9 +207,12 @@ pub enum KademliaEvent {
/// Query ID.
query_id: QueryId,

/// Provided key.
provided_key: RecordKey,

/// Found providers with cached addresses. Returned providers are sorted by distance to the
/// provided key.
providers: Vec<KademliaPeer>,
providers: Vec<ContentProvider>,
},

/// `PUT_VALUE` query succeeded.
Expand Down Expand Up @@ -240,6 +240,15 @@ pub enum KademliaEvent {
/// Record.
record: Record,
},

/// Incoming `ADD_PROVIDER` request received.
IncomingProvider {
/// Provided key.
provided_key: RecordKey,

/// Provider.
provider: ContentProvider,
},
}

/// The type of the DHT records.
Expand Down Expand Up @@ -352,20 +361,9 @@ impl KademliaHandle {
///
/// Register the local peer ID & its public addresses as a provider for a given `key`.
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn start_providing(
&mut self,
key: RecordKey,
public_addresses: Vec<Multiaddr>,
) -> QueryId {
pub async fn start_providing(&mut self, key: RecordKey) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
.send(KademliaCommand::StartProviding {
key,
public_addresses,
query_id,
})
.await;
let _ = self.cmd_tx.send(KademliaCommand::StartProviding { key, query_id }).await;

query_id
}
Expand Down
15 changes: 8 additions & 7 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
protocol::libp2p::kademlia::{
record::{Key as RecordKey, ProviderRecord, Record},
record::{ContentProvider, Key as RecordKey, Record},
schema,
types::{ConnectionType, KademliaPeer},
},
Expand Down Expand Up @@ -172,14 +172,14 @@ impl KademliaMessage {
}

/// Create `ADD_PROVIDER` message with `provider`.
pub fn add_provider(provider: ProviderRecord) -> Bytes {
pub fn add_provider(provided_key: RecordKey, provider: ContentProvider) -> Bytes {
let peer = KademliaPeer::new(
provider.provider,
provider.peer,
provider.addresses,
ConnectionType::CanConnect, // ignored by message recipient
);
let message = schema::kademlia::Message {
key: provider.key.clone().to_vec(),
key: provided_key.clone().to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::AddProvider.into(),
provider_peers: std::iter::once((&peer).into()).collect(),
Expand Down Expand Up @@ -209,16 +209,17 @@ impl KademliaMessage {

/// Create `GET_PROVIDERS` response.
pub fn get_providers_response(
providers: Vec<ProviderRecord>,
providers: Vec<ContentProvider>,
closer_peers: &[KademliaPeer],
) -> Vec<u8> {
let provider_peers = providers
.into_iter()
.map(|p| {
KademliaPeer::new(
p.provider,
p.peer,
p.addresses,
ConnectionType::CanConnect, // ignored by recipient
// `ConnectionType` is ignored by a recipient
ConnectionType::NotConnected,
)
})
.map(|p| (&p).into())
Expand Down
92 changes: 54 additions & 38 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::{
handle::KademliaCommand,
message::KademliaMessage,
query::{QueryAction, QueryEngine},
record::ProviderRecord,
routing_table::RoutingTable,
store::{MemoryStore, MemoryStoreAction, MemoryStoreConfig},
types::{ConnectionType, KademliaPeer, Key},
Expand Down Expand Up @@ -61,7 +60,7 @@ pub use handle::{
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode,
};
pub use query::QueryId;
pub use record::{Key as RecordKey, PeerRecord, Record};
pub use record::{ContentProvider, Key as RecordKey, PeerRecord, Record};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";
Expand Down Expand Up @@ -165,9 +164,6 @@ pub(crate) struct Kademlia {
/// Default record TTL.
record_ttl: Duration,

/// Provider record TTL.
provider_ttl: Duration,

/// Query engine.
engine: QueryEngine,

Expand All @@ -193,6 +189,7 @@ impl Kademlia {
local_peer_id,
MemoryStoreConfig {
provider_refresh_interval: config.provider_refresh_interval,
provider_ttl: config.provider_ttl,
..Default::default()
},
);
Expand All @@ -212,7 +209,6 @@ impl Kademlia {
update_mode: config.update_mode,
validation_mode: config.validation_mode,
record_ttl: config.record_ttl,
provider_ttl: config.provider_ttl,
replication_factor: config.replication_factor,
engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
}
Expand Down Expand Up @@ -523,7 +519,7 @@ impl Kademlia {
),
}
}
KademliaMessage::AddProvider { key, providers } => {
KademliaMessage::AddProvider { key, mut providers } => {
tracing::trace!(
target: LOG_TARGET,
?peer,
Expand All @@ -532,15 +528,27 @@ impl Kademlia {
"handle `ADD_PROVIDER` message",
);

match (providers.len(), providers.first()) {
match (providers.len(), providers.pop()) {
(1, Some(provider)) =>
if provider.peer == peer {
self.store.put_provider(ProviderRecord {
key,
provider: peer,
addresses: provider.addresses.clone(),
expires: Instant::now() + self.provider_ttl,
});
self.store.put_provider(
key.clone(),
ContentProvider {
peer,
addresses: provider.addresses.clone(),
},
);

let _ = self
.event_tx
.send(KademliaEvent::IncomingProvider {
provided_key: key,
provider: ContentProvider {
peer: provider.peer,
addresses: provider.addresses,
},
})
.await;
} else {
tracing::trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -590,10 +598,13 @@ impl Kademlia {
"handle `GET_PROVIDERS` request",
);

let providers = self.store.get_providers(key);
// TODO: if local peer is among the providers, update its `ProviderRecord`
// to have up-to-date addresses.
// Requires https://github.com/paritytech/litep2p/issues/211.
let mut providers = self.store.get_providers(key);

// Make sure local provider addresses are up to date.
let local_peer_id = self.local_key.clone().into_preimage();
providers.iter_mut().find(|p| p.peer == local_peer_id).as_mut().map(|p| {
p.addresses = self.service.public_addresses().get_addresses();
});

let closer_peers = self
.routing_table
Expand Down Expand Up @@ -787,16 +798,19 @@ impl Kademlia {

Ok(())
}
QueryAction::AddProviderToFoundNodes { provider, peers } => {
QueryAction::AddProviderToFoundNodes {
provided_key,
provider,
peers,
} => {
tracing::trace!(
target: LOG_TARGET,
provided_key = ?provider.key,
?provided_key,
num_peers = ?peers.len(),
"add provider record to found peers",
);

let provided_key = provider.key.clone();
let message = KademliaMessage::add_provider(provider);
let message = KademliaMessage::add_provider(provided_key.clone(), provider);

for peer in peers {
if let Err(error) = self.open_substream_or_dial(
Expand Down Expand Up @@ -828,12 +842,14 @@ impl Kademlia {
}
QueryAction::GetProvidersQueryDone {
query_id,
provided_key,
providers,
} => {
let _ = self
.event_tx
.send(KademliaEvent::GetProvidersSuccess {
query_id,
provided_key,
providers,
})
.await;
Expand Down Expand Up @@ -1036,28 +1052,26 @@ impl Kademlia {
}
Some(KademliaCommand::StartProviding {
key,
public_addresses,
query_id
}) => {
tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
?key,
?public_addresses,
"register as a content provider",
);

let provider = ProviderRecord {
key: key.clone(),
provider: self.service.local_peer_id(),
addresses: public_addresses,
expires: Instant::now() + self.provider_ttl,
let addresses = self.service.public_addresses().get_addresses();
let provider = ContentProvider {
peer: self.service.local_peer_id(),
addresses,
};

self.store.put_provider(provider.clone());
self.store.put_provider(key.clone(), provider.clone());

self.engine.start_add_provider(
query_id,
key.clone(),
provider,
self.routing_table
.closest(Key::new(key), self.replication_factor)
Expand Down Expand Up @@ -1105,12 +1119,15 @@ impl Kademlia {
Some(KademliaCommand::GetProviders { key, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT");

let known_providers = self.store.get_providers(&key);

self.engine.start_get_providers(
query_id,
key.clone(),
self.routing_table
.closest(Key::new(key), self.replication_factor)
.into(),
known_providers,
);
}
Some(KademliaCommand::AddKnownPeer { peer, addresses }) => {
Expand Down Expand Up @@ -1151,25 +1168,24 @@ impl Kademlia {
}
},
action = self.store.next_action() => match action {
Some(MemoryStoreAction::RefreshProvider { mut provider }) => {
Some(MemoryStoreAction::RefreshProvider { provided_key, provider }) => {
tracing::trace!(
target: LOG_TARGET,
key = ?provider.key,
?provided_key,
"republishing local provider",
);

// Make sure to roll expiration time.
provider.expires = Instant::now() + self.provider_ttl;

self.store.put_provider(provider.clone());
self.store.put_provider(provided_key.clone(), provider.clone());
// We never update local provider addresses in the store when refreshing
// it, as this is done anyway when replying to a `GET_PROVIDERS` request.

let key = provider.key.clone();
let query_id = self.next_query_id();
self.engine.start_add_provider(
query_id,
provided_key.clone(),
provider,
self.routing_table
.closest(Key::new(key), self.replication_factor)
.closest(Key::new(provided_key), self.replication_factor)
.into(),
);
}
Expand Down
Loading

0 comments on commit 1a0325c

Please sign in to comment.