Skip to content

Commit

Permalink
refactor: lazily compute the push body (#1713)
Browse files Browse the repository at this point in the history
* refactor: lazily compute the push body

It allows to optimize the local workflow, as it remove all the
memmove caused by the body computation if there is no remote route.

* fix: fix lint

* fix: remove duplicated code
  • Loading branch information
wyfo authored Jan 15, 2025
1 parent 1e3a496 commit 7bebf94
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 168 deletions.
62 changes: 28 additions & 34 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2151,40 +2151,34 @@ impl SessionInner {
let timestamp = timestamp.or_else(|| self.runtime.new_timestamp());
let wire_expr = key_expr.to_wire(self);
if destination != Locality::SessionLocal {
primitives.send_push(
Push {
wire_expr: wire_expr.to_owned(),
ext_qos: push::ext::QoSType::new(
priority.into(),
congestion_control,
is_express,
),
ext_tstamp: None,
ext_nodeid: push::ext::NodeIdType::DEFAULT,
payload: match kind {
SampleKind::Put => PushBody::Put(Put {
timestamp,
encoding: encoding.clone().into(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.clone().into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
payload: payload.clone().into(),
}),
SampleKind::Delete => PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.clone().into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
}),
},
primitives.send_push_lazy(
wire_expr.to_owned(),
push::ext::QoSType::new(priority.into(), congestion_control, is_express),
None,
push::ext::NodeIdType::DEFAULT,
|| match kind {
SampleKind::Put => PushBody::Put(Put {
timestamp,
encoding: encoding.clone().into(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.clone().into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
payload: payload.clone().into(),
}),
SampleKind::Delete => PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.clone().into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
}),
},
#[cfg(feature = "unstable")]
reliability,
Expand Down
38 changes: 34 additions & 4 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use std::{

use tokio_util::sync::CancellationToken;
use zenoh_protocol::{
core::{ExprId, Reliability, WhatAmI, ZenohIdProto},
core::{ExprId, Reliability, WhatAmI, WireExpr, ZenohIdProto},
network::{
interest::{InterestId, InterestMode, InterestOptions},
Mapping, Push, Request, RequestId, Response, ResponseFinal,
push, Mapping, Push, Request, RequestId, Response, ResponseFinal,
},
zenoh::RequestBody,
zenoh::{PushBody, RequestBody},
};
use zenoh_sync::get_mut_unchecked;
use zenoh_task::TaskController;
Expand Down Expand Up @@ -207,6 +207,27 @@ pub struct Face {
}

impl Face {
pub(crate) fn send_push_lazy(
&self,
wire_expr: WireExpr,
qos: push::ext::QoSType,
ext_tstamp: Option<push::ext::TimestampType>,
ext_nodeid: push::ext::NodeIdType,
body: impl FnOnce() -> PushBody,
reliability: Reliability,
) {
route_data(
&self.tables,
&self.state,
wire_expr,
qos,
ext_tstamp,
ext_nodeid,
body,
reliability,
);
}

pub fn downgrade(&self) -> WeakFace {
WeakFace {
tables: Arc::downgrade(&self.tables),
Expand Down Expand Up @@ -388,7 +409,16 @@ impl Primitives for Face {

#[inline]
fn send_push(&self, msg: Push, reliability: Reliability) {
route_data(&self.tables, &self.state, msg, reliability);
route_data(
&self.tables,
&self.state,
msg.wire_expr,
msg.ext_qos,
msg.ext_tstamp,
msg.ext_nodeid,
move || msg.payload,
reliability,
);
}

fn send_request(&self, msg: Request) {
Expand Down
52 changes: 30 additions & 22 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ use std::sync::Arc;
use zenoh_core::zread;
use zenoh_protocol::{
core::{key_expr::keyexpr, Reliability, WireExpr},
network::{
declare::{ext, SubscriberId},
Push,
},
network::{declare::SubscriberId, push::ext, Push},
zenoh::PushBody,
};
use zenoh_sync::get_mut_unchecked;
Expand Down Expand Up @@ -356,42 +353,53 @@ macro_rules! inc_stats {
};
}

// having all the arguments instead of an intermediate struct seems to enable a better inlining
// see https://github.com/eclipse-zenoh/zenoh/pull/1713#issuecomment-2590130026
#[allow(clippy::too_many_arguments)]
pub fn route_data(
tables_ref: &Arc<TablesLock>,
face: &FaceState,
mut msg: Push,
wire_expr: WireExpr,
ext_qos: ext::QoSType,
ext_tstamp: Option<ext::TimestampType>,
ext_nodeid: ext::NodeIdType,
payload: impl FnOnce() -> PushBody,
reliability: Reliability,
) {
let tables = zread!(tables_ref.tables);
match tables
.get_mapping(face, &msg.wire_expr.scope, msg.wire_expr.mapping)
.get_mapping(face, &wire_expr.scope, wire_expr.mapping)
.cloned()
{
Some(prefix) => {
tracing::trace!(
"{} Route data for res {}{}",
face,
prefix.expr(),
msg.wire_expr.suffix.as_ref()
wire_expr.suffix.as_ref()
);
let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref());
let mut expr = RoutingExpr::new(&prefix, wire_expr.suffix.as_ref());

#[cfg(feature = "stats")]
let admin = expr.full_expr().starts_with("@/");
#[cfg(feature = "stats")]
let mut payload = payload();
#[cfg(feature = "stats")]
if !admin {
inc_stats!(face, rx, user, msg.payload)
inc_stats!(face, rx, user, payload);
} else {
inc_stats!(face, rx, admin, msg.payload)
inc_stats!(face, rx, admin, payload);
}

if tables.hat_code.ingress_filter(&tables, face, &mut expr) {
let res = Resource::get_resource(&prefix, expr.suffix);

let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id);
let route = get_data_route(&tables, face, &res, &mut expr, ext_nodeid.node_id);

if !route.is_empty() {
treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp);
#[cfg(not(feature = "stats"))]
let mut payload = payload();
treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp);

if route.len() == 1 {
let (outface, key_expr, context) = route.values().next().unwrap();
Expand All @@ -402,18 +410,18 @@ pub fn route_data(
drop(tables);
#[cfg(feature = "stats")]
if !admin {
inc_stats!(outface, tx, user, msg.payload)
inc_stats!(outface, tx, user, payload);
} else {
inc_stats!(outface, tx, admin, msg.payload)
inc_stats!(outface, tx, admin, payload);
}

outface.primitives.send_push(
Push {
wire_expr: key_expr.into(),
ext_qos: msg.ext_qos,
ext_tstamp: msg.ext_tstamp,
ext_qos,
ext_tstamp,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload: msg.payload,
payload,
},
reliability,
)
Expand All @@ -433,18 +441,18 @@ pub fn route_data(
for (outface, key_expr, context) in route {
#[cfg(feature = "stats")]
if !admin {
inc_stats!(outface, tx, user, msg.payload)
inc_stats!(outface, tx, user, payload)
} else {
inc_stats!(outface, tx, admin, msg.payload)
inc_stats!(outface, tx, admin, payload)
}

outface.primitives.send_push(
Push {
wire_expr: key_expr,
ext_qos: msg.ext_qos,
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: context },
payload: msg.payload.clone(),
payload: payload.clone(),
},
reliability,
)
Expand All @@ -457,7 +465,7 @@ pub fn route_data(
tracing::error!(
"{} Route data with unknown scope {}!",
face,
msg.wire_expr.scope
wire_expr.scope
);
}
}
Expand Down
Loading

0 comments on commit 7bebf94

Please sign in to comment.