Skip to content

Commit

Permalink
refactor: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Dec 3, 2024
1 parent 5455ecc commit 1753654
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ rustls-pemfile = "2.1.3"
rustls-webpki = "0.102.8"
rustls-pki-types = "1.8.0"
schemars = { version = "0.8.21", features = ["either"] }
scopeguard = "1.2.0"
secrecy = { version = "0.8.0", features = ["serde", "alloc"] }
serde = { version = "1.0.210", default-features = false, features = [
"derive",
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ paste = { workspace = true }
petgraph = { workspace = true }
phf = { workspace = true }
rand = { workspace = true, features = ["default"] }
scopeguard = { workspace = true }
serde = { workspace = true, features = ["default"] }
serde_json = { workspace = true }
socket2 = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,7 @@ impl Wait for PublisherBuilder<'_, '_> {
.declare_publisher_inner(key_expr.clone(), self.destination)?;
Ok(Publisher {
session: self.session.downgrade(),
// TODO use constants here
cache: AtomicU64::new(0b11),
cache: AtomicU64::new(0),
id,
key_expr,
encoding: self.encoding,
Expand Down
36 changes: 19 additions & 17 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2171,8 +2171,8 @@ impl SessionInner {
#[cfg(feature = "unstable")] source_info: SourceInfo,
attachment: Option<ZBytes>,
) -> ZResult<()> {
const REMOTE_TAG: u64 = 0b01;
const LOCAL_TAG: u64 = 0b10;
const NO_REMOTE_FLAG: u64 = 0b01;
const NO_LOCAL_FLAG: u64 = 0b10;
const VERSION_SHIFT: u64 = 2;
trace!("write({:?}, [...])", key_expr);
let state = zread!(self.state);
Expand All @@ -2181,21 +2181,26 @@ impl SessionInner {
.as_ref()
.cloned()
.ok_or(SessionClosedError)?;
let mut cached = REMOTE_TAG | LOCAL_TAG;
let mut to_cache = REMOTE_TAG | LOCAL_TAG;
let mut cached = 0;
let mut update_cache = None;
if let Some(cache) = cache {
cached = cache.load(Ordering::Relaxed);
let version = cached >> VERSION_SHIFT;
let c = cache.load(Ordering::Relaxed);
let version = c >> VERSION_SHIFT;
if version == state.subscription_version {
to_cache = cached;
cached = c;
} else {
to_cache = (state.subscription_version << VERSION_SHIFT) | REMOTE_TAG | LOCAL_TAG;
cached = (state.subscription_version << VERSION_SHIFT);
}
update_cache = Some(scopeguard::guard((), |_| {
if cached != c {
let _ = cache.compare_exchange(c, cached, Ordering::Relaxed, Ordering::Relaxed);
}
}));
}
drop(state);
let timestamp = timestamp.or_else(|| self.runtime.new_timestamp());
let wire_expr = key_expr.to_wire(self);
if (to_cache & REMOTE_TAG) != 0 && destination != Locality::SessionLocal {
if (cached & NO_REMOTE_FLAG) == 0 && destination != Locality::SessionLocal {
let remote = primitives.route_data(
Push {
wire_expr: wire_expr.to_owned(),
Expand Down Expand Up @@ -2237,10 +2242,10 @@ impl SessionInner {
Reliability::DEFAULT,
);
if !remote {
to_cache &= !REMOTE_TAG;
cached |= NO_REMOTE_FLAG
}
}
if (to_cache & LOCAL_TAG) != 0 && destination != Locality::Remote {
if (cached & NO_LOCAL_FLAG) == 0 && destination != Locality::Remote {
let data_info = DataInfo {
kind,
encoding: Some(encoding),
Expand All @@ -2265,12 +2270,9 @@ impl SessionInner {
attachment,
);
if !local {
to_cache &= !LOCAL_TAG;
cached |= NO_LOCAL_FLAG;
}
}
if let Some(cache) = cache.filter(|_| to_cache != cached) {
let _ = cache.compare_exchange(cached, to_cache, Ordering::Relaxed, Ordering::Relaxed);
}
Ok(())
}

Expand Down Expand Up @@ -2581,10 +2583,10 @@ impl Primitives for WeakSession {
}
zenoh_protocol::network::DeclareBody::DeclareSubscriber(m) => {
trace!("recv DeclareSubscriber {} {:?}", m.id, m.wire_expr);
let mut state = zwrite!(self.state);
state.subscription_version += 1;
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
state.subscription_version += 1;
if state.primitives.is_none() {
return; // Session closing or closed
}
Expand Down

0 comments on commit 1753654

Please sign in to comment.