[Solved] ZFuture
Refactoring
#17
Replies: 7 comments 5 replies
-
On Design of
|
Beta Was this translation helpful? Give feedback.
-
Hello @jerry73204 , I've been playing around a bit with Please find attached some code that mimics in a certain way the zenoh internal architecture and how a Reporting here the implementation of struct PutBuilder {
message: Option<Message>,
queues: TargetQueues,
}
impl PutBuilder {
async fn _await(mut pb: Pin<&mut Self>) -> bool {
let message = pb.message.take().unwrap();
match &mut pb.queues {
TargetQueues::None => false,
TargetQueues::One(q) => q.push(message).await,
TargetQueues::Multiple(ref mut qq) => {
let mut res = true;
for q in qq.drain(..) {
res &= q.push(message.clone()).await;
}
res
}
}
}
pub fn wait(self) -> bool {
task::block_on(self)
}
}
impl Future for PutBuilder {
type Output = bool;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Future::poll(
unsafe { Pin::new_unchecked(&mut Self::_await(self)) }.as_mut(),
cx,
)
}
}
#[async_std::main]
async fn main() {
// Stripping out the initialisation code...
session.put(message.clone()).await;
session.put(message.clone()).wait();
} My only concern about this approach is about the use of What's your opinion on this? |
Beta Was this translation helpful? Give feedback.
-
In your Besides, the unsafe of use pin_project::pin_project;
use futures::ready;
#[pin-project]
struct SleepThenSend {
is_sleeping: bool, // mapped to &mut bool
#[pin]
sleep: Sleep, // mapped to Pin<&mut Sleep>
#[pin]
send: Send, // mapped to Pin<&mut Sleep>
}
impl Future for SleepThenSend {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
if me.is_sleeping() {
ready!(me.sleep.poll(cx));
me.is_sleeping = false;
} else {
ready!(me.send().poll(cx));
}
}
} |
Beta Was this translation helpful? Give feedback.
-
Let me summarize the colors of functions and conversions among them. Here is the table of function colors. The sync/async means whether the function call returns a future (or
Conversion RulesThe first rule is that it's fine to directly call X in Y, but only calling The second rule is that calling a blocking async/sync function causes the caller to be blocked. Based on this rule, we have the following conversions. In the arrow A -> B, A is callee and B is caller. The
Nonblocking ConversionsBased on the rules, this direction is fine because it's just done by wrapping a poll(). The reverse has a similar effect, but requires a runtime to do so. I would flavor SN -> AN because it only needs to provide a sync function, and let user to choose to call in sync or async way. AN => SN would make things complex. Blocking ConversionsThe blocking case is interesting because SB -> AB call is the source of chaos. In fact, the AB case should never exist in the library because it will starves the runtime resources. Though, it's not easy to stop people from calling a blocking function inside an async, either intentionally or accidentally. The right way is writing blocking ops inside a sync function, and they can only be in sync functions. To turn them async, call tokio's and async-std's provides fn blocking() {
for _ in 1e8 {
println!("lots of print");
}
}
async {
spawn_blocking(move || { blocking(); }).await;
} As far as a blocking operation cannot be avoided, it's suggested to write a sync function first. Must-be-async CasesIn addition to the arguments above suggesting to write sync functions before async counterparts, there is one scenario that we prefer writing async functions, that is mostly I/O operations. That's the place where the conversion AN => SB would happen. In this case, we would wrap the future by async fn read_large_file() -> String {
async_std::fs::read_to_string("my_large_file.txt").await.unwrap()
}
move || {
let text = block_on(read_large_file());
} A Little SummaryTo summarize, we have the first class citizens in several use cases.
and they give rise the the conversions:
Implications for ZenohLet's break down the Zenoh to see the usage of the functions:
Considering that the async I/O serve most part of Zenoh and plus several computation-bounded routing tasks, I would adopt the strategies:
|
Beta Was this translation helpful? Give feedback.
-
We're having a consensus on the public interface. I think the way of writing is concerned here. My concern is that the signature Let's focus on the problem. impl PutBuilder {
pub async fn build(self) -> bool { /* omit */ }
pub async fn reset(&mut self) {/* omit */}
} Strategy 1:
|
Beta Was this translation helpful? Give feedback.
-
Learned from your vision, I had a some misconception of Let me go on the detour about my experience. According to zenoh 0.6 doc, the usage looks like this. One configuration is done for each put operation. session
.put("/key/expression", "value")
.encoding(Encoding::TEXT_PLAIN)
.congestion_control(CongestionControl::Block)
.await
.unwrap();
let builder = session
.put_builder("/key/expression")
.encoding(Encoding::TEXT_PLAIN)
.congestion_control(CongestionControl::Block);
for value in 0..100 {
builder.put(value).await;
} Having a builder also enables us to configure the senders an receivers at once, and run them in parallel. struct Putter {
builder: PutBuilder,
}
struct Getter {
builder: RecvBuilder,
}
let putter = Putter::new(&session);
let getter = Putter::new(&getter);
futures::join!(
spawn(async { loop { putter.put().await; } }),
spawn(async { loop { getter.get().await; } }),
); If we can really have a builder, I would add impl Session {
pub fn put_builder(&self, key: impl KeyExpr) -> PutBuilder { /* omit */ }
pub async fn put(&self, key: impl KeyExpr, value: Value) -> Result<()> {
self.put_builder(key).put(value).await
}
}
// shortcut
session.put(key, value).await?;
// for experienced users
session
.put_builder(key)
.encoding(Encoding::TEXT_PLAIN)
.congestion_control(CongestionControl::Block)
.put(value)
.await; This way also avoids writing the Plus, the impl PutBuilder {
#[zfuture]
pub async fn put(&self, msg: Message) -> Result<()> { /* omit */ }
} |
Beta Was this translation helpful? Give feedback.
-
We have debated quite extensively the look-and-feel of the new API across multiple languages (see #23). The discussion on the API has taken into account has taken into account the points made in this ZFuture Refactoring discussion.
// SYNC
use zenoh::prelude::sync::*;
fn main() {
let session = zenoh::open(...).res().unwrap();
session.put("/a", vec![0u8; 64]).res().unwrap();
} // ASYNC
use zenoh::prelude::r#async::*;
async fn main() {
let session = zenoh::open(...).res().await.unwrap();
session.put("/a", vec![0u8; 64]).res().await.unwrap();
}
// SYNC
use zenoh::prelude::sync::*;
fn main() {
session.close().res().unwrap();
} // ASYNC
use zenoh::prelude::r#async::*;
async fn main() {
session.close().res().await.unwrap();
} |
Beta Was this translation helpful? Give feedback.
-
It is related to eclipse-zenoh/zenoh#244, stating that the zenoh's
ZFuture
API suffers from the issue of blocked async functions. The async API should go on a rework, in terms of improving experience of API usage, avoiding blocking issues and providing fluent C API. The initial survey to this issue is in the post below.Beta Was this translation helpful? Give feedback.
All reactions