Skip to content

Commit

Permalink
Add QoS configuration overwrites for publication messages (#1613)
Browse files Browse the repository at this point in the history
* Initial implementation of publisher builder config overwrites

* Add publisher builder overwrite test

* Remove unnecessary return

* Fix default config spacing

* Change config format to publishers/default_builders

* Update QoS test config

* Move new PublisherBuilder logic

* Log warning when multiple builder configs can apply for the same keyexpr

* Rename builders module to publishers

* Change config format to qos/put

* Rename publishers module to qos

* Get QoS config from runtime instead of drilling through session API

* Update QoS config test

* Remove unnecessary lifetime

* Overwrite PublisherBuilder API calls with config

* Update QoS config test

* Remove encoding from QoS config

* Allow API to change non-overwritten QoS config

* Rename config qos/put to qos/publication

* Enforce 1-to-1 mapping between config and zenoh enums at compile time

* Add session API tests

* Fix case-sensitive parameter

* Mark destination QoS config as unstable

* Apply QoS overwrites at resolution of PublisherBuilder and PublicationBuilder

* Remove impl lifetimes

* Remove unicity check of keyexpr in qos config

* Add copyright header
  • Loading branch information
oteffahi authored Dec 4, 2024
1 parent b8a2979 commit b3ccf82
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,27 @@
},
},

// /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values)
// qos: {
// /// Overwrite QoS options for PUT and DELETE messages
// publication: [
// {
// /// PUT and DELETE messages on key expressions that are included by these key expressions
// /// will have their QoS options overwritten by the given config.
// key_exprs: ["demo/**", "example/key"],
// /// Configurations that will be applied on the publisher.
// /// Options that are supplied here will overwrite the configuration given in Zenoh API
// config: {
// congestion_control: "block",
// priority: "data_high",
// express: true,
// reliability: "best_effort",
// allowed_destination: "remote",
// },
// },
// ],
// },

// /// The declarations aggregation strategy.
// aggregation: {
// /// A list of key-expressions for which all included subscribers will be aggregated into.
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ description = "Internal crate for zenoh."
[features]
internal = []
transport_tcp = []
unstable = []

[dependencies]
tracing = { workspace = true }
Expand All @@ -36,6 +37,7 @@ serde_json = { workspace = true }
serde_yaml = { workspace = true }
validated_struct = { workspace = true, features = ["json5", "json_get"] }
zenoh-core = { workspace = true }
zenoh-keyexpr = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants.
pub mod defaults;
mod include;
pub mod qos;
pub mod wrappers;

#[allow(unused_imports)]
Expand All @@ -30,6 +31,7 @@ use std::{
};

use include::recursive_include;
use qos::PublisherQoSConfList;
use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
Expand Down Expand Up @@ -360,6 +362,14 @@ validated_struct::validator! {
/// A list of key-expressions for which all included publishers will be aggregated into.
publishers: Vec<OwnedKeyExpr>,
},

/// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config)
pub qos: #[derive(Default)]
QoSConfig {
/// A list of QoS configurations for PUT and DELETE messages by key expressions
publication: PublisherQoSConfList,
},

pub transport: #[derive(Default)]
TransportConf {
pub unicast: TransportUnicastConf {
Expand Down
119 changes: 119 additions & 0 deletions commons/zenoh-config/src/qos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use serde::{Deserialize, Serialize};
use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree};
use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability};

#[derive(Debug, Deserialize, Default, Serialize, Clone)]
pub struct PublisherQoSConfList(pub(crate) Vec<PublisherQoSConf>);

impl From<PublisherQoSConfList> for KeBoxTree<PublisherQoSConfig> {
fn from(value: PublisherQoSConfList) -> KeBoxTree<PublisherQoSConfig> {
let mut tree = KeBoxTree::new();
for conf in value.0 {
for key_expr in conf.key_exprs {
// NOTE: we don't check key_expr unicity
tree.insert(&key_expr, conf.config.clone());
}
}
tree
}
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct PublisherQoSConf {
pub key_exprs: Vec<OwnedKeyExpr>,
pub config: PublisherQoSConfig,
}

#[derive(Debug, Default, Deserialize, Serialize, Clone)]
pub struct PublisherQoSConfig {
pub congestion_control: Option<PublisherCongestionControlConf>,
pub priority: Option<PublisherPriorityConf>,
pub express: Option<bool>,
#[cfg(feature = "unstable")]
pub reliability: Option<PublisherReliabilityConf>,
#[cfg(feature = "unstable")]
pub allowed_destination: Option<PublisherLocalityConf>,
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "lowercase")]
pub enum PublisherCongestionControlConf {
Drop,
Block,
}

