Skip to content

Commit

Permalink
Fix bug declaring/undeclaring tokens with the same key (eclipse-zenoh…
Browse files Browse the repository at this point in the history
…#1619)

* Fix bug declaring/undeclaring tokens with the same key

* Add test

* Simplified
  • Loading branch information
OlivierHecart authored Nov 29, 2024
1 parent 92fd3a6 commit c764bf9
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 55 deletions.
15 changes: 4 additions & 11 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
session
.0
.declare_liveliness_inner(&key_expr)
.map(|tok_state| LivelinessToken {
.map(|id| LivelinessToken {
session: self.session.downgrade(),
state: tok_state,
id,
undeclare_on_drop: true,
})
}
Expand All @@ -272,13 +272,6 @@ impl IntoFuture for LivelinessTokenBuilder<'_, '_> {
}
}

#[zenoh_macros::unstable]
#[derive(Debug)]
pub(crate) struct LivelinessTokenState {
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'static>,
}

/// A token whose liveliness is tied to the Zenoh [`Session`](Session).
///
/// A declared liveliness token will be seen as alive by any other Zenoh
Expand Down Expand Up @@ -308,7 +301,7 @@ pub(crate) struct LivelinessTokenState {
#[derive(Debug)]
pub struct LivelinessToken {
session: WeakSession,
state: Arc<LivelinessTokenState>,
id: Id,
undeclare_on_drop: bool,
}

Expand Down Expand Up @@ -382,7 +375,7 @@ impl LivelinessToken {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.undeclare_on_drop = false;
self.session.undeclare_liveliness(self.state.id)
self.session.undeclare_liveliness(self.id)
}
}

Expand Down
61 changes: 17 additions & 44 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use zenoh_task::TaskController;
use crate::api::selector::ZenohParameters;
#[cfg(feature = "unstable")]
use crate::api::{
liveliness::{Liveliness, LivelinessTokenState},
liveliness::Liveliness,
publisher::Publisher,
publisher::{MatchingListenerState, MatchingStatus},
query::LivelinessQueryState,
Expand Down Expand Up @@ -137,8 +137,6 @@ pub(crate) struct SessionState {
pub(crate) liveliness_subscribers: HashMap<Id, Arc<SubscriberState>>,
pub(crate) queryables: HashMap<Id, Arc<QueryableState>>,
#[cfg(feature = "unstable")]
pub(crate) tokens: HashMap<Id, Arc<LivelinessTokenState>>,
#[cfg(feature = "unstable")]
pub(crate) matching_listeners: HashMap<Id, Arc<MatchingListenerState>>,
pub(crate) queries: HashMap<RequestId, QueryState>,
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -170,8 +168,6 @@ impl SessionState {
liveliness_subscribers: HashMap::new(),
queryables: HashMap::new(),
#[cfg(feature = "unstable")]
tokens: HashMap::new(),
#[cfg(feature = "unstable")]
matching_listeners: HashMap::new(),
queries: HashMap::new(),
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -1110,7 +1106,6 @@ impl SessionInner {
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
let mut state = zwrite!(self.state);
let _tokens = std::mem::take(&mut state.tokens);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
drop(state);
}
Expand Down Expand Up @@ -1548,21 +1543,10 @@ impl SessionInner {
}

#[zenoh_macros::unstable]
pub(crate) fn declare_liveliness_inner(
&self,
key_expr: &KeyExpr,
) -> ZResult<Arc<LivelinessTokenState>> {
let mut state = zwrite!(self.state);
pub(crate) fn declare_liveliness_inner(&self, key_expr: &KeyExpr) -> ZResult<Id> {
tracing::trace!("declare_liveliness({:?})", key_expr);
let id = self.runtime.next_id();
let tok_state = Arc::new(LivelinessTokenState {
id,
key_expr: key_expr.clone().into_owned(),
});

state.tokens.insert(tok_state.id, tok_state.clone());
let primitives = state.primitives()?;
drop(state);
let primitives = zread!(self.state).primitives()?;
primitives.send_declare(Declare {
interest_id: None,
ext_qos: declare::ext::QoSType::DECLARE,
Expand All @@ -1573,7 +1557,7 @@ impl SessionInner {
wire_expr: key_expr.to_wire(self).to_owned(),
}),
});
Ok(tok_state)
Ok(id)
}

#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -1679,32 +1663,21 @@ impl SessionInner {

#[zenoh_macros::unstable]
pub(crate) fn undeclare_liveliness(&self, tid: Id) -> ZResult<()> {
let mut state = zwrite!(self.state);
let Ok(primitives) = state.primitives() else {
let Ok(primitives) = zread!(self.state).primitives() else {
return Ok(());
};
if let Some(tok_state) = state.tokens.remove(&tid) {
trace!("undeclare_liveliness({:?})", tok_state);
// Note: there might be several Tokens on the same KeyExpr.
let key_expr = &tok_state.key_expr;
let twin_tok = state.tokens.values().any(|s| s.key_expr == *key_expr);
if !twin_tok {
drop(state);
primitives.send_declare(Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareToken(UndeclareToken {
id: tok_state.id,
ext_wire_expr: WireExprType::null(),
}),
});
}
Ok(())
} else {
Err(zerror!("Unable to find liveliness token").into())
}
trace!("undeclare_liveliness({:?})", tid);
primitives.send_declare(Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareToken(UndeclareToken {
id: tid,
ext_wire_expr: WireExprType::null(),
}),
});
Ok(())
}

#[zenoh_macros::unstable]
Expand Down
76 changes: 76 additions & 0 deletions zenoh/tests/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4718,3 +4718,79 @@ async fn test_liveliness_issue_1470() {
client0.close().await.unwrap();
client1.close().await.unwrap();
}

/// -------------------------------------------------------
/// DOUBLE UNDECLARE CLIQUE
/// -------------------------------------------------------
#[cfg(feature = "unstable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_liveliness_double_undeclare_clique() {
use std::time::Duration;

use zenoh::{config::WhatAmI, sample::SampleKind};
use zenoh_config::EndPoint;
const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const PEER1_ENDPOINT: &str = "tcp/localhost:30515";
const LIVELINESS_KEYEXPR: &str = "test/liveliness/double/undeclare/clique";

zenoh_util::init_log_from_env_or("error");

let peer1 = {
let mut c = zenoh::Config::default();
c.listen
.endpoints
.set(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (1) ZID: {}", s.zid());
s
};

let peer2 = {
let mut c = zenoh::Config::default();
c.connect
.endpoints
.set(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (2) ZID: {}", s.zid());
s
};

let sub = ztimeout!(peer1.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let token = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

assert!(sub.try_recv().unwrap().is_none());

token.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

assert!(sub.try_recv().unwrap().is_none());

token2.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Delete);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

sub.undeclare().await.unwrap();

peer1.close().await.unwrap();
peer2.close().await.unwrap();
}

0 comments on commit c764bf9

Please sign in to comment.