Skip to content

Commit

Permalink
feat: add local/remote sub cache to publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Dec 3, 2024
1 parent 549bc7b commit 5455ecc
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 104 deletions.
155 changes: 155 additions & 0 deletions examples/examples/z_local_pub_sub_thr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::{convert::TryInto, time::Instant};

use clap::Parser;
use zenoh::{
bytes::ZBytes,
qos::{CongestionControl, Priority},
Wait,
};
use zenoh_examples::CommonArgs;

struct Stats {
round_count: usize,
round_size: usize,
finished_rounds: usize,
round_start: Instant,
global_start: Option<Instant>,
}
impl Stats {
fn new(round_size: usize) -> Self {
Stats {
round_count: 0,
round_size,
finished_rounds: 0,
round_start: Instant::now(),
global_start: None,
}
}
fn increment(&mut self) {
if self.round_count == 0 {
self.round_start = Instant::now();
if self.global_start.is_none() {
self.global_start = Some(self.round_start)
}
self.round_count += 1;
} else if self.round_count < self.round_size {
self.round_count += 1;
} else {
self.print_round();
self.finished_rounds += 1;
self.round_count = 0;
}
}
fn print_round(&self) {
let elapsed = self.round_start.elapsed().as_secs_f64();
let throughput = (self.round_size as f64) / elapsed;
println!("{throughput} msg/s");
}
}
impl Drop for Stats {
fn drop(&mut self) {
let Some(global_start) = self.global_start else {
return;
};
let elapsed = global_start.elapsed().as_secs_f64();
let total = self.round_size * self.finished_rounds + self.round_count;
let throughput = total as f64 / elapsed;
println!("Received {total} messages over {elapsed:.2}s: {throughput}msg/s");
}
}

fn main() {
// initiate logging
zenoh::init_log_from_env_or("error");
let args = Args::parse();

let session = zenoh::open(args.common).wait().unwrap();

let key_expr = "test/thr";

let mut stats = Stats::new(args.number);
session
.declare_subscriber(key_expr)
.callback_mut(move |_sample| {
stats.increment();
if stats.finished_rounds >= args.samples {
std::process::exit(0)
}
})
.background()
.wait()
.unwrap();

let mut prio = Priority::DEFAULT;
if let Some(p) = args.priority {
prio = p.try_into().unwrap();
}

let publisher = session
.declare_publisher(key_expr)
.congestion_control(CongestionControl::Block)
.priority(prio)
.express(args.express)
.wait()
.unwrap();

println!("Press CTRL-C to quit...");
let payload_size = args.payload_size;
let data: ZBytes = (0..payload_size)
.map(|i| (i % 10) as u8)
.collect::<Vec<u8>>()
.into();
let mut count: usize = 0;
let mut start = std::time::Instant::now();
loop {
publisher.put(data.clone()).wait().unwrap();

if args.print {
if count < args.number {
count += 1;
} else {
let thpt = count as f64 / start.elapsed().as_secs_f64();
println!("{thpt} msg/s");
count = 0;
start = std::time::Instant::now();
}
}
}
}

#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "10")]
/// Number of throughput measurements.
samples: usize,
/// express for sending data
#[arg(long, default_value = "false")]
express: bool,
/// Priority for sending data
#[arg(short, long)]
priority: Option<u8>,
/// Print the statistics
#[arg(short = 't', long)]
print: bool,
/// Number of messages in each throughput measurements
#[arg(short, long, default_value = "10000000")]
number: usize,
/// Sets the size of the payload to publish
payload_size: usize,
#[command(flatten)]
common: CommonArgs,
}
11 changes: 10 additions & 1 deletion zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::future::{IntoFuture, Ready};
use std::{
future::{IntoFuture, Ready},
sync::atomic::AtomicU64,
};

use zenoh_core::{Resolvable, Result as ZResult, Wait};
use zenoh_protocol::core::CongestionControl;
Expand Down Expand Up @@ -206,6 +209,7 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut
#[inline]
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.session.0.resolve_put(
None,
&self.publisher.key_expr?,
self.kind.payload,
SampleKind::Put,
Expand All @@ -228,6 +232,7 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDel
#[inline]
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.session.0.resolve_put(
None,
&self.publisher.key_expr?,
ZBytes::new(),
SampleKind::Delete,
Expand Down Expand Up @@ -383,6 +388,8 @@ impl Wait for PublisherBuilder<'_, '_> {
.declare_publisher_inner(key_expr.clone(), self.destination)?;
Ok(Publisher {
session: self.session.downgrade(),
// TODO use constants here
cache: AtomicU64::new(0b11),
id,
key_expr,
encoding: self.encoding,
Expand Down Expand Up @@ -411,6 +418,7 @@ impl IntoFuture for PublisherBuilder<'_, '_> {
impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.session.resolve_put(
Some(&self.publisher.cache),
&self.publisher.key_expr,
self.kind.payload,
SampleKind::Put,
Expand All @@ -432,6 +440,7 @@ impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.session.resolve_put(
Some(&self.publisher.cache),
&self.publisher.key_expr,
ZBytes::new(),
SampleKind::Delete,
Expand Down
3 changes: 3 additions & 0 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{
fmt,
future::{IntoFuture, Ready},
pin::Pin,
sync::atomic::AtomicU64,
task::{Context, Poll},
};

Expand Down Expand Up @@ -100,6 +101,7 @@ impl fmt::Debug for PublisherState {
#[derive(Debug)]
pub struct Publisher<'a> {
pub(crate) session: WeakSession,
pub(crate) cache: AtomicU64,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) encoding: Encoding,
Expand Down Expand Up @@ -385,6 +387,7 @@ impl Sink<Sample> for Publisher<'_> {
..
} = item.into();
self.session.resolve_put(
Some(&self.cache),
&self.key_expr,
payload,
kind,
Expand Down
Loading

0 comments on commit 5455ecc

Please sign in to comment.