Skip to content

Commit 3473d39

Browse files
committed
[feat] Basic IPNS support
1 parent 1b63080 commit 3473d39

File tree

11 files changed

+305
-36
lines changed

11 files changed

+305
-36
lines changed

iroh-p2p/src/dht_records.rs

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use ahash::AHashMap;
2+
use libp2p::kad::{GetRecordOk, GetRecordResult, PeerRecord, QueryId};
3+
use tokio::sync::mpsc::Sender;
4+
5+
pub struct DhtGetQuery {
6+
response_channel: Sender<anyhow::Result<PeerRecord>>,
7+
}
8+
9+
impl DhtGetQuery {
10+
pub fn new(response_channel: Sender<anyhow::Result<PeerRecord>>) -> DhtGetQuery {
11+
DhtGetQuery { response_channel }
12+
}
13+
}
14+
15+
#[derive(Default)]
16+
pub struct DhtRecords {
17+
current_queries: AHashMap<QueryId, DhtGetQuery>,
18+
}
19+
20+
impl DhtRecords {
21+
pub fn insert(&mut self, query_id: QueryId, query: DhtGetQuery) {
22+
self.current_queries.insert(query_id, query);
23+
}
24+
25+
pub fn handle_get_record_result(&mut self, id: QueryId, get_record_result: GetRecordResult) {
26+
if let Some(query) = self.current_queries.remove(&id) {
27+
match get_record_result {
28+
Ok(GetRecordOk::FoundRecord(record)) => {
29+
tokio::spawn(async move { query.response_channel.send(Ok(record)).await.ok() });
30+
}
31+
Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates }) => {}
32+
Err(_) => todo!(),
33+
}
34+
};
35+
}
36+
}

iroh-p2p/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod behaviour;
22
pub mod cli;
33
pub mod config;
4+
mod dht_records;
45
mod keys;
56
pub mod metrics;
67
mod node;

iroh-p2p/src/node.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use tracing::{debug, error, info, trace, warn};
3434
use iroh_bitswap::{BitswapEvent, Block};
3535
use iroh_rpc_client::Lookup;
3636

37+
use crate::dht_records::{DhtGetQuery, DhtRecords};
3738
use crate::keys::{Keychain, Storage};
3839
use crate::providers::Providers;
3940
use crate::rpc::{P2p, ProviderRequestKey};
@@ -87,6 +88,7 @@ pub struct Node<KeyStorage: Storage> {
8788
use_dht: bool,
8889
bitswap_sessions: BitswapSessions,
8990
providers: Providers,
91+
dht_records: DhtRecords,
9092
listen_addrs: Vec<Multiaddr>,
9193
}
9294

@@ -175,6 +177,7 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
175177
use_dht: libp2p_config.kademlia,
176178
bitswap_sessions: Default::default(),
177179
providers: Providers::new(4),
180+
dht_records: DhtRecords::default(),
178181
listen_addrs,
179182
})
180183
}
@@ -522,7 +525,6 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
522525
}
523526
Event::Kademlia(e) => {
524527
libp2p_metrics().record(&e);
525-
526528
if let KademliaEvent::OutboundQueryProgressed {
527529
id, result, step, ..
528530
} = e
@@ -627,6 +629,11 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
627629
});
628630
}
629631
}
632+
QueryResult::GetRecord(get_record_result) => self
633+
.dht_records
634+
.handle_get_record_result(id, get_record_result),
635+
QueryResult::PutRecord(_) => todo!(),
636+
QueryResult::RepublishRecord(_) => todo!(),
630637
other => {
631638
debug!("Libp2p => Unhandled Kademlia query result: {:?}", other)
632639
}
@@ -1061,6 +1068,16 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
10611068
RpcMessage::Shutdown => {
10621069
return Ok(true);
10631070
}
1071+
RpcMessage::DhtGet {
1072+
key,
1073+
response_channel,
1074+
} => {
1075+
if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() {
1076+
let query_id = kad.get_record(key);
1077+
self.dht_records
1078+
.insert(query_id, DhtGetQuery::new(response_channel));
1079+
}
1080+
}
10641081
}
10651082

10661083
Ok(false)

iroh-p2p/src/rpc.rs