impl From<PublisherCongestionControlConf> for CongestionControl {
fn from(value: PublisherCongestionControlConf) -> Self {
match value {
PublisherCongestionControlConf::Drop => Self::Drop,
PublisherCongestionControlConf::Block => Self::Block,
}
}
}

impl From<CongestionControl> for PublisherCongestionControlConf {
fn from(value: CongestionControl) -> Self {
match value {
CongestionControl::Drop => Self::Drop,
CongestionControl::Block => Self::Block,
}
}
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PublisherPriorityConf {
RealTime = 1,
InteractiveHigh = 2,
InteractiveLow = 3,
DataHigh = 4,
Data = 5,
DataLow = 6,
Background = 7,
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PublisherReliabilityConf {
BestEffort,
Reliable,
}

impl From<PublisherReliabilityConf> for Reliability {
fn from(value: PublisherReliabilityConf) -> Self {
match value {
PublisherReliabilityConf::BestEffort => Self::BestEffort,
PublisherReliabilityConf::Reliable => Self::Reliable,
}
}
}

impl From<Reliability> for PublisherReliabilityConf {
fn from(value: Reliability) -> Self {
match value {
Reliability::BestEffort => Self::BestEffort,
Reliability::Reliable => Self::Reliable,
}
}
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PublisherLocalityConf {
SessionLocal,
Remote,
Any,
}
2 changes: 1 addition & 1 deletion zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ transport_udp = ["zenoh-transport/transport_udp"]
transport_unixsock-stream = ["zenoh-transport/transport_unixsock-stream"]
transport_ws = ["zenoh-transport/transport_ws"]
transport_vsock = ["zenoh-transport/transport_vsock"]
unstable = ["internal_config", "zenoh-keyexpr/unstable"]
unstable = ["internal_config", "zenoh-keyexpr/unstable", "zenoh-config/unstable"]
internal_config = []

[dependencies]
Expand Down
64 changes: 61 additions & 3 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
//
use std::future::{IntoFuture, Ready};

use itertools::Itertools;
use zenoh_config::qos::PublisherQoSConfig;
use zenoh_core::{Resolvable, Result as ZResult, Wait};
use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode};
use zenoh_protocol::core::CongestionControl;
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;
Expand Down Expand Up @@ -204,7 +207,8 @@ impl<P, T> Resolvable for PublicationBuilder<P, T> {

impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
#[inline]
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher = self.publisher.apply_qos_overwrites();
self.publisher.session.0.resolve_put(
&self.publisher.key_expr?,
self.kind.payload,
Expand All @@ -226,7 +230,8 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut

impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
#[inline]
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
self.publisher = self.publisher.apply_qos_overwrites();
self.publisher.session.0.resolve_put(
&self.publisher.key_expr?,
ZBytes::new(),
Expand Down Expand Up @@ -341,6 +346,58 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
}

impl PublisherBuilder<'_, '_> {
/// Looks up if any configured QoS overwrites apply on the builder's key expression.
/// Returns a new builder with the overwritten QoS parameters.
pub(crate) fn apply_qos_overwrites(self) -> Self {
let mut qos_overwrites = PublisherQoSConfig::default();
if let Ok(key_expr) = &self.key_expr {
// get overwritten builder
let state = zread!(self.session.0.state);
let nodes_including = state
.publisher_qos_tree
.nodes_including(key_expr)
.collect_vec();
for node in &nodes_including {
// Take the first one yielded by the iterator that has overwrites
if let Some(overwrites) = node.weight() {
qos_overwrites = overwrites.clone();
// log warning if multiple keyexprs include it
if nodes_including.len() > 1 {
tracing::warn!(
"Publisher declared on `{}` which is included by multiple key_exprs in qos config. Using qos config for `{}`",
key_expr,
node.keyexpr(),
);
}
break;
}
}
}

Self {
congestion_control: qos_overwrites
.congestion_control
.map(|cc| cc.into())
.unwrap_or(self.congestion_control),
priority: qos_overwrites
.priority
.map(|p| p.into())
.unwrap_or(self.priority),
is_express: qos_overwrites.express.unwrap_or(self.is_express),
#[cfg(feature = "unstable")]
reliability: qos_overwrites
.reliability
.map(|r| r.into())
.unwrap_or(self.reliability),
#[cfg(feature = "unstable")]
destination: qos_overwrites
.allowed_destination
.map(|d| d.into())
.unwrap_or(self.destination),
..self
}
}

/// Changes the [`crate::sample::Locality`] applied when routing the data.
///
/// This restricts the matching subscribers that will receive the published data to the ones
Expand Down Expand Up @@ -372,7 +429,8 @@ impl<'b> Resolvable for PublisherBuilder<'_, 'b> {
}

impl Wait for PublisherBuilder<'_, '_> {
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
self = self.apply_qos_overwrites();
let mut key_expr = self.key_expr?;
if !key_expr.is_fully_optimized(&self.session.0) {
key_expr = self.session.declare_keyexpr(key_expr).wait()?;
Expand Down
29 changes: 29 additions & 0 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{

use futures::Sink;
use tracing::error;
use zenoh_config::qos::PublisherPriorityConf;
use zenoh_core::{Resolvable, Resolve, Wait};
use zenoh_protocol::core::CongestionControl;
use zenoh_result::{Error, ZResult};
Expand Down Expand Up @@ -477,6 +478,34 @@ impl TryFrom<u8> for Priority {
}
}

impl From<PublisherPriorityConf> for Priority {
fn from(value: PublisherPriorityConf) -> Self {
match value {
PublisherPriorityConf::RealTime => Self::RealTime,
PublisherPriorityConf::InteractiveHigh => Self::InteractiveHigh,
PublisherPriorityConf::InteractiveLow => Self::InteractiveLow,
PublisherPriorityConf::DataHigh => Self::DataHigh,
PublisherPriorityConf::Data => Self::Data,
PublisherPriorityConf::DataLow => Self::DataLow,
PublisherPriorityConf::Background => Self::Background,
}
}
}

impl From<Priority> for PublisherPriorityConf {
fn from(value: Priority) -> Self {
match value {
Priority::RealTime => Self::RealTime,
Priority::InteractiveHigh => Self::InteractiveHigh,
Priority::InteractiveLow => Self::InteractiveLow,
Priority::DataHigh => Self::DataHigh,
Priority::Data => Self::Data,
Priority::DataLow => Self::DataLow,
Priority::Background => Self::Background,
}
}
}

type ProtocolPriority = zenoh_protocol::core::Priority;
impl From<Priority> for ProtocolPriority {
fn from(prio: Priority) -> Self {
Expand Down
22 changes: 21 additions & 1 deletion zenoh/src/api/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::{convert::TryFrom, fmt};

use serde::{Deserialize, Serialize};
use zenoh_config::wrappers::EntityGlobalId;
use zenoh_config::{qos::PublisherLocalityConf, wrappers::EntityGlobalId};
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;
use zenoh_protocol::{
Expand Down Expand Up @@ -50,6 +50,26 @@ pub(crate) enum Locality {
Any,
}

impl From<PublisherLocalityConf> for Locality {
fn from(value: PublisherLocalityConf) -> Self {
match value {
PublisherLocalityConf::SessionLocal => Self::SessionLocal,
PublisherLocalityConf::Remote => Self::Remote,
PublisherLocalityConf::Any => Self::Any,
}
}
}

impl From<Locality> for PublisherLocalityConf {
fn from(value: Locality) -> Self {
match value {
Locality::SessionLocal => Self::SessionLocal,
Locality::Remote => Self::Remote,
Locality::Any => Self::Any,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub(crate) struct DataInfo {
pub kind: SampleKind,
Expand Down
Loading

0 comments on commit b3ccf82

Please sign in to comment.