Skip to content

Commit 80dc47f

Browse files
committed
[feat] Basic IPNS support
1 parent 1b63080 commit 80dc47f

File tree

12 files changed

+273
-9
lines changed

12 files changed

+273
-9
lines changed

iroh-p2p/src/dht_records.rs

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use ahash::AHashMap;
2+
use libp2p::kad::{GetRecordOk, GetRecordResult, PeerRecord, QueryId};
3+
use tokio::sync::mpsc::Sender;
4+
5+
pub struct DhtGetQuery {
6+
response_channel: Sender<anyhow::Result<PeerRecord>>,
7+
}
8+
9+
impl DhtGetQuery {
10+
pub fn new(response_channel: Sender<anyhow::Result<PeerRecord>>) -> DhtGetQuery {
11+
DhtGetQuery { response_channel }
12+
}
13+
}
14+
15+
#[derive(Default)]
16+
pub struct DhtRecords {
17+
current_queries: AHashMap<QueryId, DhtGetQuery>,
18+
}
19+
20+
impl DhtRecords {
21+
pub fn insert(&mut self, query_id: QueryId, query: DhtGetQuery) {
22+
self.current_queries.insert(query_id, query);
23+
}
24+
25+
pub fn handle_get_record_result(&mut self, id: QueryId, get_record_result: GetRecordResult) {
26+
if let Some(query) = self.current_queries.remove(&id) {
27+
match get_record_result {
28+
Ok(GetRecordOk::FoundRecord(record)) => {
29+
tokio::spawn(async move { query.response_channel.send(Ok(record)).await.ok() });
30+
}
31+
Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates }) => {}
32+
Err(_) => todo!(),
33+
}
34+
};
35+
}
36+
}

iroh-p2p/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod behaviour;
22
pub mod cli;
33
pub mod config;
4+
mod dht_records;
45
mod keys;
56
pub mod metrics;
67
mod node;

iroh-p2p/src/node.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use tracing::{debug, error, info, trace, warn};
3434
use iroh_bitswap::{BitswapEvent, Block};
3535
use iroh_rpc_client::Lookup;
3636

37+
use crate::dht_records::{DhtGetQuery, DhtRecords};
3738
use crate::keys::{Keychain, Storage};
3839
use crate::providers::Providers;
3940
use crate::rpc::{P2p, ProviderRequestKey};
@@ -87,6 +88,7 @@ pub struct Node<KeyStorage: Storage> {
8788
use_dht: bool,
8889
bitswap_sessions: BitswapSessions,
8990
providers: Providers,
91+
dht_records: DhtRecords,
9092
listen_addrs: Vec<Multiaddr>,
9193
}
9294

@@ -175,6 +177,7 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
175177
use_dht: libp2p_config.kademlia,
176178
bitswap_sessions: Default::default(),
177179
providers: Providers::new(4),
180+
dht_records: DhtRecords::default(),
178181
listen_addrs,
179182
})
180183
}
@@ -522,7 +525,6 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
522525
}
523526
Event::Kademlia(e) => {
524527
libp2p_metrics().record(&e);
525-
526528
if let KademliaEvent::OutboundQueryProgressed {
527529
id, result, step, ..
528530
} = e
@@ -627,6 +629,11 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
627629
});
628630
}
629631
}
632+
QueryResult::GetRecord(get_record_result) => self
633+
.dht_records
634+
.handle_get_record_result(id, get_record_result),
635+
QueryResult::PutRecord(_) => todo!(),
636+
QueryResult::RepublishRecord(_) => todo!(),
630637
other => {
631638
debug!("Libp2p => Unhandled Kademlia query result: {:?}", other)
632639
}
@@ -1061,6 +1068,16 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
10611068
RpcMessage::Shutdown => {
10621069
return Ok(true);
10631070
}
1071+
RpcMessage::DhtGet {
1072+
key,
1073+
response_channel,
1074+
} => {
1075+
if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() {
1076+
let query_id = kad.get_record(key);
1077+
self.dht_records
1078+
.insert(query_id, DhtGetQuery::new(response_channel));
1079+
}
1080+
}
10641081
}
10651082

10661083
Ok(false)

