Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad: Providers part 5: GET_PROVIDERS query #236

Merged
merged 37 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1048320
Introduce `ADD_PROVIDER` query
dmitry-markin Aug 22, 2024
2ab84ae
Merge remote-tracking branch 'origin/master' into dm-provider-queries
dmitry-markin Aug 23, 2024
778078b
Execute `ADD_PROVIDER` query
dmitry-markin Aug 23, 2024
323a26f
Introduce local providers in `MemoryStore`
dmitry-markin Aug 27, 2024
8946809
Add provider refresh interval to Kademlia config
dmitry-markin Aug 28, 2024
474a58a
Merge remote-tracking branch 'origin/master' into dm-republish-providers
dmitry-markin Aug 28, 2024
d02acee
Move `FuturesStream` to a separate file
dmitry-markin Aug 28, 2024
5432c9b
Refresh providers: dry-run without network queries
dmitry-markin Aug 28, 2024
feb493d
Remove `try_get_record()` and other `try_...()` non-async methods
dmitry-markin Aug 29, 2024
982c73d
Move query ID generation from `KademliaHandle` to `Kademlia`
dmitry-markin Aug 29, 2024
68c7a87
Republish providers
dmitry-markin Aug 29, 2024
f9ed3e7
Merge remote-tracking branch 'origin/master' into dm-republish-providers
dmitry-markin Aug 30, 2024
764a3f2
Merge remote-tracking branch 'origin/master' into dm-add-providers
dmitry-markin Aug 30, 2024
5e97484
Use getter `TransportService::local_peer_id()` instead of accessing d…
dmitry-markin Aug 30, 2024
7caf290
Use getter `TransportService::local_peer_id()` instead of accessing d…
dmitry-markin Aug 30, 2024
60b8fb6
Introduce `GET_PROVIDERS` query
dmitry-markin Aug 27, 2024
d340c35
Update `get_providers()` to receive query ID via oneshot channel
dmitry-markin Aug 30, 2024
10b96db
Make lines fit into 100 characters
dmitry-markin Aug 30, 2024
6146540
Introduce `GetProvidersContext` & `GetProvidersConfig`
dmitry-markin Sep 2, 2024
e167419
Implement `GET_PROVIDERS` query
dmitry-markin Sep 2, 2024
eb6eee8
Merge returned provider addresses and deduplicate records
dmitry-markin Sep 3, 2024
8314d95
Merge branch 'dm-add-providers' into dm-republish-providers
dmitry-markin Sep 3, 2024
974eac5
minor: fix log target
dmitry-markin Sep 3, 2024
e895a44
Revert "Remove `try_get_record()` and other `try_...()` non-async met…
dmitry-markin Sep 4, 2024
ce18594
Revert "Move query ID generation from `KademliaHandle` to `Kademlia`"
dmitry-markin Sep 4, 2024
0fcd621
Use `AtomicUsize` to generate `QueryId` in both `KademliaHandle` and …
dmitry-markin Sep 4, 2024
5f26161
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 4, 2024
57f17ce
Use `open_substream_or_dial()` to add provider records to peers
dmitry-markin Sep 5, 2024
8594103
Merge remote-tracking branch 'origin/master' into dm-add-providers
dmitry-markin Sep 5, 2024
4861959
Fix refresh when we are the only provider
dmitry-markin Sep 9, 2024
c85dd3b
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 10, 2024
03846c7
Address review suggestions
dmitry-markin Sep 12, 2024
e853d3b
Merge branch 'dm-add-providers' into dm-republish-providers
dmitry-markin Sep 18, 2024
aeb59cf
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 18, 2024
cabb65a
Merge remote-tracking branch 'origin/master' into dm-get-providers
dmitry-markin Sep 30, 2024
ffef265
minor: fmt
dmitry-markin Sep 30, 2024
c17a251
minor: fix docs
dmitry-markin Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
protocol::libp2p::kademlia::{PeerRecord, QueryId, Record, RecordKey},
protocol::libp2p::kademlia::{KademliaPeer, PeerRecord, QueryId, Record, RecordKey},
PeerId,
};

Expand Down Expand Up @@ -134,6 +134,15 @@ pub(crate) enum KademliaCommand {
query_id: QueryId,
},

/// Get providers from DHT.
GetProviders {
/// Provided key.
key: RecordKey,

/// Query ID for the query.
query_id: QueryId,
},

