Skip to content

Commit

Permalink
Add instrumented task spawn function (#737)
Browse files Browse the repository at this point in the history
* Add task_spawn function

* Add console feature to runtime crate

* Add origin to task_spawn

* Switch all task spawns to the new function

* Fmt

* Fix warning

* Add TaskSpawner trait
  • Loading branch information
thibault-martinez authored Sep 17, 2021
1 parent 82ba8f3 commit 64fc273
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 52 deletions.
77 changes: 45 additions & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions bee-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ development = [ "fern", "hex", "serial_test" ]
default = []
full = [
"async-trait",
"bee-runtime",
"futures",
"libp2p/dns-tokio",
"libp2p/identify",
Expand All @@ -40,7 +39,7 @@ full = [
]

[dependencies]
bee-runtime = { version = "0.1.1-alpha", path = "../bee-runtime", optional = true }
bee-runtime = { version = "0.1.1-alpha", path = "../bee-runtime" }

async-trait = { version = "0.1", optional = true }
futures = { version = "0.3", optional = true }
Expand Down
4 changes: 3 additions & 1 deletion bee-network/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use crate::{
swarm::builder::build_swarm,
};

use bee_runtime::task::{StandaloneSpawner, TaskSpawner};

use libp2p::identity;
use log::info;
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -93,7 +95,7 @@ pub mod standalone {
let (shutdown_signal_tx1, shutdown_signal_rx1) = oneshot::channel::<()>();
let (shutdown_signal_tx2, shutdown_signal_rx2) = oneshot::channel::<()>();

tokio::spawn(async move {
StandaloneSpawner::spawn(async move {
shutdown.await;

shutdown_signal_tx1.send(()).expect("sending shutdown signal");
Expand Down
4 changes: 3 additions & 1 deletion bee-network/src/network/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::{
swarm::behavior::SwarmBehavior,
};

use bee_runtime::task::{StandaloneSpawner, TaskSpawner};

use futures::{channel::oneshot, StreamExt};
use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId, Swarm};
use log::*;
Expand Down Expand Up @@ -81,7 +83,7 @@ pub mod standalone {
pub async fn start(self, config: NetworkHostConfig) {
let NetworkHost { shutdown } = self;

tokio::spawn(async move {
StandaloneSpawner::spawn(async move {
network_host_processor(config, shutdown)
.await
.expect("network host processor");
Expand Down
13 changes: 8 additions & 5 deletions bee-network/src/service/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use crate::{
swarm::protocols::iota_gossip,
};

use bee_runtime::shutdown_stream::ShutdownStream;
use bee_runtime::{
shutdown_stream::ShutdownStream,
task::{StandaloneSpawner, TaskSpawner},
};

use futures::{
channel::oneshot,
Expand Down Expand Up @@ -138,26 +141,26 @@ pub mod standalone {
let (shutdown_tx2, shutdown_rx2) = oneshot::channel::<()>();
let (shutdown_tx3, shutdown_rx3) = oneshot::channel::<()>();

tokio::spawn(async move {
StandaloneSpawner::spawn(async move {
shutdown.await.expect("receiving shutdown signal");

shutdown_tx1.send(()).expect("receiving shutdown signal");
shutdown_tx2.send(()).expect("receiving shutdown signal");
shutdown_tx3.send(()).expect("receiving shutdown signal");
});
tokio::spawn(command_processor(
StandaloneSpawner::spawn(command_processor(
shutdown_rx1,
commands,
senders.clone(),
peerlist.clone(),
));
tokio::spawn(event_processor(
StandaloneSpawner::spawn(event_processor(
shutdown_rx2,
internal_events,
senders.clone(),
peerlist.clone(),
));
tokio::spawn(peerstate_checker(shutdown_rx3, senders, peerlist));
StandaloneSpawner::spawn(peerstate_checker(shutdown_rx3, senders, peerlist));

info!("Network service started.");
}
Expand Down
4 changes: 3 additions & 1 deletion bee-network/src/swarm/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use super::{behavior::SwarmBehavior, error::Error};

use crate::service::event::InternalEventSender;

use bee_runtime::task::{StandaloneSpawner, TaskSpawner};

use libp2p::{
core::{
connection::ConnectionLimits,
Expand Down Expand Up @@ -64,7 +66,7 @@ pub fn build_swarm(
// We want the connection background tasks to be spawned
// onto the tokio runtime.
.executor(Box::new(|fut| {
tokio::spawn(fut);
StandaloneSpawner::spawn(fut);
}))
.build();

Expand Down
22 changes: 12 additions & 10 deletions bee-network/src/swarm/protocols/iota_gossip/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use crate::{
service::event::{InternalEvent, InternalEventSender},
};

use bee_runtime::task::{StandaloneSpawner, TaskSpawner};

use futures::{
io::{BufReader, BufWriter, ReadHalf, WriteHalf},
AsyncReadExt, AsyncWriteExt, StreamExt,
Expand Down Expand Up @@ -34,24 +36,24 @@ pub fn start_incoming_processor(
incoming_tx: GossipSender,
internal_event_sender: InternalEventSender,
) {
tokio::spawn(async move {
StandaloneSpawner::spawn(async move {
let mut msg_buf = vec![0u8; MSG_BUFFER_LEN];

loop {
if let Some(len) = (&mut reader).read(&mut msg_buf).await.ok().filter(|len| *len > 0) {
if incoming_tx.send(msg_buf[..len].to_vec()).is_err() {
debug!("gossip-in: receiver dropped locally.");

// The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do to
// salvage this situation, hence we drop the connection.
// The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do
// to salvage this situation, hence we drop the connection.
break;
}
} else {
debug!("gossip-in: stream closed remotely.");

// NB: The network service will not shut down before it has received the `ProtocolDropped` event from
// all once connected peers, hence if the following send fails, then it must be
// considered a bug.
// NB: The network service will not shut down before it has received the `ProtocolDropped` event
// from all once connected peers, hence if the following send fails, then it
// must be considered a bug.

// The remote peer dropped the connection.
internal_event_sender
Expand All @@ -76,7 +78,7 @@ pub fn start_outgoing_processor(
outgoing_rx: GossipReceiver,
internal_event_sender: InternalEventSender,
) {
tokio::spawn(async move {
StandaloneSpawner::spawn(async move {
let mut outgoing_gossip_receiver = outgoing_rx.fuse();

// If the gossip sender dropped we end the connection.
Expand All @@ -88,9 +90,9 @@ pub fn start_outgoing_processor(
if message.is_empty() {
debug!("gossip-out: received shutdown message.");

// NB: The network service will not shut down before it has received the `ConnectionDropped` event from
// all once connected peers, hence if the following send fails, then it must be
// considered a bug.
// NB: The network service will not shut down before it has received the `ConnectionDropped` event
// from all once connected peers, hence if the following send fails, then it
// must be considered a bug.

internal_event_sender
.send(InternalEvent::ProtocolDropped { peer_id })
Expand Down
Loading

0 comments on commit 64fc273

Please sign in to comment.