From 0a8a624906f29d31edce734a2bf4ee25e008c8d3 Mon Sep 17 00:00:00 2001 From: Junha Yang Date: Fri, 19 Aug 2022 01:59:38 +0900 Subject: [PATCH] refactor: split background task --- network/src/propagation_network/mod.rs | 58 ++++++++++++++------------ 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/network/src/propagation_network/mod.rs b/network/src/propagation_network/mod.rs index 959668d34..174ff1986 100644 --- a/network/src/propagation_network/mod.rs +++ b/network/src/propagation_network/mod.rs @@ -26,10 +26,11 @@ use tokio::{ /// This network discovers peers with Kademlia([`libp2p::kad`]), /// and propagates data with FloodSub([`libp2p::floodsub`]). pub struct PropagationNetwork { - /// A join handle for background network task. - /// - /// The task running behind this handle is the main routine of [`PropagationNetwork`]. - _task_join_handle: task::JoinHandle<()>, + /// The network event handling routine. + _event_handling_task: task::JoinHandle<()>, + + /// The network bootstrapping (node discovery) routine. + _peer_discovery_task: task::JoinHandle<()>, /// A sending endpoint of the queue that collects broadcasted messages through the network /// and sends it to the simperby node. @@ -108,15 +109,19 @@ impl PropagationNetwork { // Create a message queue that a simperby node will use to receive messages from other nodes. let (sender, _receiver) = broadcast::channel::>(config.message_queue_capacity); - let _task_join_handle = task::spawn(run_background_task( - swarm_mutex.clone(), - sender.clone(), + let _event_handling_task = task::spawn(run_event_handling_task( + Arc::clone(&swarm_mutex), config.lock_release_interval, + )); + + let _peer_discovery_task = task::spawn(run_peer_discovery_task( + Arc::clone(&swarm_mutex), config.peer_discovery_interval, )); Ok(Self { - _task_join_handle, + _event_handling_task, + _peer_discovery_task, sender, swarm: swarm_mutex, }) @@ -241,36 +246,35 @@ impl PropagationNetwork { } } -async fn run_background_task( +async fn run_peer_discovery_task( swarm: Arc>>, - _sender: broadcast::Sender>, - lock_release_interval: Duration, bootstrap_interval: Duration, +) { + loop { + // Note: An `Err` is returned only if there is no known peer, + // which is not considered to be an error if this node is + // the first one to join the network. Thus we discard the result. + let _ = swarm.lock().await.behaviour_mut().kademlia.bootstrap(); + tokio::time::sleep(bootstrap_interval).await; + } +} + +async fn run_event_handling_task( + swarm: Arc>>, + lock_release_interval: Duration, ) { // This timer guarantees that the lock for swarm will be released // regularly and within a finite time. let mut lock_release_timer = time::interval(lock_release_interval); lock_release_timer.set_missed_tick_behavior(time::MissedTickBehavior::Delay); - - let mut bootstrap_timer = time::interval(bootstrap_interval); - loop { let mut swarm = swarm.lock().await; tokio::select! { - // Get k-closest peers after every preset interval. - // Todo: Update Floodsub publish targets to match them with the k-closest peers. - _ = bootstrap_timer.tick() => { - // Note: An `Err` is returned only if there is no known peer, - // which is not considered to be an error if this node is - // the first one to join the network. - let _ = swarm.behaviour_mut().kademlia.bootstrap(); - } - // Listen on swarm events. - _event = swarm.select_next_some() => {} - // Release the lock so that other tasks can use swarm. - _ = lock_release_timer.tick() => () + _item = swarm.select_next_some() => { + // do something with item + }, + _ = lock_release_timer.tick() => (), } - // The lock for swarm is automatically released here. } }