/// Register as a content provider for `key`.
StartProviding {
/// Provided key.
Expand Down Expand Up @@ -190,6 +199,16 @@ pub enum KademliaEvent {
records: RecordsType,
},

/// `GET_PROVIDERS` query succeeded.
GetProvidersSuccess {
/// Query ID.
query_id: QueryId,

/// Found providers with cached addresses. Returned providers are sorted by distane to the
/// provided key.
providers: Vec<KademliaPeer>,
},

/// `PUT_VALUE` query succeeded.
// TODO: this is never emitted. Implement + add `AddProviderSuccess`.
PutRecordSuccess {
Expand Down Expand Up @@ -284,6 +303,8 @@ impl KademliaHandle {
}

/// Store record to DHT to the given peers.
///
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn put_record_to_peers(
&mut self,
record: Record,
Expand All @@ -305,6 +326,8 @@ impl KademliaHandle {
}

/// Get record from DHT.
///
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
let query_id = self.next_query_id();
let _ = self
Expand All @@ -322,6 +345,7 @@ impl KademliaHandle {
/// Register as a content provider on the DHT.
///
/// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn start_providing(
&mut self,
key: RecordKey,
Expand All @@ -340,6 +364,16 @@ impl KademliaHandle {
query_id
}

/// Get providers from DHT.
///
/// Returns [`Err`] only if `Kademlia` is terminating.
pub async fn get_providers(&mut self, key: RecordKey) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::GetProviders { key, query_id }).await;

query_id
}

/// Store the record in the local store. Used in combination with
/// [`IncomingRecordValidationMode::Manual`].
pub async fn store_record(&mut self, record: Record) {
Expand Down
15 changes: 5 additions & 10 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,35 +187,31 @@ impl KademliaMessage {
};

let mut buf = BytesMut::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
message.encode(&mut buf).expect("BytesMut to provide needed capacity");

buf.freeze()
}

/// Create `GET_PROVIDERS` request for `key`.
#[allow(unused)]
pub fn get_providers_request(key: RecordKey) -> Vec<u8> {
pub fn get_providers_request(key: RecordKey) -> Bytes {
let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetProviders.into(),
..Default::default()
};

let mut buf = Vec::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");
let mut buf = BytesMut::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("BytesMut to provide needed capacity");

buf
buf.freeze()
}

/// Create `GET_PROVIDERS` response.
pub fn get_providers_response(
key: RecordKey,
providers: Vec<ProviderRecord>,
closer_peers: &[KademliaPeer],
) -> Vec<u8> {
debug_assert!(providers.iter().all(|p| p.key == key));

let provider_peers = providers
.into_iter()
.map(|p| {
Expand All @@ -229,7 +225,6 @@ impl KademliaMessage {
.collect();

let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::GetProviders.into(),
closer_peers: closer_peers.iter().map(Into::into).collect(),
Expand Down
51 changes: 41 additions & 10 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,8 @@ impl Kademlia {
.routing_table
.closest(Key::from(key.to_vec()), self.replication_factor);

let message = KademliaMessage::get_providers_response(
key.clone(),
providers,
&closer_peers,
);
let message =
KademliaMessage::get_providers_response(providers, &closer_peers);
self.executor.send_message(peer, message.into(), substream);
}
(None, None) => tracing::debug!(
Expand Down Expand Up @@ -829,6 +826,19 @@ impl Kademlia {
.await;
Ok(())
}
QueryAction::GetProvidersQueryDone {
query_id,
providers,
} => {
let _ = self
.event_tx
.send(KademliaEvent::GetProvidersSuccess {
query_id,
providers,
})
.await;
Ok(())
}
QueryAction::QueryFailed { query } => {
tracing::debug!(target: LOG_TARGET, ?query, "query failed");

Expand Down Expand Up @@ -963,7 +973,8 @@ impl Kademlia {
"store record to DHT",
);

// For `PUT_VALUE` requests originating locally we are always the publisher.
// For `PUT_VALUE` requests originating locally we are always the
// publisher.
record.publisher = Some(self.local_key.clone().into_preimage());

// Make sure TTL is set.
Expand Down Expand Up @@ -1060,21 +1071,37 @@ impl Kademlia {
(Some(record), Quorum::One) => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) })
.send(KademliaEvent::GetRecordSuccess {
query_id,
records: RecordsType::LocalStore(record.clone()),
})
.await;
}
(record, _) => {
self.engine.start_get_record(
query_id,
key.clone(),
self.routing_table.closest(Key::new(key.clone()), self.replication_factor).into(),
self.routing_table
.closest(Key::new(key), self.replication_factor)
.into(),
quorum,
if record.is_some() { 1 } else { 0 },
);
}
}

}
Some(KademliaCommand::GetProviders { key, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get providers from DHT");

self.engine.start_get_providers(
query_id,
key.clone(),
self.routing_table
.closest(Key::new(key), self.replication_factor)
.into(),
);
}
Some(KademliaCommand::AddKnownPeer { peer, addresses }) => {
tracing::trace!(
target: LOG_TARGET,
Expand All @@ -1088,7 +1115,10 @@ impl Kademlia {
addresses.clone(),
self.peers
.get(&peer)
.map_or(ConnectionType::NotConnected, |_| ConnectionType::Connected),
.map_or(
ConnectionType::NotConnected,
|_| ConnectionType::Connected,
),
);
self.service.add_known_address(&peer, addresses.into_iter());

Expand All @@ -1101,7 +1131,8 @@ impl Kademlia {
);

// Make sure TTL is set.
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
record.expires =
record.expires.or_else(|| Some(Instant::now() + self.record_ttl));

self.store.put(record);
}
Expand Down
Loading