diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 9d7760247e..4edc9c98e6 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -356,47 +356,49 @@ impl HatPubSubTrait for HatCode { } }; - for face in tables - .faces - .values() - .filter(|f| f.whatami != WhatAmI::Client) - { - if face.local_interests.values().any(|interest| { - interest.finalized - && interest.options.subscribers() - && interest - .res - .as_ref() - .map(|res| { - KeyExpr::try_from(res.expr()) - .and_then(|intres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| intres.includes(&putres)) - }) - .unwrap_or(false) - }) - .unwrap_or(true) - }) { - if face_hat!(face).remote_subs.values().any(|sub| { - KeyExpr::try_from(sub.expr()) - .and_then(|subres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| subres.intersects(&putres)) - }) - .unwrap_or(false) + if source_type == WhatAmI::Client { + for face in tables + .faces + .values() + .filter(|f| f.whatami != WhatAmI::Client) + { + if face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.subscribers() + && interest + .res + .as_ref() + .map(|res| { + KeyExpr::try_from(res.expr()) + .and_then(|intres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| intres.includes(&putres)) + }) + .unwrap_or(false) + }) + .unwrap_or(true) }) { + if face_hat!(face).remote_subs.values().any(|sub| { + KeyExpr::try_from(sub.expr()) + .and_then(|subres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| subres.intersects(&putres)) + }) + .unwrap_or(false) + }) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } else { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.insert( face.id, (face.clone(), key_expr.to_owned(), NodeId::default()), ); } - } else { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.insert( - face.id, - (face.clone(), key_expr.to_owned(), NodeId::default()), - ); } } diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 8ef3ec1fb7..7658a509da 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -349,13 +349,15 @@ impl HatQueriesTrait for HatCode { } }; - if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.push(QueryTargetQabl { - direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - complete: 0, - distance: f64::MAX, - }); + if source_type == WhatAmI::Client { + if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + complete: 0, + distance: f64::MAX, + }); + } } let res = Resource::get_resource(expr.prefix, expr.suffix); diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index dffdb4e1de..12a1e67186 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -604,62 +604,64 @@ impl HatPubSubTrait for HatCode { } }; - for face in tables - .faces - .values() - .filter(|f| f.whatami == WhatAmI::Router) - { - if face.local_interests.values().any(|interest| { - interest.finalized - && interest.options.subscribers() - && interest - .res - .as_ref() - .map(|res| { - KeyExpr::try_from(res.expr()) - .and_then(|intres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| intres.includes(&putres)) - }) - .unwrap_or(false) - }) - .unwrap_or(true) - }) { - if face_hat!(face).remote_subs.values().any(|sub| { - KeyExpr::try_from(sub.expr()) - .and_then(|subres| { - KeyExpr::try_from(expr.full_expr()) - .map(|putres| subres.intersects(&putres)) - }) - .unwrap_or(false) + if source_type == WhatAmI::Client { + for face in tables + .faces + .values() + .filter(|f| f.whatami == WhatAmI::Router) + { + if face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.subscribers() + && interest + .res + .as_ref() + .map(|res| { + KeyExpr::try_from(res.expr()) + .and_then(|intres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| intres.includes(&putres)) + }) + .unwrap_or(false) + }) + .unwrap_or(true) }) { + if face_hat!(face).remote_subs.values().any(|sub| { + KeyExpr::try_from(sub.expr()) + .and_then(|subres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| subres.intersects(&putres)) + }) + .unwrap_or(false) + }) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } else { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.insert( face.id, (face.clone(), key_expr.to_owned(), NodeId::default()), ); } - } else { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.insert( - face.id, - (face.clone(), key_expr.to_owned(), NodeId::default()), - ); } - } - for face in tables.faces.values().filter(|f| { - f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true) - }) { - route.entry(face.id).or_insert_with(|| { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - (face.clone(), key_expr.to_owned(), NodeId::default()) - }); + for face in tables.faces.values().filter(|f| { + f.whatami == WhatAmI::Peer + && !f + .local_interests + .get(&0) + .map(|i| i.finalized) + .unwrap_or(true) + }) { + route.entry(face.id).or_insert_with(|| { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + (face.clone(), key_expr.to_owned(), NodeId::default()) + }); + } } let res = Resource::get_resource(expr.prefix, expr.suffix); diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 166f63b301..87b6372dae 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -589,30 +589,32 @@ impl HatQueriesTrait for HatCode { } }; - // TODO: BNestMatching: What if there is a local compete ? - if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.push(QueryTargetQabl { - direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - complete: 0, - distance: f64::MAX, - }); - } + if source_type == WhatAmI::Client { + // TODO: BNestMatching: What if there is a local compete ? + if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + complete: 0, + distance: f64::MAX, + }); + } - for face in tables.faces.values().filter(|f| { - f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true) - }) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.push(QueryTargetQabl { - direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - complete: 0, - distance: 0.5, - }); + for face in tables.faces.values().filter(|f| { + f.whatami == WhatAmI::Peer + && !f + .local_interests + .get(&0) + .map(|i| i.finalized) + .unwrap_or(true) + }) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + complete: 0, + distance: 0.5, + }); + } } let res = Resource::get_resource(expr.prefix, expr.suffix); diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index eaaf4ff921..506c85888c 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -855,7 +855,7 @@ pub(super) fn pubsub_linkstate_change( && !client_subs && !res.session_ctxs.values().any(|ctx| { ctx.face.whatami == WhatAmI::Peer - && src_face.zid != ctx.face.zid + && src_face.id != ctx.face.id && HatTables::failover_brokering_to(links, ctx.face.zid) }) }) @@ -884,7 +884,9 @@ pub(super) fn pubsub_linkstate_change( } for dst_face in tables.faces.values_mut() { - if HatTables::failover_brokering_to(links, dst_face.zid) { + if src_face.id != dst_face.id + && HatTables::failover_brokering_to(links, dst_face.zid) + { for res in face_hat!(src_face).remote_subs.values() { if !face_hat!(dst_face).local_subs.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 4703625fff..d706435179 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -946,86 +946,77 @@ pub(super) fn queries_linkstate_change( links: &[ZenohIdProto], send_declare: &mut SendDeclare, ) { - if let Some(src_face) = tables.get_face(zid) { + if let Some(mut src_face) = tables.get_face(zid).cloned() { if hat!(tables).router_peers_failover_brokering && src_face.whatami == WhatAmI::Peer { - for res in face_hat!(src_face).remote_qabls.values() { - let client_qabls = res - .session_ctxs - .values() - .any(|ctx| ctx.face.whatami == WhatAmI::Client && ctx.qabl.is_some()); - if !remote_router_qabls(tables, res) && !client_qabls { - for ctx in get_mut_unchecked(&mut res.clone()) + let to_forget = face_hat!(src_face) + .local_qabls + .keys() + .filter(|res| { + let client_qabls = res .session_ctxs - .values_mut() - { - let dst_face = &mut get_mut_unchecked(ctx).face; - if dst_face.whatami == WhatAmI::Peer && src_face.zid != dst_face.zid { - if let Some((id, _)) = face_hat!(dst_face).local_qabls.get(res).cloned() - { - let forget = !HatTables::failover_brokering_to(links, dst_face.zid) - && { - let ctx_links = hat!(tables) - .linkstatepeers_net - .as_ref() - .map(|net| net.get_links(dst_face.zid)) - .unwrap_or_else(|| &[]); - res.session_ctxs.values().any(|ctx2| { - ctx2.face.whatami == WhatAmI::Peer - && ctx2.qabl.is_some() - && HatTables::failover_brokering_to( - ctx_links, - ctx2.face.zid, - ) - }) - }; - if forget { - send_declare( - &dst_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareQueryable( - UndeclareQueryable { - id, - ext_wire_expr: WireExprType::null(), - }, - ), - }, - res.expr(), - ), - ); - - face_hat_mut!(dst_face).local_qabls.remove(res); - } - } else if HatTables::failover_brokering_to(links, ctx.face.zid) { - let dst_face = &mut get_mut_unchecked(ctx).face; - let info = local_qabl_info(tables, res, dst_face); - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face) - .local_qabls - .insert(res.clone(), (id, info)); - let key_expr = Resource::decl_key(res, dst_face); - send_declare( - &dst_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id, - wire_expr: key_expr, - ext_info: info, - }), - }, - res.expr(), - ), - ); - } + .values() + .any(|ctx| ctx.face.whatami == WhatAmI::Client && ctx.qabl.is_some()); + !remote_router_qabls(tables, res) + && !client_qabls + && !res.session_ctxs.values().any(|ctx| { + ctx.face.whatami == WhatAmI::Peer + && src_face.id != ctx.face.id + && HatTables::failover_brokering_to(links, ctx.face.zid) + }) + }) + .cloned() + .collect::>>(); + for res in to_forget { + if let Some((id, _)) = face_hat_mut!(&mut src_face).local_qabls.remove(&res) { + let wire_expr = Resource::get_best_key(&res, "", src_face.id); + send_declare( + &src_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); + } + } + + for mut dst_face in tables.faces.values().cloned() { + if src_face.id != dst_face.id + && HatTables::failover_brokering_to(links, dst_face.zid) + { + for res in face_hat!(src_face).remote_qabls.values() { + if !face_hat!(dst_face).local_qabls.contains_key(res) { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + let info = local_qabl_info(tables, res, &dst_face); + face_hat_mut!(&mut dst_face) + .local_qabls + .insert(res.clone(), (id, info)); + let key_expr = Resource::decl_key(res, &mut dst_face); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id, + wire_expr: key_expr, + ext_info: info, + }), + }, + res.expr(), + ), + ); } } } diff --git a/zenoh/src/net/routing/hat/router/token.rs b/zenoh/src/net/routing/hat/router/token.rs index e6f18a5ea2..5677901987 100644 --- a/zenoh/src/net/routing/hat/router/token.rs +++ b/zenoh/src/net/routing/hat/router/token.rs @@ -872,79 +872,73 @@ pub(super) fn token_linkstate_change( links: &[ZenohIdProto], send_declare: &mut SendDeclare, ) { - if let Some(src_face) = tables.get_face(zid).cloned() { + if let Some(mut src_face) = tables.get_face(zid).cloned() { if hat!(tables).router_peers_failover_brokering && src_face.whatami == WhatAmI::Peer { - for res in face_hat!(src_face).remote_tokens.values() { - let client_tokens = res - .session_ctxs - .values() - .any(|ctx| ctx.face.whatami == WhatAmI::Client && ctx.token); - if !remote_router_tokens(tables, res) && !client_tokens { - for ctx in get_mut_unchecked(&mut res.clone()) + let to_forget = face_hat!(src_face) + .local_tokens + .keys() + .filter(|res| { + let client_tokens = res .session_ctxs - .values_mut() - { - let dst_face = &mut get_mut_unchecked(ctx).face; - if dst_face.whatami == WhatAmI::Peer && src_face.zid != dst_face.zid { - if let Some(id) = face_hat!(dst_face).local_tokens.get(res).cloned() { - let forget = !HatTables::failover_brokering_to(links, dst_face.zid) - && { - let ctx_links = hat!(tables) - .linkstatepeers_net - .as_ref() - .map(|net| net.get_links(dst_face.zid)) - .unwrap_or_else(|| &[]); - res.session_ctxs.values().any(|ctx2| { - ctx2.face.whatami == WhatAmI::Peer - && ctx2.token - && HatTables::failover_brokering_to( - ctx_links, - ctx2.face.zid, - ) - }) - }; - if forget { - send_declare( - &dst_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr(), - ), - ); + .values() + .any(|ctx| ctx.face.whatami == WhatAmI::Client && ctx.token); + !remote_router_tokens(tables, res) + && !client_tokens + && !res.session_ctxs.values().any(|ctx| { + ctx.face.whatami == WhatAmI::Peer + && src_face.id != ctx.face.id + && HatTables::failover_brokering_to(links, ctx.face.zid) + }) + }) + .cloned() + .collect::>>(); + for res in to_forget { + if let Some(id) = face_hat_mut!(&mut src_face).local_tokens.remove(&res) { + let wire_expr = Resource::get_best_key(&res, "", src_face.id); + send_declare( + &src_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); + } + } - face_hat_mut!(dst_face).local_tokens.remove(res); - } - } else if HatTables::failover_brokering_to(links, ctx.face.zid) { - let dst_face = &mut get_mut_unchecked(ctx).face; - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); - send_declare( - &dst_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareToken(DeclareToken { - id, - wire_expr: key_expr, - }), - }, - res.expr(), - ), - ); - } + for dst_face in tables.faces.values_mut() { + if src_face.id != dst_face.id + && HatTables::failover_brokering_to(links, dst_face.zid) + { + for res in face_hat!(src_face).remote_tokens.values() { + if !face_hat!(dst_face).local_tokens.contains_key(res) { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareToken(DeclareToken { + id, + wire_expr: key_expr, + }), + }, + res.expr(), + ), + ); } } }