Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The ZFuture and Runnable traits need re-design #244

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 88 additions & 63 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions commons/zenoh-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ zenoh-core = { path = "../zenoh-core/" }
async-std = { version = "=1.11.0", features = ["unstable"] }
event-listener = "2.5.1"
flume = "0.10.5"
futures-lite = "1.11.3"
futures = "0.3.21"
tokio = { version = "1.17.0", features = ["sync"] }

[dev-dependencies]
futures = "0.3.21"
async-std = { version = "=1.11.0", features = ["unstable", "attributes"] }
6 changes: 3 additions & 3 deletions commons/zenoh-sync/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ macro_rules! zreceiver{
}
}

impl$(<$( $lt ),+>)? async_std::stream::Stream for $struct_name$(<$( $lt ),+>)? {
impl$(<$( $lt ),+>)? futures::stream::Stream for $struct_name$(<$( $lt ),+>)? {
type Item = $recv_type;

#[inline(always)]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use futures_lite::StreamExt;
self.stream.poll_next(cx)
use ::futures::stream::StreamExt;
self.stream.poll_next_unpin(cx)
}
}

Expand Down
140 changes: 0 additions & 140 deletions commons/zenoh-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use futures_lite::FutureExt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub mod backoff;
pub use backoff::*;
pub mod channel;
Expand All @@ -29,138 +24,3 @@ pub use signal::*;
pub fn get_mut_unchecked<T>(arc: &mut std::sync::Arc<T>) -> &mut T {
unsafe { &mut (*(std::sync::Arc::as_ptr(arc) as *mut T)) }
}

/// A future which output can be accessed synchronously or asynchronously.
pub trait ZFuture: Future + Send {
/// Synchronously waits for the output of this future.
fn wait(self) -> Self::Output;
}

/// Creates a [`ZFuture`] that is immediately ready with a value.
///
/// This `struct` is created by [`zready()`]. See its
/// documentation for more.
#[derive(Debug, Clone)]
#[must_use = "ZFutures do nothing unless you `.wait()`, `.await` or poll them"]
pub struct ZReady<T>(Option<T>);

impl<T> Unpin for ZReady<T> {}

impl<T> Future for ZReady<T> {
type Output = T;

#[inline]
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
Poll::Ready(self.0.take().expect("Ready polled after completion"))
}
}

impl<T> ZFuture for ZReady<T>
where
T: Unpin + Send,
{
#[inline]
fn wait(self) -> T {
self.0.unwrap()
}
}

/// Creates a [`ZFuture`] that is immediately ready with a value.
///
/// ZFutures created through this function are functionally similar to those
/// created through `async {}`. The main difference is that futures created
/// through this function are named and implement `Unpin` and `ZFuture`.
///
/// # Examples
///
/// ```
/// use zenoh_sync::zready;
///
/// # async fn run() {
/// let a = zready(1);
/// assert_eq!(a.await, 1);
/// # }
/// ```
#[inline]
pub fn zready<T>(t: T) -> ZReady<T> {
ZReady(Some(t))
}

#[macro_export]
macro_rules! zready_try {
($val:expr) => {
zenoh_sync::zready({
let f = || $val;
f()
})
};
}

/// An alias for `Pin<Box<dyn Future<Output = T> + Send>>`.
#[must_use = "ZFutures do nothing unless you `.wait()`, `.await` or poll them"]
pub struct ZPinBoxFuture<T>(Pin<Box<dyn Future<Output = T> + Send>>);

impl<T> Future for ZPinBoxFuture<T> {
type Output = T;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll(cx)
}
}

impl<T> ZFuture for ZPinBoxFuture<T> {
#[inline]
fn wait(self) -> T {
async_std::task::block_on(self.0)
}
}

#[inline]
pub fn zpinbox<T>(fut: impl Future<Output = T> + Send + 'static) -> ZPinBoxFuture<T> {
ZPinBoxFuture(Box::pin(fut))
}

