Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad: Providers part 7: better types and public API, public addresses & known providers #246

Merged
merged 43 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1048320
Introduce `ADD_PROVIDER` query
dmitry-markin Aug 22, 2024
2ab84ae
Merge remote-tracking branch 'origin/master' into dm-provider-queries
dmitry-markin Aug 23, 2024
778078b
Execute `ADD_PROVIDER` query
dmitry-markin Aug 23, 2024
323a26f
Introduce local providers in `MemoryStore`
dmitry-markin Aug 27, 2024
8946809
Add provider refresh interval to Kademlia config
dmitry-markin Aug 28, 2024
474a58a
Merge remote-tracking branch 'origin/master' into dm-republish-providers
dmitry-markin Aug 28, 2024
d02acee
Move `FuturesStream` to a separate file
dmitry-markin Aug 28, 2024
5432c9b
Refresh providers: dry-run without network queries
dmitry-markin Aug 28, 2024
feb493d
Remove `try_get_record()` and other `try_...()` non-async methods
dmitry-markin Aug 29, 2024
982c73d
Move query ID generation from `KademliaHandle` to `Kademlia`
dmitry-markin Aug 29, 2024
68c7a87
Republish providers
dmitry-markin Aug 29, 2024
f9ed3e7
Merge remote-tracking branch 'origin/master' into dm-republish-providers
dmitry-markin Aug 30, 2024
764a3f2
Merge remote-tracking branch 'origin/master' into dm-add-providers
dmitry-markin Aug 30, 2024
5e97484
Use getter `TransportService::local_peer_id()` instead of accessing d…
dmitry-markin Aug 30, 2024
7caf290
Use getter `TransportService::local_peer_id()` instead of accessing d…
dmitry-markin Aug 30, 2024
60b8fb6
Introduce `GET_PROVIDERS` query
dmitry-markin Aug 27, 2024
d340c35
Update `get_providers()` to receive query ID via oneshot channel
dmitry-markin Aug 30, 2024
10b96db
Make lines fit into 100 characters
dmitry-markin Aug 30, 2024
6146540
Introduce `GetProvidersContext` & `GetProvidersConfig`
dmitry-markin Sep 2, 2024
e167419
Implement `GET_PROVIDERS` query
dmitry-markin Sep 2, 2024
eb6eee8
Merge returned provider addresses and deduplicate records
dmitry-markin Sep 3, 2024
8314d95
Merge branch 'dm-add-providers' into dm-republish-providers
dmitry-markin Sep 3, 2024
974eac5
minor: fix log target
dmitry-markin Sep 3, 2024
e895a44
Revert "Remove `try_get_record()` and other `try_...()` non-async met…
dmitry-markin Sep 4, 2024
ce18594
Revert "Move query ID generation from `KademliaHandle` to `Kademlia`"
dmitry-markin Sep 4, 2024
0fcd621
Use `AtomicUsize` to generate `QueryId` in both `KademliaHandle` and …
dmitry-markin Sep 4, 2024
5f26161
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 4, 2024
57f17ce
Use `open_substream_or_dial()` to add provider records to peers
dmitry-markin Sep 5, 2024
8594103
Merge remote-tracking branch 'origin/master' into dm-add-providers
dmitry-markin Sep 5, 2024
4861959
Fix refresh when we are the only provider
dmitry-markin Sep 9, 2024
c85dd3b
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 10, 2024
03846c7
Address review suggestions
dmitry-markin Sep 12, 2024
7492895
Stop providing
dmitry-markin Sep 13, 2024
c90861f
Use `HashMap::entry()` API to remove local providers
dmitry-markin Sep 13, 2024
17fd943
Introduce `ContentProvider` type
dmitry-markin Sep 18, 2024
e853d3b
Merge branch 'dm-add-providers' into dm-republish-providers
dmitry-markin Sep 18, 2024
aeb59cf
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 18, 2024
674c54e
Merge branch 'dm-get-providers' into dm-stop-providing
dmitry-markin Sep 18, 2024
98523e7
Merge branch 'dm-stop-providing' into dm-improve-providers-api
dmitry-markin Sep 18, 2024
d32bd56
Use `PublicAddresses` API
dmitry-markin Sep 18, 2024
54e8bf5
Use locally known providers when performing `KademliaHandle::get_prov…
dmitry-markin Sep 18, 2024
6b9b24a
Emit `IncomingProvider` event
dmitry-markin Sep 18, 2024
3fe1b9a
Merge remote-tracking branch 'origin/master' into dm-improve-provider…
dmitry-markin Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::{
const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60);

/// Default provider record TTL.
const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60);
pub(super) const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60);

/// Default provider republish interval.
pub(super) const DEFAULT_PROVIDER_REFRESH_INTERVAL: Duration = Duration::from_secs(22 * 60 * 60);
Expand Down
34 changes: 16 additions & 18 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
protocol::libp2p::kademlia::{KademliaPeer, PeerRecord, QueryId, Record, RecordKey},
protocol::libp2p::kademlia::{ContentProvider, PeerRecord, QueryId, Record, RecordKey},
PeerId,
};

Expand Down Expand Up @@ -148,9 +148,6 @@ pub(crate) enum KademliaCommand {
/// Provided key.
key: RecordKey,

/// Our external addresses to publish.
public_addresses: Vec<Multiaddr>,

/// Query ID for the query.
query_id: QueryId,
},
Expand Down Expand Up @@ -210,9 +207,12 @@ pub enum KademliaEvent {
/// Query ID.
query_id: QueryId,

/// Provided key.
provided_key: RecordKey,

/// Found providers with cached addresses. Returned providers are sorted by distane to the
/// provided key.
providers: Vec<KademliaPeer>,
providers: Vec<ContentProvider>,
},

/// `PUT_VALUE` query succeeded.
Expand Down Expand Up @@ -240,6 +240,15 @@ pub enum KademliaEvent {
/// Record.
record: Record,
},

/// Incoming `ADD_PROVIDER` request received.
IncomingProvider {
/// Provided key.
provided_key: RecordKey,

/// Provider.
provider: ContentProvider,
},
}

/// The type of the DHT records.
Expand Down Expand Up @@ -352,20 +361,9 @@ impl KademliaHandle {
///
/// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn start_providing(
&mut self,
key: RecordKey,
public_addresses: Vec<Multiaddr>,
) -> QueryId {
pub async fn start_providing(&mut self, key: RecordKey) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
.send(KademliaCommand::StartProviding {
key,
public_addresses,
query_id,
})
.await;
let _ = self.cmd_tx.send(KademliaCommand::StartProviding { key, query_id }).await;

query_id
}
Expand Down
15 changes: 8 additions & 7 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
protocol::libp2p::kademlia::{
record::{Key as RecordKey, ProviderRecord, Record},
record::{ContentProvider, Key as RecordKey, Record},
schema,
types::{ConnectionType, KademliaPeer},
},
Expand Down Expand Up @@ -172,14 +172,14 @@ impl KademliaMessage {
}

/// Create `ADD_PROVIDER` message with `provider`.
pub fn add_provider(provider: ProviderRecord) -> Bytes {
pub fn add_provider(provided_key: RecordKey, provider: ContentProvider) -> Bytes {
let peer = KademliaPeer::new(
provider.provider,
provider.peer,
provider.addresses,
ConnectionType::CanConnect, // ignored by message recipient
);
let message = schema::kademlia::Message {
key: provider.key.clone().to_vec(),
key: provided_key.clone().to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::AddProvider.into(),
provider_peers: std::iter::once((&peer).into()).collect(),
Expand Down Expand Up @@ -209,16 +209,17 @@ impl KademliaMessage {

/// Create `GET_PROVIDERS` response.
pub fn get_providers_response(
providers: Vec<ProviderRecord>,
providers: Vec<ContentProvider>,
closer_peers: &[KademliaPeer],
) -> Vec<u8> {
let provider_peers = providers
.into_iter()
.map(|p| {
KademliaPeer::new(
p.provider,
p.peer,
p.addresses,
ConnectionType::CanConnect, // ignored by recipient
// `ConnectionType` is ignored by a recipient
ConnectionType::NotConnected,
)
})
.map(|p| (&p).into())
Expand Down
92 changes: 54 additions & 38 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::{
handle::KademliaCommand,
message::KademliaMessage,
query::{QueryAction, QueryEngine},
record::ProviderRecord,
routing_table::RoutingTable,
store::{MemoryStore, MemoryStoreAction, MemoryStoreConfig},
types::{ConnectionType, KademliaPeer, Key},
Expand Down Expand Up @@ -61,7 +60,7 @@ pub use handle::{
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode,
};
pub use query::QueryId;
pub use record::{Key as RecordKey, PeerRecord, Record};
pub use record::{ContentProvider, Key as RecordKey, PeerRecord, Record};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";
Expand Down Expand Up @@ -165,9 +164,6 @@ pub(crate) struct Kademlia {
/// Default record TTL.
record_ttl: Duration,

/// Provider record TTL.
provider_ttl: Duration,

/// Query engine.
engine: QueryEngine,

Expand All @@ -193,6 +189,7 @@ impl Kademlia {
local_peer_id,
MemoryStoreConfig {
provider_refresh_interval: config.provider_refresh_interval,
provider_ttl: config.provider_ttl,
..Default::default()
},
);
Expand All @@ -212,7 +209,6 @@ impl Kademlia {
update_mode: config.update_mode,
validation_mode: config.validation_mode,
record_ttl: config.record_ttl,
provider_ttl: config.provider_ttl,
replication_factor: config.replication_factor,
engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
}
Expand Down Expand Up @@ -523,7 +519,7 @@ impl Kademlia {
),
}
}
KademliaMessage::AddProvider { key, providers } => {
KademliaMessage::AddProvider { key, mut providers } => {
tracing::trace!(
target: LOG_TARGET,
?peer,
Expand All @@ -532,15 +528,27 @@ impl Kademlia {
"handle `ADD_PROVIDER` message",
);

match (providers.len(), providers.first()) {
match (providers.len(), providers.pop()) {
(1, Some(provider)) =>
if provider.peer == peer {
self.store.put_provider(ProviderRecord {
key,
provider: peer,
addresses: provider.addresses.clone(),
expires: Instant::now() + self.provider_ttl,
});
self.store.put_provider(
key.clone(),
ContentProvider {
peer,
addresses: provider.addresses.clone(),
},
);

let _ = self
.event_tx
.send(KademliaEvent::IncomingProvider {
provided_key: key,
provider: ContentProvider {
peer: provider.peer,
addresses: provider.addresses,
},
})
.await;
} else {
tracing::trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -590,10 +598,13 @@ impl Kademlia {
"handle `GET_PROVIDERS` request",
);

let providers = self.store.get_providers(key);
// TODO: if local peer is among the providers, update its `ProviderRecord`
// to have up-to-date addresses.
// Requires https://github.com/paritytech/litep2p/issues/211.
let mut providers = self.store.get_providers(key);

// Make sure local provider addresses are up to date.
let local_peer_id = self.local_key.clone().into_preimage();
providers.iter_mut().find(|p| p.peer == local_peer_id).as_mut().map(|p| {
p.addresses = self.service.public_addresses().get_addresses();
});

let closer_peers = self
.routing_table
Expand Down Expand Up @@ -787,16 +798,19 @@ impl Kademlia {

Ok(())
}
QueryAction::AddProviderToFoundNodes { provider, peers } => {
QueryAction::AddProviderToFoundNodes {
provided_key,
provider,
peers,
} => {
tracing::trace!(
target: LOG_TARGET,
provided_key = ?provider.key,
?provided_key,
num_peers = ?peers.len(),
"add provider record to found peers",
);

let provided_key = provider.key.clone();
let message = KademliaMessage::add_provider(provider);
let message = KademliaMessage::add_provider(provided_key.clone(), provider);

for peer in peers {
if let Err(error) = self.open_substream_or_dial(
Expand Down Expand Up @@ -828,12 +842,14 @@ impl Kademlia {
}
QueryAction::GetProvidersQueryDone {
query_id,
provided_key,
providers,
} => {
let _ = self
.event_tx
.send(KademliaEvent::GetProvidersSuccess {
query_id,
provided_key,
providers,
})
.await;
Expand Down Expand Up @@ -1036,28 +1052,26 @@ impl Kademlia {
}
Some(KademliaCommand::StartProviding {
key,
public_addresses,
query_id
}) => {
tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
?key,
?public_addresses,
"register as a content provider",
);

let provider = ProviderRecord {
key: key.clone(),
provider: self.service.local_peer_id(),
addresses: public_addresses,
expires: Instant::now() + self.provider_ttl,
let addresses = self.service.public_addresses().get_addresses();
let provider = ContentProvider {
peer: self.service.local_peer_id(),
addresses,
};

self.store.put_provider(provider.clone());
self.store.put_provider(key.clone(), provider.clone());

self.engine.start_add_provider(
query_id,
key.clone(),
provider,
self.routing_table
.closest(Key::new(key), self.replication_factor)
Expand Down Expand Up @@ -1105,12 +1119,15 @@ impl Kademlia {
Some(KademliaCommand::GetProviders { key, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT");

let known_providers = self.store.get_providers(&key);

self.engine.start_get_providers(
query_id,
key.clone(),
self.routing_table
.closest(Key::new(key), self.replication_factor)
.into(),
known_providers,
);
}
Some(KademliaCommand::AddKnownPeer { peer, addresses }) => {
Expand Down Expand Up @@ -1151,25 +1168,24 @@ impl Kademlia {
}
},
action = self.store.next_action() => match action {
Some(MemoryStoreAction::RefreshProvider { mut provider }) => {
Some(MemoryStoreAction::RefreshProvider { provided_key, provider }) => {
tracing::trace!(
target: LOG_TARGET,
key = ?provider.key,
?provided_key,
"republishing local provider",
);

// Make sure to roll expiration time.
provider.expires = Instant::now() + self.provider_ttl;

self.store.put_provider(provider.clone());
self.store.put_provider(provided_key.clone(), provider.clone());
// We never update local provider addresses in the store when refresh
// it, as this is done anyway when replying to `GET_PROVIDERS` request.

let key = provider.key.clone();
let query_id = self.next_query_id();
self.engine.start_add_provider(
query_id,
provided_key.clone(),
provider,
self.routing_table
.closest(Key::new(key), self.replication_factor)
.closest(Key::new(provided_key), self.replication_factor)
.into(),
);
}
Expand Down
Loading