From 64fc273a1851bdb5d0882da648d56a67c13e7e2c Mon Sep 17 00:00:00 2001 From: Thibault Martinez Date: Fri, 17 Sep 2021 17:29:24 +0200 Subject: [PATCH] Add instrumented task spawn function (#737) * Add task_spawn function * Add console feature to runtime crate * Add origin to task_spawn * Switch all task spawns to the new function * Fmt * Fix warning * Add TaskSpawner trait --- Cargo.lock | 77 +++++++++++-------- bee-network/Cargo.toml | 3 +- bee-network/src/init.rs | 4 +- bee-network/src/network/host.rs | 4 +- bee-network/src/service/host.rs | 13 ++-- bee-network/src/swarm/builder.rs | 4 +- .../src/swarm/protocols/iota_gossip/io.rs | 22 +++--- bee-runtime/Cargo.toml | 5 ++ bee-runtime/src/lib.rs | 1 + bee-runtime/src/task.rs | 44 +++++++++++ 10 files changed, 125 insertions(+), 52 deletions(-) create mode 100644 bee-runtime/src/task.rs diff --git a/Cargo.lock b/Cargo.lock index 8c241a01f2..ded94dc389 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,6 +282,7 @@ dependencies = [ "futures", "log", "tokio", + "tracing", ] [[package]] @@ -1253,9 +1254,9 @@ checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" [[package]] name = "hyper" -version = "0.14.12" +version = "0.14.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13f67199e765030fa08fe0bd581af683f0d5bc04ea09c2b1102012c5fb90e7fd" +checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593" dependencies = [ "bytes", "futures-channel", @@ -1268,7 +1269,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.1", + "socket2 0.4.2", "tokio", "tower-service", "tracing", @@ -1394,9 +1395,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.54" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1866b355d9c878e5e607473cbe3f63282c0b7aad2db1dbebf55076c686918254" +checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" dependencies = [ "wasm-bindgen", ] @@ -1435,9 +1436,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.101" +version = "0.2.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21" +checksum = "a2a5ac8f984bfcf3a823267e5fde638acc3325f6496633a5da6bb6eb2171e103" [[package]] name = "libloading" @@ -1617,7 +1618,7 @@ dependencies = [ "libc", "libp2p-core", "log", - "socket2 0.4.1", + "socket2 0.4.2", "tokio", ] @@ -2177,9 +2178,9 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "proc-macro-crate" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41fdbd1df62156fbc5945f4762632564d7d038153091c3fcf1067f6aef7cff92" +checksum = "1ebace6889caf889b4d3f76becee12e90353f2b8c7d875534a71e5742f8f6f83" dependencies = [ "thiserror", "toml", @@ -2693,9 +2694,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.67" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7f9e390c27c3c0ce8bc5d725f6e4d30a29d26659494aa4b17535f7522c5c950" +checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ "itoa", "ryu", @@ -2853,9 +2854,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad" +checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", "winapi", @@ -3077,15 +3078,27 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" +checksum = "c2ba9ab62b7d6497a8638dfda5e5c4fb3b2d5a7fca4118f2b96151c8ef1a437e" dependencies = [ "cfg-if", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98863d0dd09fa59a1b79c6750ad80dbda6b75f4e71c437a6a1a8cb91a8bcbd77" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.20" @@ -3181,9 +3194,9 @@ checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" [[package]] name = "unicode-width" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" [[package]] name = "unicode-xid" @@ -3282,9 +3295,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasm-bindgen" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c" +checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" dependencies = [ "cfg-if", "serde", @@ -3294,9 +3307,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f34c405b4f0658583dba0c1c7c9b694f3cac32655db463b56c254a1c75269523" +checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" dependencies = [ "bumpalo", "lazy_static", @@ -3309,9 +3322,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.27" +version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a87d738d4abc4cf22f6eb142f5b9a81301331ee3c767f2fef2fda4e325492060" +checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39" dependencies = [ "cfg-if", "js-sys", @@ -3321,9 +3334,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d5a6580be83b19dc570a8f9c324251687ab2184e57086f71625feb57ec77c8" +checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3331,9 +3344,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3775a030dc6f5a0afd8a84981a21cc92a781eb429acef9ecce476d0c9113e92" +checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" dependencies = [ "proc-macro2", "quote", @@ -3344,9 +3357,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c279e376c7a8e8752a8f1eaa35b7b0bee6bb9fb0cdacfa97cc3f1f289c87e2b4" +checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" [[package]] name = "wasm-timer" @@ -3365,9 +3378,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.54" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a84d70d1ec7d2da2d26a5bd78f4bca1b8c3254805363ce743b7a05bc30d195a" +checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/bee-network/Cargo.toml b/bee-network/Cargo.toml index d0c20a5cc2..3df885275e 100644 --- a/bee-network/Cargo.toml +++ b/bee-network/Cargo.toml @@ -22,7 +22,6 @@ development = [ "fern", "hex", "serial_test" ] default = [] full = [ "async-trait", - "bee-runtime", "futures", "libp2p/dns-tokio", "libp2p/identify", @@ -40,7 +39,7 @@ full = [ ] [dependencies] -bee-runtime = { version = "0.1.1-alpha", path = "../bee-runtime", optional = true } +bee-runtime = { version = "0.1.1-alpha", path = "../bee-runtime" } async-trait = { version = "0.1", optional = true } futures = { version = "0.3", optional = true } diff --git a/bee-network/src/init.rs b/bee-network/src/init.rs index f85266bffe..6af01c2a6e 100644 --- a/bee-network/src/init.rs +++ b/bee-network/src/init.rs @@ -24,6 +24,8 @@ use crate::{ swarm::builder::build_swarm, }; +use bee_runtime::task::{StandaloneSpawner, TaskSpawner}; + use libp2p::identity; use log::info; use once_cell::sync::OnceCell; @@ -93,7 +95,7 @@ pub mod standalone { let (shutdown_signal_tx1, shutdown_signal_rx1) = oneshot::channel::<()>(); let (shutdown_signal_tx2, shutdown_signal_rx2) = oneshot::channel::<()>(); - tokio::spawn(async move { + StandaloneSpawner::spawn(async move { shutdown.await; shutdown_signal_tx1.send(()).expect("sending shutdown signal"); diff --git a/bee-network/src/network/host.rs b/bee-network/src/network/host.rs index 995941c643..22363b9442 100644 --- a/bee-network/src/network/host.rs +++ b/bee-network/src/network/host.rs @@ -13,6 +13,8 @@ use crate::{ swarm::behavior::SwarmBehavior, }; +use bee_runtime::task::{StandaloneSpawner, TaskSpawner}; + use futures::{channel::oneshot, StreamExt}; use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId, Swarm}; use log::*; @@ -81,7 +83,7 @@ pub mod standalone { pub async fn start(self, config: NetworkHostConfig) { let NetworkHost { shutdown } = self; - tokio::spawn(async move { + StandaloneSpawner::spawn(async move { network_host_processor(config, shutdown) .await .expect("network host processor"); diff --git a/bee-network/src/service/host.rs b/bee-network/src/service/host.rs index 0be427d473..3267d005f2 100644 --- a/bee-network/src/service/host.rs +++ b/bee-network/src/service/host.rs @@ -18,7 +18,10 @@ use crate::{ swarm::protocols::iota_gossip, }; -use bee_runtime::shutdown_stream::ShutdownStream; +use bee_runtime::{ + shutdown_stream::ShutdownStream, + task::{StandaloneSpawner, TaskSpawner}, +}; use futures::{ channel::oneshot, @@ -138,26 +141,26 @@ pub mod standalone { let (shutdown_tx2, shutdown_rx2) = oneshot::channel::<()>(); let (shutdown_tx3, shutdown_rx3) = oneshot::channel::<()>(); - tokio::spawn(async move { + StandaloneSpawner::spawn(async move { shutdown.await.expect("receiving shutdown signal"); shutdown_tx1.send(()).expect("receiving shutdown signal"); shutdown_tx2.send(()).expect("receiving shutdown signal"); shutdown_tx3.send(()).expect("receiving shutdown signal"); }); - tokio::spawn(command_processor( + StandaloneSpawner::spawn(command_processor( shutdown_rx1, commands, senders.clone(), peerlist.clone(), )); - tokio::spawn(event_processor( + StandaloneSpawner::spawn(event_processor( shutdown_rx2, internal_events, senders.clone(), peerlist.clone(), )); - tokio::spawn(peerstate_checker(shutdown_rx3, senders, peerlist)); + StandaloneSpawner::spawn(peerstate_checker(shutdown_rx3, senders, peerlist)); info!("Network service started."); } diff --git a/bee-network/src/swarm/builder.rs b/bee-network/src/swarm/builder.rs index 7362614847..cc99343493 100644 --- a/bee-network/src/swarm/builder.rs +++ b/bee-network/src/swarm/builder.rs @@ -5,6 +5,8 @@ use super::{behavior::SwarmBehavior, error::Error}; use crate::service::event::InternalEventSender; +use bee_runtime::task::{StandaloneSpawner, TaskSpawner}; + use libp2p::{ core::{ connection::ConnectionLimits, @@ -64,7 +66,7 @@ pub fn build_swarm( // We want the connection background tasks to be spawned // onto the tokio runtime. .executor(Box::new(|fut| { - tokio::spawn(fut); + StandaloneSpawner::spawn(fut); })) .build(); diff --git a/bee-network/src/swarm/protocols/iota_gossip/io.rs b/bee-network/src/swarm/protocols/iota_gossip/io.rs index 86e3998c1c..a377723c6d 100644 --- a/bee-network/src/swarm/protocols/iota_gossip/io.rs +++ b/bee-network/src/swarm/protocols/iota_gossip/io.rs @@ -6,6 +6,8 @@ use crate::{ service::event::{InternalEvent, InternalEventSender}, }; +use bee_runtime::task::{StandaloneSpawner, TaskSpawner}; + use futures::{ io::{BufReader, BufWriter, ReadHalf, WriteHalf}, AsyncReadExt, AsyncWriteExt, StreamExt, @@ -34,7 +36,7 @@ pub fn start_incoming_processor( incoming_tx: GossipSender, internal_event_sender: InternalEventSender, ) { - tokio::spawn(async move { + StandaloneSpawner::spawn(async move { let mut msg_buf = vec![0u8; MSG_BUFFER_LEN]; loop { @@ -42,16 +44,16 @@ pub fn start_incoming_processor( if incoming_tx.send(msg_buf[..len].to_vec()).is_err() { debug!("gossip-in: receiver dropped locally."); - // The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do to - // salvage this situation, hence we drop the connection. + // The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do + // to salvage this situation, hence we drop the connection. break; } } else { debug!("gossip-in: stream closed remotely."); - // NB: The network service will not shut down before it has received the `ProtocolDropped` event from - // all once connected peers, hence if the following send fails, then it must be - // considered a bug. + // NB: The network service will not shut down before it has received the `ProtocolDropped` event + // from all once connected peers, hence if the following send fails, then it + // must be considered a bug. // The remote peer dropped the connection. internal_event_sender @@ -76,7 +78,7 @@ pub fn start_outgoing_processor( outgoing_rx: GossipReceiver, internal_event_sender: InternalEventSender, ) { - tokio::spawn(async move { + StandaloneSpawner::spawn(async move { let mut outgoing_gossip_receiver = outgoing_rx.fuse(); // If the gossip sender dropped we end the connection. @@ -88,9 +90,9 @@ pub fn start_outgoing_processor( if message.is_empty() { debug!("gossip-out: received shutdown message."); - // NB: The network service will not shut down before it has received the `ConnectionDropped` event from - // all once connected peers, hence if the following send fails, then it must be - // considered a bug. + // NB: The network service will not shut down before it has received the `ConnectionDropped` event + // from all once connected peers, hence if the following send fails, then it + // must be considered a bug. internal_event_sender .send(InternalEvent::ProtocolDropped { peer_id }) diff --git a/bee-runtime/Cargo.toml b/bee-runtime/Cargo.toml index e9315a3b68..06ae962e2a 100644 --- a/bee-runtime/Cargo.toml +++ b/bee-runtime/Cargo.toml @@ -17,6 +17,11 @@ async-trait = "0.1" dashmap = "4.0" futures = "0.3" log = { version = "0.4", features = [ "serde" ] } +tokio = "1.11" +tracing = { version = "0.1", optional = true } [dev-dependencies] tokio = { version = "1.11", features = [ "rt", "macros", "time" ] } + +[features] +console = [ "tracing" ] diff --git a/bee-runtime/src/lib.rs b/bee-runtime/src/lib.rs index 3aa6a15c2e..dd04fa9ac2 100644 --- a/bee-runtime/src/lib.rs +++ b/bee-runtime/src/lib.rs @@ -9,4 +9,5 @@ pub mod event; pub mod node; pub mod resource; pub mod shutdown_stream; +pub mod task; pub mod worker; diff --git a/bee-runtime/src/task.rs b/bee-runtime/src/task.rs new file mode 100644 index 0000000000..69bfbc8307 --- /dev/null +++ b/bee-runtime/src/task.rs @@ -0,0 +1,44 @@ +// Copyright 2020-2021 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +//! Task utilities. + +use std::future::Future; + +/// Instrumented task spawner with associated origin. +pub trait TaskSpawner { + /// Origin of the task. + const ORIGIN: &'static str; + + /// Spawns a task with or without instrumentation depending on a compile time feature. + #[track_caller] + fn spawn(future: F) -> tokio::task::JoinHandle<::Output> + where + F: Future + Send + 'static, + ::Output: Send, + { + #[cfg(feature = "console")] + { + let caller = std::panic::Location::caller(); + let span = tracing::info_span!( + target: "tokio::task", + "task", + origin = Self::ORIGIN, + file = caller.file(), + line = caller.line(), + ); + + tokio::spawn(tracing::Instrument::instrument(future, span)) + } + + #[cfg(not(feature = "console"))] + tokio::spawn(future) + } +} + +/// Instrumented task spawner with a standalone origin. +pub struct StandaloneSpawner; + +impl TaskSpawner for StandaloneSpawner { + const ORIGIN: &'static str = "standalone"; +}