Skip to content

Commit 283d457

Browse files
authored
refactor: Add associated types to Service, use protocol enum as Service (#46)
This PR simplifies working with `irpc`: * Instead of defining a separate `FooService` struct, the `Service` trait is implemented on the protocol enum. One type less to create. * Added an associated types to the `Service` trait to point to the extended message enum (the one where the variants contain the `WithChannels` structs). By doing this, we can reduce the generics on the `Client` from 3 to 1, and on the `LocalSender` from 2 to 1. I think this is a net benefit for all users. * The service trait is now implemented by the proc macro if the `message` argument is provided. * A new trait `RemoteService: Service` has a required method to create a a`Service::Message` from the protocol enum, and a provided method to create a `Handler` for use with the listen function. The `RemoteService` impl is generated by the proc macro. This saves the tedious manual impl of mapping from `msg, rx, tx` to the message enum (see the diff in the examples). * Added `no_rpc` and `no_spans` arguments to the macro to omit generating code that depends on the `rpc` or `spans` features of `irpc` * Expanded and improved the documentation of the `rpc_requests` macro. Also moved it from `irpc_derive` to `irpc` to be able to add doc links to items from `irpc`. First and foremost, have a look at the diff to the examples.
1 parent 5cc6248 commit 283d457

File tree

15 files changed

+678
-409
lines changed

15 files changed

+678
-409
lines changed

.github/workflows/ci.yml

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,20 @@ jobs:
5757
toolchain: ${{ matrix.channel }}
5858
targets: ${{ matrix.target.toolchain }}
5959
- uses: swatinem/rust-cache@v2
60-
- name: cargo test (all features)
60+
- name: cargo test (workspace, all features)
6161
run: cargo test --locked --workspace --all-features --bins --tests --examples
62-
- name: cargo test (default features)
62+
- name: cargo test (workspace, default features)
6363
run: cargo test --locked --workspace --bins --tests --examples
64-
- name: cargo test (no default features)
64+
- name: cargo test (workspace, no default features)
6565
run: cargo test --locked --workspace --no-default-features --bins --tests --examples
66-
- name: cargo check (feature message_spans)
67-
run: cargo check --no-default-features --features message_spans
68-
- name: cargo check (feature rpc)
69-
run: cargo check --no-default-features --features rpc
66+
- name: cargo check (irpc, no default features)
67+
run: cargo check --locked --no-default-features --bins --tests --examples
68+
- name: cargo check (irpc, feature derive)
69+
run: cargo check --locked --no-default-features --features derive --bins --tests --examples
70+
- name: cargo check (irpc, feature spans)
71+
run: cargo check --locked --no-default-features --features spans --bins --tests --examples
72+
- name: cargo check (irpc, feature rpc)
73+
run: cargo check --locked --no-default-features --features rpc --bins --tests --examples
7074

7175
test-release:
7276
runs-on: ${{ matrix.target.os }}

Cargo.toml

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,26 @@ rpc = ["dep:quinn", "dep:postcard", "dep:anyhow", "dep:smallvec", "dep:tracing",
6565
# add test utilities
6666
quinn_endpoint_setup = ["rpc", "dep:rustls", "dep:rcgen", "dep:anyhow", "dep:futures-buffered", "quinn/rustls-ring"]
6767
# pick up parent span when creating channel messages
68-
message_spans = ["dep:tracing"]
68+
spans = ["dep:tracing"]
6969
stream = ["dep:futures-util"]
7070
derive = ["dep:irpc-derive"]
71-
default = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"]
71+
default = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"]
72+
73+
[[example]]
74+
name = "derive"
75+
required-features = ["rpc", "derive", "quinn_endpoint_setup"]
76+
77+
[[example]]
78+
name = "compute"
79+
required-features = ["rpc", "derive", "quinn_endpoint_setup"]
80+
81+
[[example]]
82+
name = "local"
83+
required-features = ["derive"]
84+
85+
[[example]]
86+
name = "storage"
87+
required-features = ["rpc", "quinn_endpoint_setup"]
7288

7389
[workspace]
7490
members = ["irpc-derive", "irpc-iroh"]
@@ -84,7 +100,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(quicrpc_docsrs)"] }
84100
anyhow = { version = "1.0.98" }
85101
tokio = { version = "1.44", default-features = false }
86102
postcard = { version = "1.1.1", default-features = false }
87-
serde = { version = "1", default-features = false }
103+
serde = { version = "1", default-features = false, features = ["derive"] }
88104
tracing = { version = "0.1.41", default-features = false }
89105
n0-future = { version = "0.1.2", default-features = false }
90106
tracing-subscriber = { version = "0.3.19" }

examples/compute.rs

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
use std::{
22
io::{self, Write},
33
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
4-
sync::Arc,
54
};
65

76
use anyhow::bail;
87
use futures_buffered::BufferedStreamExt;
98
use irpc::{
109
channel::{mpsc, oneshot},
11-
rpc::{listen, Handler},
10+
rpc::{listen, RemoteService},
1211
rpc_requests,
1312
util::{make_client_endpoint, make_server_endpoint},
14-
Client, LocalSender, Request, Service, WithChannels,
13+
Client, Request, WithChannels,
1514
};
1615
use n0_future::{
1716
stream::StreamExt,
@@ -21,11 +20,19 @@ use serde::{Deserialize, Serialize};
2120
use thousands::Separable;
2221
use tracing::trace;
2322

24-
// Define the ComputeService
25-
#[derive(Debug, Clone, Copy)]
26-
struct ComputeService;
27-
28-
impl Service for ComputeService {}
23+
// Define the protocol and message enums using the macro
24+
#[rpc_requests(message = ComputeMessage)]
25+
#[derive(Serialize, Deserialize, Debug)]
26+
enum ComputeProtocol {
27+
#[rpc(tx=oneshot::Sender<u128>)]
28+
Sqr(Sqr),
29+
#[rpc(rx=mpsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
30+
Sum(Sum),
31+
#[rpc(tx=mpsc::Sender<u64>)]
32+
Fibonacci(Fibonacci),
33+
#[rpc(rx=mpsc::Receiver<u64>, tx=mpsc::Sender<u64>)]
34+
Multiply(Multiply),
35+
}
2936

3037
// Define ComputeRequest sub-messages
3138
#[derive(Debug, Serialize, Deserialize)]
@@ -55,20 +62,6 @@ enum ComputeRequest {
5562
Multiply(Multiply),
5663
}
5764

58-
// Define the protocol and message enums using the macro
59-
#[rpc_requests(ComputeService, message = ComputeMessage)]
60-
#[derive(Serialize, Deserialize)]
61-
enum ComputeProtocol {
62-
#[rpc(tx=oneshot::Sender<u128>)]
63-
Sqr(Sqr),
64-
#[rpc(rx=mpsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
65-
Sum(Sum),
66-
#[rpc(tx=mpsc::Sender<u64>)]
67-
Fibonacci(Fibonacci),
68-
#[rpc(rx=mpsc::Receiver<u64>, tx=mpsc::Sender<u64>)]
69-
Multiply(Multiply),
70-
}
71-
7265
// The actor that processes requests
7366
struct ComputeActor {
7467
recv: tokio::sync::mpsc::Receiver<ComputeMessage>,
@@ -79,9 +72,8 @@ impl ComputeActor {
7972
let (tx, rx) = tokio::sync::mpsc::channel(128);
8073
let actor = Self { recv: rx };
8174
n0_future::task::spawn(actor.run());
82-
let local = LocalSender::<ComputeMessage, ComputeService>::from(tx);
8375
ComputeApi {
84-
inner: local.into(),
76+
inner: Client::local(tx),
8577
}
8678
}
8779

@@ -157,7 +149,7 @@ impl ComputeActor {
157149
// The API for interacting with the ComputeService
158150
#[derive(Clone)]
159151
struct ComputeApi {
160-
inner: Client<ComputeMessage, ComputeProtocol, ComputeService>,
152+
inner: Client<ComputeProtocol>,
161153
}
162154

163155
impl ComputeApi {
@@ -168,18 +160,10 @@ impl ComputeApi {
168160
}
169161

170162
pub fn listen(&self, endpoint: quinn::Endpoint) -> anyhow::Result<AbortOnDropHandle<()>> {
171-
let Some(local) = self.inner.local() else {
163+
let Some(local) = self.inner.as_local() else {
172164
bail!("cannot listen on a remote service");
173165
};
174-
let handler: Handler<ComputeProtocol> = Arc::new(move |msg, rx, tx| {
175-
let local = local.clone();
176-
Box::pin(match msg {
177-
ComputeProtocol::Sqr(msg) => local.send((msg, tx)),
178-
ComputeProtocol::Sum(msg) => local.send((msg, tx, rx)),
179-
ComputeProtocol::Fibonacci(msg) => local.send((msg, tx)),
180-
ComputeProtocol::Multiply(msg) => local.send((msg, tx, rx)),
181-
})
182-
});
166+
let handler = ComputeProtocol::remote_handler(local);
183167
Ok(AbortOnDropHandle::new(task::spawn(listen(
184168
endpoint, handler,
185169
))))

examples/derive.rs

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,21 @@
11
use std::{
22
collections::BTreeMap,
33
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
4-
sync::Arc,
54
};
65

76
use anyhow::{Context, Result};
87
use irpc::{
98
channel::{mpsc, oneshot},
10-
rpc::Handler,
9+
rpc::RemoteService,
1110
rpc_requests,
1211
util::{make_client_endpoint, make_server_endpoint},
13-
Client, LocalSender, Service, WithChannels,
12+
Client, WithChannels,
1413
};
1514
// Import the macro
1615
use n0_future::task::{self, AbortOnDropHandle};
1716
use serde::{Deserialize, Serialize};
1817
use tracing::info;
1918

20-
/// A simple storage service, just to try it out
21-
#[derive(Debug, Clone, Copy)]
22-
struct StorageService;
23-
24-
impl Service for StorageService {}
25-
2619
#[derive(Debug, Serialize, Deserialize)]
2720
struct Get {
2821
key: String,
@@ -48,8 +41,8 @@ struct SetMany;
4841

4942
// Use the macro to generate both the StorageProtocol and StorageMessage enums
5043
// plus implement Channels for each type
51-
#[rpc_requests(StorageService, message = StorageMessage)]
52-
#[derive(Serialize, Deserialize)]
44+
#[rpc_requests(message = StorageMessage)]
45+
#[derive(Serialize, Deserialize, Debug)]
5346
enum StorageProtocol {
5447
#[rpc(tx=oneshot::Sender<Option<String>>)]
5548
Get(Get),
@@ -74,9 +67,8 @@ impl StorageActor {
7467
state: BTreeMap::new(),
7568
};
7669
n0_future::task::spawn(actor.run());
77-
let local = LocalSender::<StorageMessage, StorageService>::from(tx);
7870
StorageApi {
79-
inner: local.into(),
71+
inner: Client::local(tx),
8072
}
8173
}
8274

@@ -123,7 +115,7 @@ impl StorageActor {
123115
}
124116

125117
struct StorageApi {
126-
inner: Client<StorageMessage, StorageProtocol, StorageService>,
118+
inner: Client<StorageProtocol>,
127119
}
128120

129121
impl StorageApi {
@@ -134,17 +126,14 @@ impl StorageApi {
134126
}
135127

136128
pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
137-
let local = self.inner.local().context("cannot listen on remote API")?;
138-
let handler: Handler<StorageProtocol> = Arc::new(move |msg, rx, tx| {
139-
let local = local.clone();
140-
Box::pin(match msg {
141-
StorageProtocol::Get(msg) => local.send((msg, tx)),
142-
StorageProtocol::Set(msg) => local.send((msg, tx)),
143-
StorageProtocol::SetMany(msg) => local.send((msg, tx, rx)),
144-
StorageProtocol::List(msg) => local.send((msg, tx)),
145-
})
146-
});
147-
let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler));
129+
let local = self
130+
.inner
131+
.as_local()
132+
.context("cannot listen on remote API")?;
133+
let join_handle = task::spawn(irpc::rpc::listen(
134+
endpoint,
135+
StorageProtocol::remote_handler(local),
136+
));
148137
Ok(AbortOnDropHandle::new(join_handle))
149138
}
150139

examples/local.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
//! This demonstrates using irpc with the derive macro but without the rpc feature
2+
//! for local-only use. Run with:
3+
//! ```
4+
//! cargo run --example local --no-default-features --features derive
5+
//! ```
6+
7+
use std::collections::BTreeMap;
8+
9+
use irpc::{channel::oneshot, rpc_requests, Client, WithChannels};
10+
use serde::{Deserialize, Serialize};
11+
12+
#[derive(Debug, Serialize, Deserialize)]
13+
struct Get {
14+
key: String,
15+
}
16+
17+
#[derive(Debug, Serialize, Deserialize)]
18+
struct List;
19+
20+
#[derive(Debug, Serialize, Deserialize)]
21+
struct Set {
22+
key: String,
23+
value: String,
24+
}
25+
26+
impl From<(String, String)> for Set {
27+
fn from((key, value): (String, String)) -> Self {
28+
Self { key, value }
29+
}
30+
}
31+
32+
#[derive(Debug, Serialize, Deserialize)]
33+
struct SetMany;
34+
35+
#[rpc_requests(message = StorageMessage, no_rpc, no_spans)]
36+
#[derive(Serialize, Deserialize, Debug)]
37+
enum StorageProtocol {
38+
#[rpc(tx=oneshot::Sender<Option<String>>)]
39+
Get(Get),
40+
#[rpc(tx=oneshot::Sender<()>)]
41+
Set(Set),
42+
}
43+
44+
struct Actor {
45+
recv: tokio::sync::mpsc::Receiver<StorageMessage>,
46+
state: BTreeMap<String, String>,
47+
}
48+
49+
impl Actor {
50+
async fn run(mut self) {
51+
while let Some(msg) = self.recv.recv().await {
52+
self.handle(msg).await;
53+
}
54+
}
55+
56+
async fn handle(&mut self, msg: StorageMessage) {
57+
match msg {
58+
StorageMessage::Get(get) => {
59+
let WithChannels { tx, inner, .. } = get;
60+
tx.send(self.state.get(&inner.key).cloned()).await.ok();
61+
}
62+
StorageMessage::Set(set) => {
63+
let WithChannels { tx, inner, .. } = set;
64+
self.state.insert(inner.key, inner.value);
65+
tx.send(()).await.ok();
66+
}
67+
}
68+
}
69+
}
70+
71+
struct StorageApi {
72+
inner: Client<StorageProtocol>,
73+
}
74+
75+
impl StorageApi {
76+
pub fn spawn() -> StorageApi {
77+
let (tx, rx) = tokio::sync::mpsc::channel(1);
78+
let actor = Actor {
79+
recv: rx,
80+
state: BTreeMap::new(),
81+
};
82+
n0_future::task::spawn(actor.run());
83+
StorageApi {
84+
inner: Client::local(tx),
85+
}
86+
}
87+
88+
pub async fn get(&self, key: String) -> irpc::Result<Option<String>> {
89+
self.inner.rpc(Get { key }).await
90+
}
91+
92+
pub async fn set(&self, key: String, value: String) -> irpc::Result<()> {
93+
self.inner.rpc(Set { key, value }).await
94+
}
95+
}
96+
97+
#[tokio::main]
98+
async fn main() -> irpc::Result<()> {
99+
tracing_subscriber::fmt::init();
100+
let api = StorageApi::spawn();
101+
api.set("hello".to_string(), "world".to_string()).await?;
102+
let value = api.get("hello".to_string()).await?;
103+
println!("get: hello = {value:?}");
104+
Ok(())
105+
}

0 commit comments

Comments
 (0)