Skip to content

Commit

Permalink
Replace trees computation tasks with a worker (#1303)
Browse files Browse the repository at this point in the history
* Replace trees computation tasks with a worker

* Address review comments

* Remove review comments
  • Loading branch information
OlivierHecart authored Aug 9, 2024
1 parent f47354c commit fef38dc
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 70 deletions.
62 changes: 35 additions & 27 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,42 @@ macro_rules! face_hat_mut {
}
use face_hat_mut;

struct TreesComputationWorker {
_task: TerminatableTask,
tx: flume::Sender<Arc<TablesLock>>,
}

impl TreesComputationWorker {
fn new() -> Self {
let (tx, rx) = flume::bounded::<Arc<TablesLock>>(1);
let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(
*TREES_COMPUTATION_DELAY_MS,
))
.await;
if let Ok(tables_ref) = rx.recv_async().await {
let mut tables = zwrite!(tables_ref.tables);

tracing::trace!("Compute trees");
let new_children = hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees();

tracing::trace!("Compute routes");
pubsub::pubsub_tree_change(&mut tables, &new_children);
queries::queries_tree_change(&mut tables, &new_children);
drop(tables);
}
}
});
Self { _task: task, tx }
}
}

struct HatTables {
peer_subs: HashSet<Arc<Resource>>,
peer_qabls: HashSet<Arc<Resource>>,
peers_net: Option<Network>,
peers_trees_task: Option<TerminatableTask>,
peers_trees_worker: TreesComputationWorker,
}

impl HatTables {
Expand All @@ -121,36 +152,13 @@ impl HatTables {
peer_subs: HashSet::new(),
peer_qabls: HashSet::new(),
peers_net: None,
peers_trees_task: None,
peers_trees_worker: TreesComputationWorker::new(),
}
}

fn schedule_compute_trees(&mut self, tables_ref: Arc<TablesLock>) {
tracing::trace!("Schedule computations");
if self.peers_trees_task.is_none() {
let task = TerminatableTask::spawn(
zenoh_runtime::ZRuntime::Net,
async move {
tokio::time::sleep(std::time::Duration::from_millis(
*TREES_COMPUTATION_DELAY_MS,
))
.await;
let mut tables = zwrite!(tables_ref.tables);

tracing::trace!("Compute trees");
let new_children = hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees();

tracing::trace!("Compute routes");
pubsub::pubsub_tree_change(&mut tables, &new_children);
queries::queries_tree_change(&mut tables, &new_children);

tracing::trace!("Computations completed");
hat_mut!(tables).peers_trees_task = None;
},
TerminatableTask::create_cancellation_token(),
);
self.peers_trees_task = Some(task);
}
tracing::trace!("Schedule trees computation");
let _ = self.peers_trees_worker.tx.try_send(tables_ref);
}
}

Expand Down
94 changes: 51 additions & 43 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,44 @@ macro_rules! face_hat_mut {
}
use face_hat_mut;

struct TreesComputationWorker {
_task: TerminatableTask,
tx: flume::Sender<Arc<TablesLock>>,
}

impl TreesComputationWorker {
fn new(net_type: WhatAmI) -> Self {
let (tx, rx) = flume::bounded::<Arc<TablesLock>>(1);
let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(
*TREES_COMPUTATION_DELAY_MS,
))
.await;
if let Ok(tables_ref) = rx.recv_async().await {
let mut tables = zwrite!(tables_ref.tables);

tracing::trace!("Compute trees");
let new_children = match net_type {
WhatAmI::Router => hat_mut!(tables)
.routers_net
.as_mut()
.unwrap()
.compute_trees(),
_ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(),
};

tracing::trace!("Compute routes");
pubsub::pubsub_tree_change(&mut tables, &new_children, net_type);
queries::queries_tree_change(&mut tables, &new_children, net_type);
drop(tables);
}
}
});
Self { _task: task, tx }
}
}

struct HatTables {
router_subs: HashSet<Arc<Resource>>,
peer_subs: HashSet<Arc<Resource>>,
Expand All @@ -121,8 +159,8 @@ struct HatTables {
routers_net: Option<Network>,
peers_net: Option<Network>,
shared_nodes: Vec<ZenohId>,
routers_trees_task: Option<TerminatableTask>,
peers_trees_task: Option<TerminatableTask>,
routers_trees_worker: TreesComputationWorker,
peers_trees_worker: TreesComputationWorker,
router_peers_failover_brokering: bool,
}

Expand All @@ -136,8 +174,8 @@ impl HatTables {
routers_net: None,
peers_net: None,
shared_nodes: vec![],
routers_trees_task: None,
peers_trees_task: None,
routers_trees_worker: TreesComputationWorker::new(WhatAmI::Router),
peers_trees_worker: TreesComputationWorker::new(WhatAmI::Peer),
router_peers_failover_brokering,
}
}
Expand Down Expand Up @@ -240,45 +278,15 @@ impl HatTables {
}

fn schedule_compute_trees(&mut self, tables_ref: Arc<TablesLock>, net_type: WhatAmI) {
tracing::trace!("Schedule computations");
if (net_type == WhatAmI::Router && self.routers_trees_task.is_none())
|| (net_type == WhatAmI::Peer && self.peers_trees_task.is_none())
{
let task = TerminatableTask::spawn(
zenoh_runtime::ZRuntime::Net,
async move {
tokio::time::sleep(std::time::Duration::from_millis(
*TREES_COMPUTATION_DELAY_MS,
))
.await;
let mut tables = zwrite!(tables_ref.tables);

tracing::trace!("Compute trees");
let new_children = match net_type {
WhatAmI::Router => hat_mut!(tables)
.routers_net
.as_mut()
.unwrap()
.compute_trees(),
_ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(),
};

tracing::trace!("Compute routes");
pubsub::pubsub_tree_change(&mut tables, &new_children, net_type);
queries::queries_tree_change(&mut tables, &new_children, net_type);

tracing::trace!("Computations completed");
match net_type {
WhatAmI::Router => hat_mut!(tables).routers_trees_task = None,
_ => hat_mut!(tables).peers_trees_task = None,
};
},
TerminatableTask::create_cancellation_token(),
);
match net_type {
WhatAmI::Router => self.routers_trees_task = Some(task),
_ => self.peers_trees_task = Some(task),
};
tracing::trace!("Schedule trees computation");
match net_type {
WhatAmI::Router => {
let _ = self.routers_trees_worker.tx.try_send(tables_ref);
}
WhatAmI::Peer => {
let _ = self.peers_trees_worker.tx.try_send(tables_ref);
}
_ => (),
}
}
}
Expand Down

0 comments on commit fef38dc

Please sign in to comment.