iroh-p2p/src/rpc.rs

+33-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use futures::{
66
stream::{BoxStream, Stream},
77
TryFutureExt,
88
};
9-
use iroh_bitswap::Block;
109
use iroh_rpc_client::{
1110
create_server, Lookup, P2pServer, ServerError, ServerSocket, HEALTH_POLL_WAIT,
1211
};
@@ -27,6 +26,17 @@ use tokio::sync::mpsc::{channel, Sender};
2726
use tokio::sync::oneshot;
2827
use tracing::{debug, info, trace};
2928

29+
use async_trait::async_trait;
30+
use iroh_bitswap::Block;
31+
use iroh_rpc_types::p2p::{
32+
BitswapRequest, BitswapResponse, ConnectByPeerIdRequest, ConnectRequest, DhtGetRequest,
33+
DisconnectRequest, GetListeningAddrsResponse, GetPeersResponse, GossipsubAllPeersResponse,
34+
GossipsubPublishResponse, GossipsubSubscribeResponse,
35+
GossipsubTopicsResponse, Key as ProviderKey, LookupRequest,
36+
NotifyNewBlocksBitswapRequest, StopSessionBitswapRequest, VersionResponse,
37+
};
38+
use libp2p::kad::PeerRecord;
39+
3040
use super::node::DEFAULT_PROVIDER_LIMIT;
3141
use crate::VERSION;
3242

@@ -231,6 +241,24 @@ impl P2p {
231241
Ok(stream)
232242
}
233243

