
kad: Providers part 3: publish provider records (start providing) #234

Merged · 8 commits · Sep 30, 2024
36 changes: 35 additions & 1 deletion src/protocol/libp2p/kademlia/handle.rs
@@ -130,6 +130,18 @@ pub(crate) enum KademliaCommand {
query_id: QueryId,
},

/// Register as a content provider for `key`.
StartProviding {
/// Provided key.
key: RecordKey,

/// Our external addresses to publish.
public_addresses: Vec<Multiaddr>,

/// Query ID for the query.
query_id: QueryId,
},

/// Store record locally.
StoreRecord {
// Record.
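The new `StartProviding` variant follows the handle's existing actor pattern: the public API serializes each call into a `KademliaCommand` and sends it over an mpsc channel to the protocol task, which owns all state. A minimal, self-contained sketch of that pattern (illustrative names and types, not litep2p's actual ones):

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum Command {
    StartProviding { key: Vec<u8>, query_id: u64 },
}

struct Handle {
    cmd_tx: mpsc::Sender<Command>,
    next_query_id: u64,
}

impl Handle {
    async fn start_providing(&mut self, key: Vec<u8>) -> u64 {
        let query_id = self.next_query_id;
        self.next_query_id += 1;
        // Send errors are ignored, mirroring the handle above: if the
        // protocol task is gone, nobody can act on the command anyway.
        let _ = self.cmd_tx.send(Command::StartProviding { key, query_id }).await;
        query_id
    }
}
```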
@@ -175,7 +187,8 @@ pub enum KademliaEvent {
},

/// `PUT_VALUE` query succeeded.
- PutRecordSucess {
+ // TODO: this is never emitted. Implement + add `AddProviderSuccess`.
+ PutRecordSuccess {
/// Query ID.
query_id: QueryId,

@@ -299,6 +312,27 @@ impl KademliaHandle {
query_id
}

/// Register as a content provider on the DHT.
///
/// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
pub async fn start_providing(
&mut self,
key: RecordKey,
public_addresses: Vec<Multiaddr>,
) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
.send(KademliaCommand::StartProviding {
key,
public_addresses,
query_id,
})
.await;

query_id
}

/// Store the record in the local store. Used in combination with
/// [`IncomingRecordValidationMode::Manual`].
pub async fn store_record(&mut self, record: Record) {
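For orientation, a caller-side sketch of the new API. Setup is elided, and the import paths and the `RecordKey` constructor are assumptions inferred from the signatures above, not verified against litep2p's public exports:

```rust
// Assumed paths; `kad_handle` stands for a `KademliaHandle` obtained
// when building the litep2p instance.
use litep2p::protocol::libp2p::kademlia::{KademliaHandle, RecordKey};
use multiaddr::Multiaddr;

async fn announce_content(kad_handle: &mut KademliaHandle) {
    // `RecordKey::new` taking anything `AsRef<[u8]>` is an assumption.
    let key = RecordKey::new(&b"my-content-id".to_vec());
    let public_addresses: Vec<Multiaddr> = vec![
        "/ip4/198.51.100.7/tcp/30333".parse().expect("valid multiaddr"),
    ];

    // Returns immediately with the query ID; the ADD_PROVIDER replication
    // runs in the background Kademlia task.
    let query_id = kad_handle.start_providing(key, public_addresses).await;
    println!("started provider publication: {query_id:?}");
}
```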
7 changes: 3 additions & 4 deletions src/protocol/libp2p/kademlia/message.rs
@@ -172,8 +172,7 @@ impl KademliaMessage {
}

/// Create `ADD_PROVIDER` message with `provider`.
- #[allow(unused)]
- pub fn add_provider(provider: ProviderRecord) -> Vec<u8> {
+ pub fn add_provider(provider: ProviderRecord) -> Bytes {
let peer = KademliaPeer::new(
provider.provider,
provider.addresses,
@@ -187,10 +186,10 @@
..Default::default()
};

- let mut buf = Vec::with_capacity(message.encoded_len());
+ let mut buf = BytesMut::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");

- buf
+ buf.freeze()
}

/// Create `GET_PROVIDERS` request for `key`.
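The switch from `Vec<u8>` to `Bytes` is what makes the later per-peer fan-out cheap: `BytesMut::freeze` yields an immutable, reference-counted buffer whose clones share one allocation. A standalone sketch of the encode-and-freeze pattern, using a hypothetical stand-in for the schema-generated prost type:

```rust
use bytes::{Bytes, BytesMut};
use prost::Message;

// Hypothetical stand-in for the message type generated from the kad
// protobuf schema.
#[derive(Clone, PartialEq, prost::Message)]
struct DemoMessage {
    #[prost(bytes = "vec", tag = "1")]
    key: Vec<u8>,
}

fn encode_to_bytes(message: &DemoMessage) -> Bytes {
    let mut buf = BytesMut::with_capacity(message.encoded_len());
    message.encode(&mut buf).expect("BytesMut to provide needed capacity");
    // `freeze` is zero-copy; every `clone` of the result bumps a refcount
    // instead of copying the payload.
    buf.freeze()
}
```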
154 changes: 137 additions & 17 deletions src/protocol/libp2p/kademlia/mod.rs
@@ -83,13 +83,16 @@ mod schema {
}

/// Peer action.
- #[derive(Debug)]
+ #[derive(Debug, Clone)]
enum PeerAction {
/// Send `FIND_NODE` message to peer.
SendFindNode(QueryId),

/// Send `PUT_VALUE` message to peer.
SendPutValue(Bytes),

/// Send `ADD_PROVIDER` message to peer.
SendAddProvider(Bytes),
}

/// Peer context.
@@ -335,7 +338,12 @@ impl Kademlia {
}
}
Some(PeerAction::SendPutValue(message)) => {
- tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` response");
+ tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message");

self.executor.send_message(peer, message, substream);
}
Some(PeerAction::SendAddProvider(message)) => {
tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message");

self.executor.send_message(peer, message, substream);
}
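These arms run when a substream to the peer finally opens: the action queued for that peer is popped and executed. A condensed sketch of that park-then-execute bookkeeping (types simplified; the real machinery also covers dialing and timeouts):

```rust
use bytes::Bytes;
use std::collections::HashMap;

#[derive(Debug, Clone)]
enum PeerAction {
    SendPutValue(Bytes),
    SendAddProvider(Bytes),
}

// Actions are parked per peer while the substream (or dial) is in flight.
struct PendingActions {
    actions: HashMap<u64, PeerAction>, // simplified peer ID -> queued action
}

impl PendingActions {
    fn on_substream_open(&mut self, peer: u64) -> Option<PeerAction> {
        // Pop the queued action so it runs exactly once per opened substream.
        self.actions.remove(&peer)
    }
}
```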
@@ -758,6 +766,35 @@ impl Kademlia {

Ok(())
}
QueryAction::AddProviderToFoundNodes { provider, peers } => {
tracing::trace!(
target: LOG_TARGET,
provided_key = ?provider.key,
num_peers = ?peers.len(),
"add provider record to found peers",
);

let provided_key = provider.key.clone();
let message = KademliaMessage::add_provider(provider);

for peer in peers {
if let Err(error) = self.open_substream_or_dial(
peer.peer,
PeerAction::SendAddProvider(message.clone()),
None,
) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?provided_key,
?error,
"failed to add provider record to peer",
)
}
}

Ok(())
}
QueryAction::GetRecordQueryDone { query_id, records } => {
let _ = self
.event_tx
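The `AddProviderToFoundNodes` arm above encodes the provider record once and fans the same `Bytes` value out to every peer found by the preceding `FIND_NODE` phase, which is why `PeerAction` gained `Clone`. An illustrative version of that loop, with a closure standing in for `open_substream_or_dial`:

```rust
use bytes::Bytes;

// Fan one encoded ADD_PROVIDER message out to many peers; each `clone`
// of `message` is a refcount bump, not a byte copy.
fn fan_out<F>(peers: &[u64], message: &Bytes, mut send: F)
where
    F: FnMut(u64, Bytes) -> Result<(), String>,
{
    for &peer in peers {
        if let Err(error) = send(peer, message.clone()) {
            // Failures are logged per peer and do not abort the loop, so one
            // unreachable peer cannot stop replication to the others.
            eprintln!("failed to send ADD_PROVIDER to peer {peer}: {error}");
        }
    }
}
```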
@@ -794,7 +831,11 @@ impl Kademlia {
event = self.service.next() => match event {
Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
if let Err(error) = self.on_connection_established(peer) {
- tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection");
+ tracing::debug!(
+ target: LOG_TARGET,
+ ?error,
+ "failed to handle established connection",
+ );
}
}
Some(TransportEvent::ConnectionClosed { peer }) => {
@@ -804,7 +845,10 @@
match direction {
Direction::Inbound => self.on_inbound_substream(peer, substream).await,
Direction::Outbound(substream_id) => {
- if let Err(error) = self.on_outbound_substream(peer, substream_id, substream).await {
+ if let Err(error) = self
+ .on_outbound_substream(peer, substream_id, substream)
+ .await
+ {
tracing::debug!(
target: LOG_TARGET,
?peer,
@@ -819,22 +863,41 @@
Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
self.on_substream_open_failure(substream, error).await;
}
- Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address),
+ Some(TransportEvent::DialFailure { peer, address, .. }) =>
+ self.on_dial_failure(peer, address),
None => return Err(Error::EssentialTaskClosed),
},
context = self.executor.next() => {
let QueryContext { peer, query_id, result } = context.unwrap();

match result {
QueryResult::SendSuccess { substream } => {
- tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer");
+ tracing::trace!(
+ target: LOG_TARGET,
+ ?peer,
+ query = ?query_id,
+ "message sent to peer",
+ );
let _ = substream.close().await;
}
QueryResult::ReadSuccess { substream, message } => {
- tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer");
+ tracing::trace!(target: LOG_TARGET,
+ ?peer,
+ query = ?query_id,
+ "message read from peer",
+ );

- if let Err(error) = self.on_message_received(peer, query_id, message, substream).await {
- tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message");
+ if let Err(error) = self.on_message_received(
+ peer,
+ query_id,
+ message,
+ substream
+ ).await {
+ tracing::debug!(target: LOG_TARGET,
+ ?peer,
+ ?error,
+ "failed to process message",
+ );
}
}
QueryResult::SubstreamClosed | QueryResult::Timeout => {
@@ -853,22 +916,36 @@ impl Kademlia {
command = self.cmd_rx.recv() => {
match command {
Some(KademliaCommand::FindNode { peer, query_id }) => {
- tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query");
+ tracing::debug!(
+ target: LOG_TARGET,
+ ?peer,
+ query = ?query_id,
+ "starting `FIND_NODE` query",
+ );

self.engine.start_find_node(
query_id,
peer,
- self.routing_table.closest(Key::from(peer), self.replication_factor).into()
+ self.routing_table
+ .closest(Key::from(peer), self.replication_factor)
+ .into()
);
}
Some(KademliaCommand::PutRecord { mut record, query_id }) => {
- tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT");
+ tracing::debug!(
+ target: LOG_TARGET,
+ query = ?query_id,
+ key = ?record.key,
+ "store record to DHT",
+ );

// For `PUT_VALUE` requests originating locally we are always the publisher.
record.publisher = Some(self.local_key.clone().into_preimage());

// Make sure TTL is set.
- record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
+ record.expires = record
+ .expires
+ .or_else(|| Some(Instant::now() + self.record_ttl));

let key = Key::new(record.key.clone());

@@ -880,11 +957,23 @@ impl Kademlia {
self.routing_table.closest(key, self.replication_factor).into(),
);
}
- Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => {
- tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers");
+ Some(KademliaCommand::PutRecordToPeers {
+ mut record,
+ query_id,
+ peers,
+ update_local_store,
+ }) => {
+ tracing::debug!(
+ target: LOG_TARGET,
+ query = ?query_id,
+ key = ?record.key,
+ "store record to DHT to specified peers",
+ );

// Make sure TTL is set.
- record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
+ record.expires = record
+ .expires
+ .or_else(|| Some(Instant::now() + self.record_ttl));

if update_local_store {
self.store.put(record.clone());
@@ -898,7 +987,8 @@ impl Kademlia {

match self.routing_table.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => Some(entry.clone()),
- KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()),
+ KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() =>
+ Some(entry.clone()),
_ => None,
}
}).collect();
@@ -909,6 +999,36 @@ impl Kademlia {
peers,
);
}
Some(KademliaCommand::StartProviding {
key,
public_addresses,
query_id
}) => {
tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
?key,
?public_addresses,
"register as content provider"
);

let provider = ProviderRecord {
key: key.clone(),
provider: self.service.local_peer_id(),
addresses: public_addresses,
expires: Instant::now() + self.provider_ttl,
};

self.store.put_provider(provider.clone());

self.engine.start_add_provider(
query_id,
provider,
self.routing_table
.closest(Key::new(key), self.replication_factor)
.into(),
);
}
Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");

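Tying the pieces together: the `StartProviding` handler builds a `ProviderRecord` for the local peer with a TTL, caches it in the local provider store, and then starts a query targeting the `replication_factor` closest peers. A condensed, self-contained sketch of the first two steps (types simplified; `provider_ttl` mirrors the field used above):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

#[derive(Clone)]
struct ProviderRecord {
    key: Vec<u8>,
    provider: u64,          // simplified peer ID
    addresses: Vec<String>, // simplified multiaddrs
    expires: Instant,
}

struct ProviderStore {
    provider_ttl: Duration,
    records: HashMap<Vec<u8>, ProviderRecord>,
}

impl ProviderStore {
    fn start_providing(&mut self, key: Vec<u8>, provider: u64, addresses: Vec<String>) -> ProviderRecord {
        let record = ProviderRecord {
            key: key.clone(),
            provider,
            addresses,
            expires: Instant::now() + self.provider_ttl,
        };
        // Store locally first so GET_PROVIDERS can be answered immediately;
        // replication to the closest peers happens via the query engine.
        self.records.insert(key, record.clone());
        record
    }
}
```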