Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement streaming #517

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ members = [
"capnp-rpc/examples/hello-world",
"capnp-rpc/examples/calculator",
"capnp-rpc/examples/pubsub",
"capnp-rpc/examples/streaming",
"capnp-rpc/test",
"example/addressbook",
"example/addressbook_send",
Expand Down
24 changes: 24 additions & 0 deletions capnp-rpc/examples/streaming/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "streaming"
version = "0.1.0"
edition = "2021"

build = "build.rs"

[[bin]]
name = "streaming"
path = "main.rs"

[build-dependencies]
capnpc = { path = "../../../capnpc" }

[dependencies]
capnp = { path = "../../../capnp" }
futures = "0.3.0"
rand = "0.8.5"
sha2 = { version = "0.10.8" }
tokio = { version = "1.0.0", features = ["net", "rt", "macros"]}
tokio-util = { version = "0.7.4", features = ["compat"] }

[dependencies.capnp-rpc]
path = "../.."
6 changes: 6 additions & 0 deletions capnp-rpc/examples/streaming/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
capnpc::CompilerCommand::new()
.file("streaming.capnp")
.run()?;
Ok(())
}
70 changes: 70 additions & 0 deletions capnp-rpc/examples/streaming/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::streaming_capnp::receiver;
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};

use futures::AsyncReadExt;
use rand::Rng;
use sha2::{Digest, Sha256};

pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
use std::net::ToSocketAddrs;
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 5 {
println!(
"usage: {} client HOST:PORT STREAM_SIZE WINDOW_SIZE",
args[0]
);
return Ok(());
}

let stream_size: usize = str::parse(&args[3]).unwrap();
let window_size: usize = str::parse(&args[4]).unwrap();

let addr = args[2]
.to_socket_addrs()?
.next()
.expect("could not parse address");

tokio::task::LocalSet::new()
.run_until(async move {
let stream = tokio::net::TcpStream::connect(&addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let mut rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
rpc_network.set_window_size(window_size);
let mut rpc_system = RpcSystem::new(rpc_network, None);
let receiver: receiver::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
tokio::task::spawn_local(rpc_system);

let capnp::capability::RemotePromise { promise, pipeline } =
receiver.write_stream_request().send();

let mut rng = rand::thread_rng();
let mut hasher = Sha256::new();
let bytestream = pipeline.get_stream();
let mut bytes_written: u32 = 0;
const CHUNK_SIZE: u32 = 4096;
while bytes_written < stream_size as u32 {
let mut request = bytestream.write_request();
let body = request.get();
let buf = body.init_bytes(CHUNK_SIZE);
rng.fill(buf);
hasher.update(buf);
request.send().await?;
bytes_written += CHUNK_SIZE;
}
bytestream.end_request().send().promise.await?;
let response = promise.await?;

let sha256 = response.get()?.get_sha256()?;
let local_sha256 = hasher.finalize();
assert_eq!(sha256, &local_sha256[..]);
Ok(())
})
.await
}
21 changes: 21 additions & 0 deletions capnp-rpc/examples/streaming/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pub mod streaming_capnp {
include!(concat!(env!("OUT_DIR"), "/streaming_capnp.rs"));
}

pub mod client;
pub mod server;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = ::std::env::args().collect();
if args.len() >= 2 {
match &args[1][..] {
"client" => return client::main().await,
"server" => return server::main().await,
_ => (),
}
}

println!("usage: {} [client | server] ADDRESS", args[0]);
Ok(())
}
112 changes: 112 additions & 0 deletions capnp-rpc/examples/streaming/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::net::ToSocketAddrs;

use crate::streaming_capnp::{byte_stream, receiver};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};

use capnp::capability::Promise;
use capnp::Error;

use futures::channel::oneshot;
use futures::AsyncReadExt;
use sha2::{Digest, Sha256};

struct ByteStreamImpl {
hasher: Sha256,
hash_sender: Option<oneshot::Sender<Vec<u8>>>,
}

impl ByteStreamImpl {
fn new(hash_sender: oneshot::Sender<Vec<u8>>) -> Self {
Self {
hasher: Sha256::new(),
hash_sender: Some(hash_sender),
}
}
}

impl byte_stream::Server for ByteStreamImpl {
fn write(&mut self, params: byte_stream::WriteParams) -> Promise<(), Error> {
let bytes = pry!(pry!(params.get()).get_bytes());
self.hasher.update(bytes);
Promise::ok(())
}

fn end(
&mut self,
_params: byte_stream::EndParams,
_results: byte_stream::EndResults,
) -> Promise<(), Error> {
let hasher = std::mem::take(&mut self.hasher);
if let Some(sender) = self.hash_sender.take() {
let _ = sender.send(hasher.finalize()[..].to_vec());
}
Promise::ok(())
}
}

struct ReceiverImpl {}

impl ReceiverImpl {
fn new() -> Self {
Self {}
}
}

impl receiver::Server for ReceiverImpl {
fn write_stream(
&mut self,
_params: receiver::WriteStreamParams,
mut results: receiver::WriteStreamResults,
) -> Promise<(), Error> {
let (snd, rcv) = oneshot::channel();
let client: byte_stream::Client = capnp_rpc::new_client(ByteStreamImpl::new(snd));
results.get().set_stream(client);
pry!(results.set_pipeline());
Promise::from_future(async move {
match rcv.await {
Ok(v) => {
results.get().set_sha256(&v[..]);
Ok(())
}
Err(_) => Err(Error::failed("failed to get hash".into())),
}
})
}
}

pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 3 {
println!("usage: {} server ADDRESS[:PORT]", args[0]);
return Ok(());
}

let addr = args[2]
.to_socket_addrs()?
.next()
.expect("could not parse address");

tokio::task::LocalSet::new()
.run_until(async move {
let listener = tokio::net::TcpListener::bind(&addr).await?;
let client: receiver::Client = capnp_rpc::new_client(ReceiverImpl::new());

loop {
let (stream, _) = listener.accept().await?;
stream.set_nodelay(true)?;
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let network = twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Server,
Default::default(),
);

let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client));

tokio::task::spawn_local(rpc_system);
}
})
.await
}
16 changes: 16 additions & 0 deletions capnp-rpc/examples/streaming/streaming.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
@0x9fedc87e438cde81;

interface ByteStream {
write @0 (bytes :Data) -> stream;
# Writes a chunk.

end @1 ();
# Ends the stream.
}

interface Receiver {
writeStream @0 () -> (stream :ByteStream, sha256 :Data);
# Uses set_pipeline() to set up `stream` immediately.
# Actually returns when `end()` is called on that stream.
# `sha256` is the SHA256 checksum of the received data.
}
3 changes: 3 additions & 0 deletions capnp-rpc/src/broken.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ impl RequestHook for Request {
pipeline: any_pointer::Pipeline::new(Box::new(pipeline)),
}
}
fn send_streaming(self: Box<Self>) -> Promise<(), Error> {
Promise::err(self.error)
}
fn tail_send(self: Box<Self>) -> Option<(u32, Promise<(), Error>, Box<dyn PipelineHook>)> {
None
}
Expand Down
Loading
Loading