Skip to content

Commit 176715a

Browse files
refactor: extract RPC definitions into here
1 parent 16bc7fe commit 176715a

8 files changed

+2107
-36
lines changed

Cargo.lock

+80-33
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+17-2
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,21 @@ redb = { version = "2.0.0" }
5050
redb_v1 = { package = "redb", version = "1.5.1" }
5151
self_cell = "1.0.3"
5252
serde = { version = "1.0.164", features = ["derive"] }
53-
strum = { version = "0.25", features = ["derive"] }
53+
strum = { version = "0.26", features = ["derive"] }
5454
tempfile = { version = "3.4" }
5555
thiserror = "1"
5656
tokio = { version = "1", features = ["sync", "rt", "time", "macros"] }
5757
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
5858
tokio-util = { version = "0.7.12", optional = true, features = ["codec", "io-util", "io", "rt"] }
5959
tracing = "0.1"
6060

61+
# rpc
62+
nested_enum_utils = { version = "0.1.0", optional = true }
63+
quic-rpc = { version = "0.13", optional = true }
64+
quic-rpc-derive = { version = "0.13", optional = true }
65+
serde-error = { version = "0.1.3", optional = true }
66+
portable-atomic = { version = "1.9.0", optional = true }
67+
6168
[dev-dependencies]
6269
iroh-test = "0.27.0"
6370
rand_chacha = "0.3.1"
@@ -67,10 +74,18 @@ tempfile = "3.4"
6774
test-strategy = "0.3.1"
6875

6976
[features]
70-
default = ["net", "metrics", "engine"]
77+
default = ["net", "metrics", "engine", "rpc"]
7178
net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"]
7279
metrics = ["iroh-metrics/metrics"]
7380
engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs", "dep:iroh-router"]
81+
rpc = [
82+
"engine",
83+
"dep:nested_enum_utils",
84+
"dep:quic-rpc",
85+
"dep:quic-rpc-derive",
86+
"dep:serde-error",
87+
"dep:portable-atomic",
88+
]
7489

7590
[package.metadata.docs.rs]
7691
all-features = true

src/engine.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use std::{
1111

1212
use anyhow::{bail, Context, Result};
1313
use futures_lite::{Stream, StreamExt};
14-
use iroh_blobs::{downloader::Downloader, store::EntryStatus, Hash};
14+
use iroh_blobs::{
15+
downloader::Downloader, store::EntryStatus, util::local_pool::LocalPoolHandle, Hash,
16+
};
1517
use iroh_gossip::net::Gossip;
1618
use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
1719
use serde::{Deserialize, Serialize};
@@ -52,6 +54,7 @@ pub struct Engine {
5254
actor_handle: Arc<AbortOnDropHandle<()>>,
5355
#[debug("ContentStatusCallback")]
5456
content_status_cb: ContentStatusCallback,
57+
local_pool_handle: LocalPoolHandle,
5558
}
5659

5760
impl Engine {
@@ -66,6 +69,7 @@ impl Engine {
6669
bao_store: B,
6770
downloader: Downloader,
6871
default_author_storage: DefaultAuthorStorage,
72+
local_pool_handle: LocalPoolHandle,
6973
) -> anyhow::Result<Self> {
7074
let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
7175
let me = endpoint.node_id().fmt_short();
@@ -111,6 +115,7 @@ impl Engine {
111115
actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
112116
content_status_cb,
113117
default_author: Arc::new(default_author),
118+
local_pool_handle,
114119
})
115120
}
116121

@@ -205,6 +210,10 @@ impl Engine {
205210
reply_rx.await?;
206211
Ok(())
207212
}
213+
214+
pub(crate) fn local_pool_handle(&self) -> &LocalPoolHandle {
215+
&self.local_pool_handle
216+
}
208217
}
209218

210219
/// Converts an [`EntryStatus`] into a ['ContentStatus'].

src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ mod ticket;
4848
#[cfg(feature = "engine")]
4949
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "engine")))]
5050
pub mod engine;
51+
#[cfg(feature = "engine")]
52+
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "engine")))]
53+
pub mod rpc;
5154

5255
pub mod actor;
5356
pub mod store;

src/rpc.rs

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//! Quic RPC implemenation for docs.
2+
3+
use crate::engine::Engine;
4+
5+
pub mod client;
6+
pub mod proto;
7+
8+
mod docs_handle_request;
9+
10+
type RpcError = serde_error::Error;
11+
type RpcResult<T> = std::result::Result<T, RpcError>;
12+
13+
impl Engine {
14+
/// Handle a docs request from the RPC server.
15+
pub async fn handle_rpc_request<S: quic_rpc::Service, C: quic_rpc::ServiceEndpoint<S>>(
16+
&self,
17+
msg: crate::rpc::proto::Request,
18+
chan: quic_rpc::server::RpcChannel<crate::rpc::proto::RpcService, C, S>,
19+
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
20+
use crate::rpc::proto::Request::*;
21+
22+
let this = self.clone();
23+
match msg {
24+
Open(msg) => chan.rpc(msg, this, Self::doc_open).await,
25+
Close(msg) => chan.rpc(msg, this, Self::doc_close).await,
26+
Status(msg) => chan.rpc(msg, this, Self::doc_status).await,
27+
List(msg) => chan.server_streaming(msg, this, Self::doc_list).await,
28+
Create(msg) => chan.rpc(msg, this, Self::doc_create).await,
29+
Drop(msg) => chan.rpc(msg, this, Self::doc_drop).await,
30+
Import(msg) => chan.rpc(msg, this, Self::doc_import).await,
31+
Set(msg) => chan.rpc(msg, this, Self::doc_set).await,
32+
ImportFile(msg) => {
33+
chan.server_streaming(msg, this, Self::doc_import_file)
34+
.await
35+
}
36+
ExportFile(msg) => {
37+
chan.server_streaming(msg, this, Self::doc_export_file)
38+
.await
39+
}
40+
Del(msg) => chan.rpc(msg, this, Self::doc_del).await,
41+
SetHash(msg) => chan.rpc(msg, this, Self::doc_set_hash).await,
42+
Get(msg) => chan.server_streaming(msg, this, Self::doc_get_many).await,
43+
GetExact(msg) => chan.rpc(msg, this, Self::doc_get_exact).await,
44+
StartSync(msg) => chan.rpc(msg, this, Self::doc_start_sync).await,
45+
Leave(msg) => chan.rpc(msg, this, Self::doc_leave).await,
46+
Share(msg) => chan.rpc(msg, this, Self::doc_share).await,
47+
Subscribe(msg) => {
48+
chan.try_server_streaming(msg, this, Self::doc_subscribe)
49+
.await
50+
}
51+
SetDownloadPolicy(msg) => chan.rpc(msg, this, Self::doc_set_download_policy).await,
52+
GetDownloadPolicy(msg) => chan.rpc(msg, this, Self::doc_get_download_policy).await,
53+
GetSyncPeers(msg) => chan.rpc(msg, this, Self::doc_get_sync_peers).await,
54+
}
55+
}
56+
}

0 commit comments

Comments
 (0)