Skip to content

Commit

Permalink
refactor: split background task
Browse files Browse the repository at this point in the history
  • Loading branch information
junha1 authored and f2koi committed Aug 19, 2022
1 parent 1204a35 commit 0a8a624
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions network/src/propagation_network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use tokio::{
/// This network discovers peers with Kademlia([`libp2p::kad`]),
/// and propagates data with FloodSub([`libp2p::floodsub`]).
pub struct PropagationNetwork {
/// A join handle for background network task.
///
/// The task running behind this handle is the main routine of [`PropagationNetwork`].
_task_join_handle: task::JoinHandle<()>,
/// The network event handling routine.
_event_handling_task: task::JoinHandle<()>,

/// The network bootstrapping (node discovery) routine.
_peer_discovery_task: task::JoinHandle<()>,

/// A sending endpoint of the queue that collects broadcasted messages through the network
/// and sends it to the simperby node.
Expand Down Expand Up @@ -108,15 +109,19 @@ impl PropagationNetwork {
// Create a message queue that a simperby node will use to receive messages from other nodes.
let (sender, _receiver) = broadcast::channel::<Vec<u8>>(config.message_queue_capacity);

let _task_join_handle = task::spawn(run_background_task(
swarm_mutex.clone(),
sender.clone(),
let _event_handling_task = task::spawn(run_event_handling_task(
Arc::clone(&swarm_mutex),
config.lock_release_interval,
));

let _peer_discovery_task = task::spawn(run_peer_discovery_task(
Arc::clone(&swarm_mutex),
config.peer_discovery_interval,
));

Ok(Self {
_task_join_handle,
_event_handling_task,
_peer_discovery_task,
sender,
swarm: swarm_mutex,
})
Expand Down Expand Up @@ -241,36 +246,35 @@ impl PropagationNetwork {
}
}

async fn run_background_task(
async fn run_peer_discovery_task(
swarm: Arc<Mutex<Swarm<Behaviour>>>,
_sender: broadcast::Sender<Vec<u8>>,
lock_release_interval: Duration,
bootstrap_interval: Duration,
) {
loop {
// Note: An `Err` is returned only if there is no known peer,
// which is not considered to be an error if this node is
// the first one to join the network. Thus we discard the result.
let _ = swarm.lock().await.behaviour_mut().kademlia.bootstrap();
tokio::time::sleep(bootstrap_interval).await;
}
}

async fn run_event_handling_task(
swarm: Arc<Mutex<Swarm<Behaviour>>>,
lock_release_interval: Duration,
) {
// This timer guarantees that the lock for swarm will be released
// regularly and within a finite time.
let mut lock_release_timer = time::interval(lock_release_interval);
lock_release_timer.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

let mut bootstrap_timer = time::interval(bootstrap_interval);

loop {
let mut swarm = swarm.lock().await;
tokio::select! {
// Get k-closest peers after every preset interval.
// Todo: Update Floodsub publish targets to match them with the k-closest peers.
_ = bootstrap_timer.tick() => {
// Note: An `Err` is returned only if there is no known peer,
// which is not considered to be an error if this node is
// the first one to join the network.
let _ = swarm.behaviour_mut().kademlia.bootstrap();
}
// Listen on swarm events.
_event = swarm.select_next_some() => {}
// Release the lock so that other tasks can use swarm.
_ = lock_release_timer.tick() => ()
_item = swarm.select_next_some() => {
// do something with item
},
_ = lock_release_timer.tick() => (),
}
// The lock for swarm is automatically released here.
}
}

Expand Down

0 comments on commit 0a8a624

Please sign in to comment.