+36
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,20 @@ use tokio::sync::mpsc::{channel, Sender};
2727
use tokio::sync::oneshot;
2828
use tracing::{debug, info, trace};
2929

30+
use async_trait::async_trait;
31+
use iroh_bitswap::Block;
32+
use iroh_rpc_client::Lookup;
33+
use iroh_rpc_types::p2p::{
34+
BitswapRequest, BitswapResponse, ConnectByPeerIdRequest, ConnectRequest, DhtGetRequest,
35+
DisconnectRequest, GetListeningAddrsResponse, GetPeersResponse, GossipsubAllPeersResponse,
36+
GossipsubPeerAndTopics, GossipsubPeerIdMsg, GossipsubPeersResponse, GossipsubPublishRequest,
37+
GossipsubPublishResponse, GossipsubSubscribeResponse, GossipsubTopicHashMsg,
38+
GossipsubTopicsResponse, Key as ProviderKey, LookupRequest, Multiaddrs,
39+
NotifyNewBlocksBitswapRequest, P2p as RpcP2p, P2pServerAddr, PeerIdResponse, PeerInfo,
40+
Providers, StopSessionBitswapRequest, VersionResponse,
41+
};
42+
use libp2p::kad::PeerRecord;
43+
3044
use super::node::DEFAULT_PROVIDER_LIMIT;
3145
use crate::VERSION;
3246

@@ -231,6 +245,24 @@ impl P2p {
231245
Ok(stream)
232246
}
233247

248+
#[tracing::instrument(skip(self, req))]
249+
async fn dht_get(
250+
&self,
251+
req: DhtGetRequest,
252+
) -> Result<Pin<Box<dyn Stream<Item = Result<iroh_rpc_types::p2p::PeerRecord>> + Send>>> {
253+
// ToDo: parametrize size of channel in a proper way
254+
let (sender, receiver) = channel(64);
255+
let msg = RpcMessage::DhtGet {
256+
key: Key::from(req.key.unwrap().key),
257+
response_channel: sender,
258+
};
259+
self.sender.send(msg).await?;
260+
let receiver = tokio_stream::wrappers::ReceiverStream::new(receiver)
261+
.map(|peer_record| peer_record.map(|peer_record| peer_record.into()));
262+
263+
Ok(Box::pin(receiver))
264+
}
265+
234266
#[tracing::instrument(skip(self, req))]
235267
async fn start_providing(self, req: StartProvidingRequest) -> Result<()> {
236268
trace!("received StartProviding request: {:?}", req.key);
@@ -620,6 +652,10 @@ pub enum RpcMessage {
620652
ExternalAddrs(oneshot::Sender<Vec<Multiaddr>>),
621653
Listeners(oneshot::Sender<Vec<Multiaddr>>),
622654
LocalPeerId(oneshot::Sender<PeerId>),
655+
DhtGet {
656+
key: Key,
657+
response_channel: Sender<Result<PeerRecord>>,
658+
},
623659
BitswapRequest {
624660
ctx: u64,
625661
cids: Vec<Cid>,

iroh-resolver/src/ipns.proto

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
syntax = "proto2";
2+
3+
package ipns_pb;
4+
5+
message IpnsEntry {
6+
enum ValidityType {
7+
// setting an EOL says "this record is valid until..."
8+
EOL = 0;
9+
}
10+
optional bytes value = 1;
11+
optional bytes signatureV1 = 2;
12+
13+
optional ValidityType validityType = 3;
14+
optional bytes validity = 4;
15+
16+
optional uint64 sequence = 5;
17+
18+
optional uint64 ttl = 6;
19+
20+
// in order for nodes to properly validate a record upon receipt, they need the public
21+
// key associated with it. For old RSA keys, its easiest if we just send this as part of
22+
// the record itself. For newer ed25519 keys, the public key can be embedded in the
23+
// peerID, making this field unnecessary.
24+
optional bytes pubKey = 7;
25+
26+
optional bytes signatureV2 = 8;
27+
28+
optional bytes data = 9;
29+
}
30+
31+
message IpnsSignatureV2Checker {
32+
optional bytes pubKey = 7;
33+
optional bytes signatureV2 = 8;
34+
}

0 commit comments

Comments
 (0)