244+
#[tracing::instrument(skip(self, req))]
245+
async fn dht_get(
246+
&self,
247+
req: DhtGetRequest,
248+
) -> anyhow::Result<BoxStream<'static, anyhow::Result<iroh_rpc_types::p2p::PeerRecord>>> {
249+
// ToDo: parametrize size of channel in a proper way
250+
let (sender, receiver) = channel(64);
251+
let msg = RpcMessage::DhtGet {
252+
key: Key::from(req.key.0.to_vec()),
253+
response_channel: sender,
254+
};
255+
self.sender.send(msg).await?;
256+
let receiver = tokio_stream::wrappers::ReceiverStream::new(receiver)
257+
.map(|peer_record| peer_record.map(|peer_record| peer_record.into()));
258+
259+
Ok(Box::pin(receiver))
260+
}
261+
234262
#[tracing::instrument(skip(self, req))]
235263
async fn start_providing(self, req: StartProvidingRequest) -> Result<()> {
236264
trace!("received StartProviding request: {:?}", req.key);
@@ -620,6 +648,10 @@ pub enum RpcMessage {
620648
ExternalAddrs(oneshot::Sender<Vec<Multiaddr>>),
621649
Listeners(oneshot::Sender<Vec<Multiaddr>>),
622650
LocalPeerId(oneshot::Sender<PeerId>),
651+
DhtGet {
652+
key: Key,
653+
response_channel: Sender<Result<PeerRecord>>,
654+
},
623655
BitswapRequest {
624656
ctx: u64,
625657
cids: Vec<Cid>,

iroh-resolver/src/resolver.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ use crate::dns_resolver::{Config, DnsResolver};
3535

3636
pub const IROH_STORE: &str = "iroh-store";
3737

38+
pub(crate) mod ipns_pb {
39+
#![allow(clippy::all)]
40+
include!(concat!(env!("OUT_DIR"), "/ipns_pb.rs"));
41+
}
42+
3843
// ToDo: Remove this function
3944
// Related issue: https://github.com/n0-computer/iroh/issues/593
4045
fn from_peer_id(id: &str) -> Option<libipld::Multihash> {
@@ -1066,8 +1071,12 @@ impl<T: ContentLoader> Resolver<T> {
10661071
},
10671072
PathType::Ipns => match current.root() {
10681073
CidOrDomain::Cid(ref c) => {
1069-
let c = self.load_ipns_record(c).await?;
1070-
current = Path::from_cid(c);
1074+
let resolved_record = Bytes::from(self.load_ipns_record(c).await?.value);
1075+
let proto = ipns_pb::IpnsEntry::decode(resolved_record).unwrap();
1076+
current = Path::from_str(
1077+
&String::from_utf8(proto.value.clone().unwrap()).unwrap(),
1078+
)
1079+
.unwrap();
10711080
}
10721081
CidOrDomain::Domain(ref domain) => {
10731082
let mut records = self.dns_resolver.resolve_dnslink(domain).await?;
@@ -1102,7 +1111,7 @@ impl<T: ContentLoader> Resolver<T> {
11021111

11031112
#[tracing::instrument(skip(self))]
11041113
async fn load_ipns_record(&self, cid: &Cid) -> Result<Cid> {
1105-
todo!()
1114+
self.loader.load_record_from_dht(cid).await
11061115
}
11071116
}
11081117

iroh-rpc-client/src/network.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::{StatusType, HEALTH_POLL_WAIT};
12
use anyhow::Result;
23
use async_stream::stream;
34
use bytes::Bytes;
@@ -9,8 +10,6 @@ use libp2p::{Multiaddr, PeerId};
910
use std::collections::{HashMap, HashSet};
1011
use tracing::{debug, warn};
1112

12-
use crate::{StatusType, HEALTH_POLL_WAIT};
13-
1413
#[derive(Debug, Clone)]
1514
pub struct P2pClient {
1615
client: quic_rpc::RpcClient<P2pService, crate::ChannelTypes>,
@@ -101,6 +100,17 @@ impl P2pClient {
101100
Ok(providers_stream)
102101
}
103102

103+
#[tracing::instrument(skip(self))]
104+
pub async fn dht_get(
105+
&self,
106+
key: &libp2p::kad::record::Key,
107+
) -> Result<impl Stream<Item = Result<iroh_rpc_types::p2p::PeerRecord>>> {
108+
let key = Key(key.to_vec().into());
109+
let res = self.client.server_streaming(DhtGetRequest { key }).await?;
110+
let peer_records_stream = res.map(|p| Ok(p??));
111+
Ok(peer_records_stream)
112+
}
113+
104114
#[tracing::instrument(skip(self))]
105115
pub async fn start_providing(&self, key: &Cid) -> Result<()> {
106116
let key = Key(key.hash().to_bytes().into());

iroh-rpc-types/src/p2p.rs

+75
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use quic_rpc::{
88
};
99
use serde::{Deserialize, Serialize};
1010
use std::collections::BTreeMap;
11+
use std::time::{Duration, Instant};
1112

1213
use crate::{RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse};
1314

@@ -40,6 +41,20 @@ pub struct ListenersResponse {
4041
pub addrs: Vec<Multiaddr>,
4142
}
4243

44+
#[derive(Serialize, Deserialize, Debug)]
45+
pub struct DhtGetRequest {
46+
pub key: Key,
47+
}
48+
49+
#[derive(Serialize, Deserialize, Debug)]
50+
pub struct PeerRecord {
51+
pub peer_id: Option<Bytes>,
52+
pub key: Bytes,
53+
pub value: Bytes,
54+
pub ttl: u32,
55+
pub publisher: Option<Bytes>,
56+
}
57+
4358
#[derive(Serialize, Deserialize, Debug)]
4459
pub struct BitswapRequest {
4560
pub cid: Cid,
@@ -219,6 +234,7 @@ pub enum P2pRequest {
219234
Watch(WatchRequest),
220235
Version(VersionRequest),
221236
Shutdown(ShutdownRequest),
237+
DhtGet(DhtGetRequest),
222238
FetchBitswap(BitswapRequest),
223239
FetchProviderDht(FetchProvidersDhtRequest),
224240
StopSessionBitswap(StopSessionBitswapRequest),
@@ -250,6 +266,7 @@ pub enum P2pRequest {
250266
pub enum P2pResponse {
251267
Watch(WatchResponse),
252268
Version(VersionResponse),
269+
DhtGet(RpcResult<PeerRecord>),
253270
FetchBitswap(RpcResult<BitswapResponse>),
254271
FetchProviderDht(RpcResult<FetchProvidersDhtResponse>),
255272
GetListeningAddrs(RpcResult<GetListeningAddrsResponse>),
@@ -291,6 +308,14 @@ impl RpcMsg<P2pService> for ShutdownRequest {
291308
type Response = RpcResult<()>;
292309
}
293310

311+
impl Msg<P2pService> for DhtGetRequest {
312+
type Response = RpcResult<PeerRecord>;
313+
314+
type Update = Self;
315+
316+
type Pattern = ServerStreaming;
317+
}
318+
294319
impl RpcMsg<P2pService> for BitswapRequest {
295320
type Response = RpcResult<BitswapResponse>;
296321
}
@@ -394,3 +419,53 @@ impl RpcMsg<P2pService> for ExternalAddrsRequest {
394419
impl RpcMsg<P2pService> for ListenersRequest {
395420
type Response = RpcResult<ListenersResponse>;
396421
}
422+
423+
impl From<libp2p::kad::PeerRecord> for PeerRecord {
424+
fn from(peer_record: libp2p::kad::PeerRecord) -> Self {
425+
PeerRecord {
426+
peer_id: peer_record.peer.map(|peer| Bytes::from(peer.to_bytes())),
427+
key: peer_record.record.key.to_vec().into(),
428+
ttl: peer_record
429+
.record
430+
.expires
431+
.map(|t| {
432+
let now = Instant::now();
433+
if t > now {
434+
(t - now).as_secs() as u32
435+
} else {
436+
1 // because 0 means "does not expire"
437+
}
438+
})
439+
.unwrap_or(0),
440+
value: peer_record.record.value.into(),
441+
publisher: peer_record
442+
.record
443+
.publisher
444+
.map(|publisher| Bytes::from(publisher.to_bytes())),
445+
}
446+
}
447+
}
448+
449+
impl From<PeerRecord> for libp2p::kad::PeerRecord {
450+
fn from(peer_record: PeerRecord) -> Self {
451+
let peer = peer_record
452+
.peer_id
453+
.as_ref()
454+
.map(|peer_id| libp2p::PeerId::from_bytes(peer_id).unwrap());
455+
let key = libp2p::kad::record::Key::from(peer_record.key.to_vec());
456+
let value = peer_record.value;
457+
let publisher = peer_record
458+
.publisher
459+
.as_ref()
460+
.map(|publisher| libp2p::PeerId::from_bytes(publisher).unwrap());
461+
let expires = (peer_record.ttl > 0)
462+
.then(|| Instant::now() + Duration::from_secs(peer_record.ttl as u64));
463+
let record = libp2p::kad::Record {
464+
key,
465+
value: value.to_vec(),
466+
publisher,
467+
expires,
468+
};
469+
libp2p::kad::PeerRecord { peer, record }
470+
}
471+
}

iroh-share/src/p2p_node.rs

+4
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ impl ContentLoader for Loader {
127127
async fn has_cid(&self, cid: &Cid) -> Result<bool> {
128128
Ok(self.client.try_store()?.has(*cid).await?)
129129
}
130+
131+
async fn load_record_from_dht(&self, _: &Cid) -> Result<iroh_rpc_types::p2p::PeerRecord> {
132+
todo!()
133+
}
130134
}
131135

132136
impl P2pNode {

iroh-unixfs/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ fastmurmur3.workspace = true
2222
futures.workspace = true
2323
iroh-metrics = { workspace = true, features = ["resolver", "gateway"] }
2424
iroh-rpc-client.workspace = true
25+
iroh-rpc-types.workspace = true
2526
iroh-util.workspace = true
2627
libipld.workspace = true
2728
libp2p = { workspace = true, features = ["serde"] }

iroh-unixfs/build.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
fn main() {
22
prost_build::Config::new()
3-
.bytes([".unixfs_pb.Data", ".merkledag_pb.PBNode.Data"])
4-
.compile_protos(&["src/unixfs.proto", "src/merkledag.proto"], &["src"])
3+
.bytes([
4+
".unixfs_pb.Data.Data",
5+
".merkledag_pb.PBNode.Data",
6+
".ipns_pb.IpnsEntry.Data",
7+
])
8+
.compile_protos(
9+
&["src/unixfs.proto", "src/merkledag.proto", "src/ipns.proto"],
10+
&["src"],
11+
)
512
.unwrap();
613
}

0 commit comments

Comments
 (0)