pub trait Runnable {
type Output;

fn run(&mut self) -> Self::Output;
}

#[macro_export]
macro_rules! derive_zfuture{
(
$(#[$meta:meta])*
$vis:vis struct $struct_name:ident$(<$( $lt:lifetime ),+>)? {
$(
$(#[$field_meta:meta])*
$field_vis:vis $field_name:ident : $field_type:ty,
)*
}
) => {
$(#[$meta])*
#[must_use = "ZFutures do nothing unless you `.wait()`, `.await` or poll them"]
$vis struct $struct_name$(<$( $lt ),+>)? {
$(
$(#[$field_meta:meta])*
$field_vis $field_name : $field_type,
)*
}

impl$(<$( $lt ),+>)? futures_lite::Future for $struct_name$(<$( $lt ),+>)? {
type Output = <Self as Runnable>::Output;

#[inline]
fn poll(mut self: std::pin::Pin<&mut Self>, _cx: &mut async_std::task::Context<'_>) -> std::task::Poll<<Self as futures_lite::Future>::Output> {
std::task::Poll::Ready(self.run())
}
}

impl$(<$( $lt ),+>)? zenoh_sync::ZFuture for $struct_name$(<$( $lt ),+>)? {
#[inline]
fn wait(mut self) -> Self::Output {
self.run()
}
}
}
}
7 changes: 6 additions & 1 deletion examples/examples/z_eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ async fn main() {
let session = zenoh::open(config).await.unwrap();

println!("Creating Queryable on '{}'...", key_expr);
let mut queryable = session.queryable(&key_expr).kind(EVAL).await.unwrap();
let mut queryable = session
.queryable(&key_expr)
.kind(EVAL)
.build()
.await
.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() {
let session = zenoh::open(config).await.unwrap();

println!("Creating Subscriber on '{}'...", key_expr);
let mut subscriber = session.subscribe(&key_expr).await.unwrap();
let mut subscriber = session.subscribe(&key_expr).build().await.unwrap();
println!("Creating Publisher on '{}'...", forward);
let publisher = session.publish(&forward).await.unwrap();
println!("Forwarding data from '{}' to '{}'...", key_expr, forward);
Expand Down
15 changes: 8 additions & 7 deletions examples/examples/z_ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ use zenoh::config::Config;
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;

fn main() {
#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();

let (config, size, n) = parse_args();
let session = zenoh::open(config).wait().unwrap();
let session = zenoh::open(config).await.unwrap();

// The key expression to publish data on
let key_expr_ping = session.declare_expr("/test/ping").wait().unwrap();
let key_expr_ping = session.declare_expr("/test/ping").await.unwrap();

// The key expression to wait the response back
let key_expr_pong = session.declare_expr("/test/pong").wait().unwrap();
let key_expr_pong = session.declare_expr("/test/pong").await.unwrap();

let sub = session.subscribe(&key_expr_pong).wait().unwrap();
let sub = session.subscribe(&key_expr_pong).build().await.unwrap();

let data: Value = (0usize..size)
.map(|i| (i % 10) as u8)
Expand All @@ -47,7 +48,7 @@ fn main() {
.put(&key_expr_ping, data)
// Make sure to not drop messages because of congestion control
.congestion_control(CongestionControl::Block)
.wait()
.await
.unwrap();

let _ = sub.recv();
Expand All @@ -60,7 +61,7 @@ fn main() {
.put(&key_expr_ping, data)
// Make sure to not drop messages because of congestion control
.congestion_control(CongestionControl::Block)
.wait()
.await
.unwrap();

let _ = sub.recv();
Expand Down
13 changes: 7 additions & 6 deletions examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,29 @@ use zenoh::config::Config;
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;

fn main() {
#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();

let config = parse_args();

let session = zenoh::open(config).wait().unwrap();
let session = zenoh::open(config).await.unwrap();

// The key expression to read the data from
let key_expr_ping = session.declare_expr("/test/ping").wait().unwrap();
let key_expr_ping = session.declare_expr("/test/ping").await.unwrap();

// The key expression to echo the data back
let key_expr_pong = session.declare_expr("/test/pong").wait().unwrap();
let key_expr_pong = session.declare_expr("/test/pong").await.unwrap();

let sub = session.subscribe(&key_expr_ping).wait().unwrap();
let sub = session.subscribe(&key_expr_ping).build().await.unwrap();

while let Ok(sample) = sub.recv() {
session
.put(&key_expr_pong, sample.value)
// Make sure to not drop messages because of congestion control
.congestion_control(CongestionControl::Block)
.wait()
.await
.unwrap();
}
}
Expand Down
9 changes: 5 additions & 4 deletions examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use zenoh::config::Config;
use zenoh::prelude::*;
use zenoh::publication::CongestionControl;

fn main() {
#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();
let (config, size, prio, print, number) = parse_args();
Expand All @@ -27,9 +28,9 @@ fn main() {
.collect::<Vec<u8>>()
.into();

let session = zenoh::open(config).wait().unwrap();
let session = zenoh::open(config).await.unwrap();

let key_expr = session.declare_expr("/test/thr").wait().unwrap();
let key_expr = session.declare_expr("/test/thr").await.unwrap();

let mut count: usize = 0;
let mut start = std::time::Instant::now();
Expand All @@ -40,7 +41,7 @@ fn main() {
.congestion_control(CongestionControl::Block)
// Set the right priority
.priority(prio)
.wait()
.await
.unwrap();

if print {
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async fn main() {
let mut subscriber = session
.subscribe(&key_expr)
.mode(SubMode::Pull)
.build()
.await
.unwrap();

Expand Down
9 changes: 7 additions & 2 deletions examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@ async fn main() {
let session = zenoh::open(config).await.unwrap();

println!("Creating Subscriber on '{}'...", key_expr);
let mut subscriber = session.subscribe(&key_expr).await.unwrap();
let mut subscriber = session.subscribe(&key_expr).build().await.unwrap();

println!("Creating Queryable on '{}'...", key_expr);
let mut queryable = session.queryable(&key_expr).kind(STORAGE).await.unwrap();
let mut queryable = session
.queryable(&key_expr)
.kind(STORAGE)
.build()
.await
.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() {

println!("Creating Subscriber on '{}'...", key_expr);

let mut subscriber = session.subscribe(&key_expr).await.unwrap();
let mut subscriber = session.subscribe(&key_expr).build().await.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
Expand Down
11 changes: 6 additions & 5 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@
use clap::{App, Arg};
use std::time::Instant;
use zenoh::config::Config;
use zenoh::prelude::*;

fn main() {
#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();

let (config, m, n) = parse_args();

let session = zenoh::open(config).wait().unwrap();
let session = zenoh::open(config).await.unwrap();

let key_expr = session.declare_expr("/test/thr").wait().unwrap();
let key_expr = session.declare_expr("/test/thr").await.unwrap();

let mut count = 0;
let mut start = Instant::now();
Expand All @@ -47,7 +47,8 @@ fn main() {
}
}
})
.wait()
.build()
.await
.unwrap();

// Stop forever
Expand Down
9 changes: 7 additions & 2 deletions plugins/example-plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,15 @@ async fn run(runtime: Runtime, selector: KeyExpr<'_>, flag: Arc<AtomicBool>) {
debug!("Run example-plugin with storage-selector={}", selector);

debug!("Create Subscriber on {}", selector);
let mut sub = session.subscribe(&selector).await.unwrap();
let mut sub = session.subscribe(&selector).build().await.unwrap();

debug!("Create Queryable on {}", selector);
let mut queryable = session.queryable(&selector).kind(STORAGE).await.unwrap();
let mut queryable = session
.queryable(&selector)
.kind(STORAGE)
.build()
.await
.unwrap();

while flag.load(Relaxed) {
select!(
Expand Down
Loading