diff --git a/.github/workflows/coverage.yaml b/.github/workflows/coverage.yaml index 7f5bf2946..06529d53d 100644 --- a/.github/workflows/coverage.yaml +++ b/.github/workflows/coverage.yaml @@ -55,6 +55,9 @@ jobs: name: Run Build Checks run: cargo check --tests --benches --examples --workspace --all-targets --all-features + # Run Test Locally: + # RUSTFLAGS="-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests" RUSTDOCFLAGS="-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests" CARGO_INCREMENTAL="0" RUST_BACKTRACE=1 cargo test --tests --benches --examples --workspace --all-targets --all-features + - id: test name: Run Unit Tests run: cargo test --tests --benches --examples --workspace --all-targets --all-features diff --git a/.vscode/settings.json b/.vscode/settings.json index 038da4c18..2fbd8c28c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,12 @@ "[rust]": { "editor.formatOnSave": true }, + // "rust-analyzer.cargo.extraEnv": { + // "RUSTFLAGS": "-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests", + // "RUSTDOCFLAGS": "-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests", + // "CARGO_INCREMENTAL": "0", + // "RUST_BACKTRACE": "1" + // }, "rust-analyzer.checkOnSave": true, "rust-analyzer.check.command": "clippy", "rust-analyzer.check.allTargets": true, diff --git a/Cargo.lock b/Cargo.lock index 9b7c10f39..62b659782 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1979,6 +1979,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.4" @@ -2104,6 +2114,12 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -2611,6 +2627,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ringbuf" +version = "0.4.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b8f7d58e4f67752d63318605656be063e333154aa35b70126075e9d05552979" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "rkyv" version = "0.7.43" @@ -3002,6 +3027,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.2.0" @@ -3214,6 +3248,16 @@ dependencies = [ "syn 2.0.47", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.31" @@ -3426,7 +3470,6 @@ dependencies = [ "hyper 1.1.0", "lazy_static", "local-ip-address", - "log", "mockall", "multimap", "once_cell", @@ -3437,6 +3480,7 @@ dependencies = [ "r2d2_sqlite", "rand", "reqwest", + "ringbuf", "serde", "serde_bencode", "serde_bytes", @@ -3454,6 +3498,8 @@ dependencies = [ "torrust-tracker-primitives", "torrust-tracker-test-helpers", "tower-http", + "tracing", + "tracing-subscriber", "uuid", ] @@ -3484,8 +3530,8 @@ dependencies = [ name = "torrust-tracker-located-error" version = "3.0.0-alpha.12-develop" dependencies = [ - "log", "thiserror", + "tracing", ] [[package]] @@ -3562,9 +3608,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.47", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -3572,6 +3630,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -3657,6 +3741,12 @@ dependencies = [ "rand", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 64f913e4f..8f89b6f6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,8 @@ fern = "0" futures = "0" hyper = "1" lazy_static = "1" -log = { version = "0", features = ["release_max_level_info"] } +tracing = "0.1" +tracing-subscriber = "0.3" multimap = "0" openssl = { version = "0", features = ["vendored"] } percent-encoding = "2" @@ -55,6 +56,7 @@ reqwest = "0" serde = { version = "1", features = ["derive"] } serde_bencode = "0" serde_json = "1" +ringbuf = "0.4.0-rc.2" serde_with = "3" tdyne-peer-id = "1" tdyne-peer-id-registry = "0" diff --git a/cSpell.json b/cSpell.json index a8ba5be6a..562f939c3 100644 --- a/cSpell.json +++ b/cSpell.json @@ -92,8 +92,10 @@ "reannounce", "Registar", "repr", + "reqs", "reqwest", "rerequests", + "ringbuf", "rngs", "routable", "rusqlite", diff --git a/packages/located-error/Cargo.toml b/packages/located-error/Cargo.toml index fa3d1d76d..d8633bc59 100644 --- a/packages/located-error/Cargo.toml +++ b/packages/located-error/Cargo.toml @@ -15,7 +15,7 @@ rust-version.workspace = true version.workspace = true [dependencies] -log = { version = "0", features = ["release_max_level_info"] } +tracing = "0.1" [dev-dependencies] thiserror = "1" diff --git a/packages/located-error/src/lib.rs b/packages/located-error/src/lib.rs index 49e135600..bfd4d4a86 100644 --- a/packages/located-error/src/lib.rs +++ b/packages/located-error/src/lib.rs @@ -33,7 +33,7 @@ use std::error::Error; use std::panic::Location; use std::sync::Arc; -use log::debug; +use tracing::debug; pub type DynError = Arc; diff --git a/src/app.rs b/src/app.rs index fb7131878..a042e3d62 100644 --- a/src/app.rs +++ b/src/app.rs @@ -26,10 +26,10 @@ use std::net::SocketAddr; use std::sync::Arc; use derive_more::Constructor; -use log::warn; use tokio::sync::Mutex; use tokio::task::JoinHandle; use torrust_tracker_configuration::Configuration; +use tracing::warn; use crate::bootstrap::jobs::{health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker}; use crate::{core, servers}; diff --git a/src/bootstrap/jobs/health_check_api.rs b/src/bootstrap/jobs/health_check_api.rs index 1690f3fb0..bc4a691f6 100644 --- a/src/bootstrap/jobs/health_check_api.rs +++ b/src/bootstrap/jobs/health_check_api.rs @@ -14,10 +14,10 @@ //! Refer to the [configuration documentation](https://docs.rs/torrust-tracker-configuration) //! for the API configuration options. -use log::info; use tokio::sync::oneshot; use tokio::task::JoinHandle; use torrust_tracker_configuration::HealthCheckApi; +use tracing::debug; use super::Started; use crate::app::Registry; @@ -46,18 +46,18 @@ pub async fn start_job(config: &HealthCheckApi, register: Registry) -> JoinHandl // Run the API server let join_handle = tokio::spawn(async move { - info!("Starting Health Check API server: http://{}", bind_addr); + debug!("Starting Health Check API server: http://{}", bind_addr); let handle = server::start(bind_addr, tx_start, rx_halt, register); if let Ok(()) = handle.await { - info!("Health Check API server on http://{} stopped", bind_addr); + debug!("Health Check API server on http://{} stopped", bind_addr); } }); // Wait until the API server job is running match rx_start.await { - Ok(msg) => info!("Torrust Health Check API server started on socket: {}", msg.address), + Ok(msg) => debug!("Torrust Health Check API server started on socket: {}", msg.address), Err(e) => panic!("the Health Check API server was dropped: {e}"), } diff --git a/src/bootstrap/jobs/http_tracker.rs b/src/bootstrap/jobs/http_tracker.rs index 41565d3cc..4a732d454 100644 --- a/src/bootstrap/jobs/http_tracker.rs +++ b/src/bootstrap/jobs/http_tracker.rs @@ -14,9 +14,9 @@ use std::net::SocketAddr; use std::sync::Arc; use axum_server::tls_rustls::RustlsConfig; -use log::info; use tokio::task::JoinHandle; use torrust_tracker_configuration::HttpTracker; +use tracing::debug; use super::make_rust_tls; use crate::app::RegistrationForm; @@ -53,7 +53,7 @@ pub async fn start_job( Version::V1 => Some(start_v1(socket, tls, tracker.clone(), form).await), } } else { - info!("Note: Not loading Http Tracker Service, Not Enabled in Configuration."); + debug!("Note: Not loading Http Tracker Service, Not Enabled in Configuration."); None } } diff --git a/src/bootstrap/jobs/mod.rs b/src/bootstrap/jobs/mod.rs index 3a9936882..35219ca75 100644 --- a/src/bootstrap/jobs/mod.rs +++ b/src/bootstrap/jobs/mod.rs @@ -22,13 +22,13 @@ pub struct Started { pub async fn make_rust_tls(enabled: bool, cert: &Option, key: &Option) -> Option> { if !enabled { - info!("tls not enabled"); + debug!("tls not enabled"); return None; } if let (Some(cert), Some(key)) = (cert, key) { - info!("Using https: cert path: {cert}."); - info!("Using https: key path: {cert}."); + debug!("Using https: cert path: {cert}."); + debug!("Using https: key path: {cert}."); Some( RustlsConfig::from_pem_file(cert, key) @@ -77,9 +77,9 @@ use std::panic::Location; use std::sync::Arc; use axum_server::tls_rustls::RustlsConfig; -use log::info; use thiserror::Error; use torrust_tracker_located_error::{DynError, LocatedError}; +use tracing::debug; /// Error returned by the Bootstrap Process. #[derive(Error, Debug)] diff --git a/src/bootstrap/jobs/torrent_cleanup.rs b/src/bootstrap/jobs/torrent_cleanup.rs index d3b084d31..d3bc4c298 100644 --- a/src/bootstrap/jobs/torrent_cleanup.rs +++ b/src/bootstrap/jobs/torrent_cleanup.rs @@ -13,9 +13,9 @@ use std::sync::Arc; use chrono::Utc; -use log::info; use tokio::task::JoinHandle; use torrust_tracker_configuration::Configuration; +use tracing::debug; use crate::core; @@ -37,15 +37,15 @@ pub fn start_job(config: &Arc, tracker: &Arc) -> J loop { tokio::select! { _ = tokio::signal::ctrl_c() => { - info!("Stopping torrent cleanup job.."); + debug!("Stopping torrent cleanup job.."); break; } _ = interval.tick() => { if let Some(tracker) = weak_tracker.upgrade() { let start_time = Utc::now().time(); - info!("Cleaning up torrents.."); + debug!("Cleaning up torrents.."); tracker.cleanup_torrents().await; - info!("Cleaned up torrents in: {}ms", (Utc::now().time() - start_time).num_milliseconds()); + debug!("Cleaned up torrents in: {}ms", (Utc::now().time() - start_time).num_milliseconds()); } else { break; } diff --git a/src/bootstrap/jobs/tracker_apis.rs b/src/bootstrap/jobs/tracker_apis.rs index 940297012..698f9f978 100644 --- a/src/bootstrap/jobs/tracker_apis.rs +++ b/src/bootstrap/jobs/tracker_apis.rs @@ -24,9 +24,9 @@ use std::net::SocketAddr; use std::sync::Arc; use axum_server::tls_rustls::RustlsConfig; -use log::info; use tokio::task::JoinHandle; use torrust_tracker_configuration::{AccessTokens, HttpApi}; +use tracing::debug; use super::make_rust_tls; use crate::app::RegistrationForm; @@ -76,7 +76,7 @@ pub async fn start_job( Version::V1 => Some(start_v1(bind_to, tls, tracker.clone(), form, access_tokens).await), } } else { - info!("Note: Not loading Http Tracker Service, Not Enabled in Configuration."); + debug!("Note: Not loading Http Tracker Service, Not Enabled in Configuration."); None } } diff --git a/src/bootstrap/logging.rs b/src/bootstrap/logging.rs index 97e26919d..bd86ad318 100644 --- a/src/bootstrap/logging.rs +++ b/src/bootstrap/logging.rs @@ -10,51 +10,41 @@ //! - `Trace` //! //! Refer to the [configuration crate documentation](https://docs.rs/torrust-tracker-configuration) to know how to change log settings. -use std::str::FromStr; use std::sync::Once; -use log::{info, LevelFilter}; use torrust_tracker_configuration::Configuration; +use tracing::level_filters::LevelFilter; static INIT: Once = Once::new(); /// It redirects the log info to the standard output with the log level defined in the configuration pub fn setup(cfg: &Configuration) { - let level = config_level_or_default(&cfg.log_level); + let filter = config_level_or_default(&cfg.log_level); - if level == log::LevelFilter::Off { + if filter == LevelFilter::OFF { return; } INIT.call_once(|| { - stdout_config(level); + tracing_subscriber::fmt() + .pretty() + .with_thread_names(true) + .with_max_level(filter) + // sets this to be the default, global collector for this application. + .init(); }); } fn config_level_or_default(log_level: &Option) -> LevelFilter { match log_level { - None => log::LevelFilter::Info, - Some(level) => LevelFilter::from_str(level).unwrap(), + None => LevelFilter::WARN, + Some(ref level) => match level.as_str() { + "Off" => LevelFilter::OFF, + "Error" => LevelFilter::ERROR, + "Warn" => LevelFilter::WARN, + "Debug" => LevelFilter::DEBUG, + "Trace" => LevelFilter::TRACE, + _ => LevelFilter::INFO, + }, } } - -fn stdout_config(level: LevelFilter) { - if let Err(_err) = fern::Dispatch::new() - .format(|out, message, record| { - out.finish(format_args!( - "{} [{}][{}] {}", - chrono::Local::now().format("%+"), - record.target(), - record.level(), - message - )); - }) - .level(level) - .chain(std::io::stdout()) - .apply() - { - panic!("Failed to initialize logging.") - } - - info!("logging initialized."); -} diff --git a/src/core/auth.rs b/src/core/auth.rs index 9fc9d6e7b..682ded126 100644 --- a/src/core/auth.rs +++ b/src/core/auth.rs @@ -42,12 +42,12 @@ use std::sync::Arc; use std::time::Duration; use derive_more::Display; -use log::debug; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; use thiserror::Error; use torrust_tracker_located_error::{DynError, LocatedError}; +use tracing::debug; use crate::shared::bit_torrent::common::AUTH_KEY_LENGTH; use crate::shared::clock::{convert_from_timestamp_to_datetime_utc, Current, DurationSinceUnixEpoch, Time, TimeNow}; diff --git a/src/core/databases/mysql.rs b/src/core/databases/mysql.rs index c46300829..7f9e43b18 100644 --- a/src/core/databases/mysql.rs +++ b/src/core/databases/mysql.rs @@ -3,12 +3,12 @@ use std::str::FromStr; use std::time::Duration; use async_trait::async_trait; -use log::debug; use r2d2::Pool; use r2d2_mysql::mysql::prelude::Queryable; use r2d2_mysql::mysql::{params, Opts, OptsBuilder}; use r2d2_mysql::MySqlConnectionManager; use torrust_tracker_primitives::DatabaseDriver; +use tracing::debug; use super::{Database, Error}; use crate::core::auth::{self, Key}; diff --git a/src/core/mod.rs b/src/core/mod.rs index 27682d94c..39b7e196e 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -447,10 +447,10 @@ use std::time::Duration; use derive_more::Constructor; use futures::future::join_all; -use log::debug; use tokio::sync::mpsc::error::SendError; use torrust_tracker_configuration::Configuration; use torrust_tracker_primitives::TrackerMode; +use tracing::debug; use self::auth::Key; use self::error::Error; diff --git a/src/core/statistics.rs b/src/core/statistics.rs index f38662cdd..d7192f5d1 100644 --- a/src/core/statistics.rs +++ b/src/core/statistics.rs @@ -20,11 +20,11 @@ use std::sync::Arc; use async_trait::async_trait; -use log::debug; #[cfg(test)] use mockall::{automock, predicate::str}; use tokio::sync::mpsc::error::SendError; use tokio::sync::{mpsc, RwLock, RwLockReadGuard}; +use tracing::debug; const CHANNEL_BUFFER_SIZE: usize = 65_535; diff --git a/src/core/torrent/mod.rs b/src/core/torrent/mod.rs index ea0bb4fdb..e98f46665 100644 --- a/src/core/torrent/mod.rs +++ b/src/core/torrent/mod.rs @@ -33,8 +33,8 @@ pub mod repository; use std::time::Duration; use aquatic_udp_protocol::AnnounceEvent; -use log::debug; use serde::{Deserialize, Serialize}; +use tracing::debug; use super::peer::{self, Peer}; use crate::shared::clock::{Current, TimeNow}; diff --git a/src/main.rs b/src/main.rs index 87c0fc367..2354e8b51 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ -use log::info; use torrust_tracker::{app, bootstrap}; +use tracing::info; #[tokio::main] async fn main() { diff --git a/src/servers/apis/v1/context/torrent/handlers.rs b/src/servers/apis/v1/context/torrent/handlers.rs index 101a25c8d..036b1764d 100644 --- a/src/servers/apis/v1/context/torrent/handlers.rs +++ b/src/servers/apis/v1/context/torrent/handlers.rs @@ -6,8 +6,8 @@ use std::sync::Arc; use axum::extract::{Path, Query, State}; use axum::response::{IntoResponse, Json, Response}; -use log::debug; use serde::{de, Deserialize, Deserializer}; +use tracing::debug; use super::resources::torrent::ListItem; use super::responses::{torrent_info_response, torrent_list_response, torrent_not_known_response}; diff --git a/src/servers/http/v1/handlers/announce.rs b/src/servers/http/v1/handlers/announce.rs index 3e41f6427..d496d64aa 100644 --- a/src/servers/http/v1/handlers/announce.rs +++ b/src/servers/http/v1/handlers/announce.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; use axum::extract::State; use axum::response::{IntoResponse, Response}; -use log::debug; +use tracing::debug; use crate::core::auth::Key; use crate::core::peer::Peer; diff --git a/src/servers/http/v1/handlers/scrape.rs b/src/servers/http/v1/handlers/scrape.rs index 99c5ad94d..a5dc56b3b 100644 --- a/src/servers/http/v1/handlers/scrape.rs +++ b/src/servers/http/v1/handlers/scrape.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use axum::extract::State; use axum::response::{IntoResponse, Response}; -use log::debug; +use tracing::debug; use crate::core::auth::Key; use crate::core::{ScrapeData, Tracker}; diff --git a/src/servers/signals.rs b/src/servers/signals.rs index cb0675d65..3ecb9addd 100644 --- a/src/servers/signals.rs +++ b/src/servers/signals.rs @@ -2,8 +2,8 @@ use std::time::Duration; use derive_more::Display; -use log::info; use tokio::time::sleep; +use tracing::debug; /// This is the message that the "launcher" spawned task receives from the main /// application process to notify the service to shutdown. @@ -58,13 +58,13 @@ pub async fn shutdown_signal(rx_halt: tokio::sync::oneshot::Receiver) { pub async fn shutdown_signal_with_message(rx_halt: tokio::sync::oneshot::Receiver, message: String) { shutdown_signal(rx_halt).await; - info!("{message}"); + debug!("{message}"); } pub async fn graceful_shutdown(handle: axum_server::Handle, rx_halt: tokio::sync::oneshot::Receiver, message: String) { shutdown_signal_with_message(rx_halt, message).await; - info!("sending graceful shutdown signal"); + debug!("sending graceful shutdown signal"); handle.graceful_shutdown(Some(Duration::from_secs(90))); println!("!! shuting down in 90 seconds !!"); @@ -72,6 +72,6 @@ pub async fn graceful_shutdown(handle: axum_server::Handle, rx_halt: tokio::sync loop { sleep(Duration::from_secs(1)).await; - info!("remaining alive connections: {}", handle.connection_count()); + debug!("remaining alive connections: {}", handle.connection_count()); } } diff --git a/src/servers/udp/handlers.rs b/src/servers/udp/handlers.rs index 0f37d6436..4cb042392 100644 --- a/src/servers/udp/handlers.rs +++ b/src/servers/udp/handlers.rs @@ -7,10 +7,11 @@ use aquatic_udp_protocol::{ AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId, }; -use log::{debug, info}; use torrust_tracker_located_error::DynError; +use tracing::debug; use super::connection_cookie::{check, from_connection_id, into_connection_id, make}; +use super::UdpRequest; use crate::core::{statistics, ScrapeData, Tracker}; use crate::servers::udp::error::Error; use crate::servers::udp::peer_builder; @@ -27,10 +28,13 @@ use crate::shared::bit_torrent::info_hash::InfoHash; /// type. /// /// It will return an `Error` response if the request is invalid. -pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec, tracker: &Tracker) -> Response { - match Request::from_bytes(&payload[..payload.len()], MAX_SCRAPE_TORRENTS).map_err(|e| Error::InternalServer { - message: format!("{e:?}"), - location: Location::caller(), +pub(crate) async fn handle_packet(udp_request: UdpRequest, tracker: &Arc) -> Response { + debug!("Handling Packets: {udp_request:?}"); + match Request::from_bytes(&udp_request.payload[..udp_request.payload.len()], MAX_SCRAPE_TORRENTS).map_err(|e| { + Error::InternalServer { + message: format!("{e:?}"), + location: Location::caller(), + } }) { Ok(request) => { let transaction_id = match &request { @@ -39,7 +43,7 @@ pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec, tracker: & Request::Scrape(scrape_request) => scrape_request.transaction_id, }; - match handle_request(request, remote_addr, tracker).await { + match handle_request(request, udp_request.from, tracker).await { Ok(response) => response, Err(e) => handle_error(&e, transaction_id), } @@ -60,6 +64,8 @@ pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec, tracker: & /// /// If a error happens in the `handle_request` function, it will just return the `ServerError`. pub async fn handle_request(request: Request, remote_addr: SocketAddr, tracker: &Tracker) -> Result { + debug!("Handling Request: {request:?} to: {remote_addr:?}"); + match request { Request::Connect(connect_request) => handle_connect(remote_addr, &connect_request, tracker).await, Request::Announce(announce_request) => handle_announce(remote_addr, &announce_request, tracker).await, @@ -74,7 +80,7 @@ pub async fn handle_request(request: Request, remote_addr: SocketAddr, tracker: /// /// This function does not ever return an error. pub async fn handle_connect(remote_addr: SocketAddr, request: &ConnectRequest, tracker: &Tracker) -> Result { - info!(target: "UDP", "\"CONNECT TxID {}\"", request.transaction_id.0); + debug!(target: "UDP", "\"CONNECT TxID {}\"", request.transaction_id.0); debug!("udp connect request: {:#?}", request); let connection_cookie = make(&remote_addr); @@ -132,7 +138,7 @@ pub async fn handle_announce( source: (Arc::new(e) as Arc).into(), })?; - info!(target: "UDP", "\"ANNOUNCE TxID {} IH {}\"", announce_request.transaction_id.0, info_hash.to_hex_string()); + debug!(target: "UDP", "\"ANNOUNCE TxID {} IH {}\"", announce_request.transaction_id.0, info_hash.to_hex_string()); let mut peer = peer_builder::from_request(&wrapped_announce_request, &remote_client_ip); @@ -208,7 +214,7 @@ pub async fn handle_announce( /// /// This function does not ever return an error. pub async fn handle_scrape(remote_addr: SocketAddr, request: &ScrapeRequest, tracker: &Tracker) -> Result { - info!(target: "UDP", "\"SCRAPE TxID {}\"", request.transaction_id.0); + debug!(target: "UDP", "\"SCRAPE TxID {}\"", request.transaction_id.0); debug!("udp scrape request: {:#?}", request); // Convert from aquatic infohashes diff --git a/src/servers/udp/mod.rs b/src/servers/udp/mod.rs index 985c1cec7..8f54ee803 100644 --- a/src/servers/udp/mod.rs +++ b/src/servers/udp/mod.rs @@ -638,6 +638,13 @@ //! documentation by [Arvid Norberg](https://github.com/arvidn) was very //! supportive in the development of this documentation. Some descriptions were //! taken from the [libtorrent](https://www.rasterbar.com/products/libtorrent/udp_tracker_protocol.html). + +use std::net::SocketAddr; + +use tracing::trace; + +use crate::app::CheckJob; +use crate::shared::bit_torrent::udp::client; pub mod connection_cookie; pub mod error; pub mod handlers; @@ -652,3 +659,20 @@ pub type Port = u16; /// The transaction id. A random number generated byt the peer that is used to /// match requests and responses. pub type TransactionId = i64; + +#[derive(Clone, Debug)] +pub(crate) struct UdpRequest { + payload: Vec, + from: SocketAddr, +} + +fn check(binding: &SocketAddr) -> CheckJob { + trace!("Checking Service: {binding:?}."); + + let binding = *binding; + let info = format!("checking the udp tracker health check at: {binding}"); + + let job = tokio::spawn(async move { client::check(&binding).await }); + + CheckJob::new(binding, info, job) +} diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index b5df6164e..fe66a9b18 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -23,18 +23,21 @@ use std::sync::Arc; use aquatic_udp_protocol::Response; use derive_more::Constructor; -use futures::pin_mut; -use log::{debug, error, info}; +use ringbuf::storage::Static; +use ringbuf::traits::{Consumer, Observer, RingBuffer}; +use ringbuf::LocalRb; use tokio::net::UdpSocket; -use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::task::JoinHandle; +use tokio::sync::oneshot; +use tokio::task::{AbortHandle, JoinHandle}; +use tokio::{select, task}; +use tracing::{debug, error, trace}; -use crate::app::{CheckJob, Listing, RegistrationForm}; +use super::UdpRequest; +use crate::app::{Listing, RegistrationForm}; use crate::bootstrap::jobs::Started; use crate::core::Tracker; use crate::servers::signals::{shutdown_signal_with_message, Halted}; -use crate::servers::udp::handlers::handle_packet; -use crate::shared::bit_torrent::udp::client::check; +use crate::servers::udp::handlers; use crate::shared::bit_torrent::udp::MAX_PACKET_SIZE; /// Error that can occur when starting or stopping the UDP server. @@ -122,16 +125,11 @@ impl UdpServer { let (tx_start, rx_start) = tokio::sync::oneshot::channel::(); let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::(); - let launcher = self.state.launcher; - - let task = tokio::spawn(async move { - launcher.start(tracker, tx_start, rx_halt).await; - launcher - }); + let task = self.state.launcher.start(tracker, tx_start, rx_halt); let binding = rx_start.await.expect("it should be able to start the service").address; - form.send(Listing::new(binding, Udp::check)) + form.send(Listing::new(binding, super::check)) .expect("it should be able to send service registration"); let running_udp_server: UdpServer = UdpServer { @@ -142,7 +140,7 @@ impl UdpServer { }, }; - info!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding); + trace!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding); Ok(running_udp_server) } @@ -166,7 +164,7 @@ impl UdpServer { .send(Halted::Normal) .map_err(|e| Error::Error(e.to_string()))?; - let launcher = self.state.task.await.expect("unable to shutdown service"); + let launcher = self.state.task.await.expect("it should complete"); let stopped_api_server: UdpServer = UdpServer { state: Stopped { launcher }, @@ -176,7 +174,7 @@ impl UdpServer { } } -#[derive(Constructor, Debug)] +#[derive(Constructor, Copy, Clone, Debug)] pub struct Launcher { bind_to: SocketAddr, } @@ -187,14 +185,48 @@ impl Launcher { /// # Panics /// /// It would panic if unable to resolve the `local_addr` from the supplied ´socket´. - pub async fn start(&self, tracker: Arc, tx_start: Sender, rx_halt: Receiver) -> JoinHandle<()> { - Udp::start_with_graceful_shutdown(tracker, self.bind_to, tx_start, rx_halt).await + pub fn start( + &self, + tracker: Arc, + tx_start: oneshot::Sender, + rx_halt: oneshot::Receiver, + ) -> JoinHandle { + let launcher = Launcher::new(self.bind_to); + tokio::spawn(async move { + Udp::run_with_graceful_shutdown(tracker, launcher.bind_to, tx_start, rx_halt).await; + launcher + }) + } +} + +#[derive(Default)] +struct ActiveRequests { + rb: LocalRb>, // the number of requests we handle at the same time. +} + +impl std::fmt::Debug for ActiveRequests { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let (left, right) = &self.rb.as_slices(); + let dbg = format!("capacity: {}, left: {left:?}, right: {right:?}", &self.rb.capacity()); + f.debug_struct("ActiveRequests").field("rb", &dbg).finish() + } +} + +impl Drop for ActiveRequests { + fn drop(&mut self) { + for h in self.rb.pop_iter() { + if !h.is_finished() { + h.abort(); + } + } } } /// A UDP server instance launcher. #[derive(Constructor)] -pub struct Udp; +pub struct Udp { + // shutdown: bool, +} impl Udp { /// It starts the UDP server instance with graceful shutdown. @@ -203,38 +235,31 @@ impl Udp { /// /// It panics if unable to bind to udp socket, and get the address from the udp socket. /// It also panics if unable to send address of socket. - async fn start_with_graceful_shutdown( + async fn run_with_graceful_shutdown( tracker: Arc, bind_to: SocketAddr, - tx_start: Sender, - rx_halt: Receiver, - ) -> JoinHandle<()> { - let binding = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}.")); - let address = binding.local_addr().expect("Could not get local_addr from {binding}."); - + tx_start: oneshot::Sender, + rx_halt: oneshot::Receiver, + ) { + let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}.")); + let address = socket.local_addr().expect("Could not get local_addr from {binding}."); let halt = shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}")); let running = tokio::task::spawn(async move { - pin_mut!(halt); - - loop { - let mut data = [0; MAX_PACKET_SIZE]; - let binding = binding.clone(); - - tokio::select! { - Ok((valid_bytes, remote_addr)) = binding.recv_from(&mut data) => { - let payload = data[..valid_bytes].to_vec(); - - debug!("Received {} bytes", payload.len()); - debug!("From: {}", &remote_addr); - debug!("Payload: {:?}", payload); + let tracker = tracker.clone(); + let socket = socket.clone(); - let response = handle_packet(remote_addr, payload, &tracker).await; + let reqs = &mut ActiveRequests::default(); - Udp::send_response(binding, remote_addr, response).await; - }, - - () = & mut halt => {}, + loop { + let old = reqs.rb.push_overwrite( + Self::do_request(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone()).abort_handle(), + ); + + if let Some(h) = old { + if !h.is_finished() { + h.abort(); + } } } }); @@ -243,10 +268,62 @@ impl Udp { .send(Started { address }) .expect("the UDP Tracker service should not be dropped"); - running + let stop = running.abort_handle(); + + select! { + _ = running => {}, + () = halt => {} + } + stop.abort(); + + task::yield_now().await; // lets allow the other threads to complete. } - async fn send_response(socket: Arc, remote_addr: SocketAddr, response: Response) { + async fn receive_request(socket: Arc) -> Result> { + // Wait for the socket to be readable + socket.readable().await?; + + let mut buf = Vec::with_capacity(MAX_PACKET_SIZE); + + match socket.recv_buf_from(&mut buf).await { + Ok((n, from)) => { + Vec::truncate(&mut buf, n); + trace!("GOT {buf:?}"); + Ok(UdpRequest { payload: buf, from }) + } + + Err(e) => Err(Box::new(e)), + } + } + + fn do_request( + result: Result>, + tracker: Arc, + socket: Arc, + ) -> JoinHandle<()> { + tokio::task::spawn(async move { + match result { + Ok(udp_request) => { + trace!("Received Request from: {}", udp_request.from); + Self::make_response(tracker.clone(), socket.clone(), udp_request).await; + } + Err(error) => { + debug!("error: {error}"); + } + } + }) + } + + async fn make_response(tracker: Arc, socket: Arc, udp_request: UdpRequest) { + trace!("Making Response to {udp_request:?}"); + let from = udp_request.from; + let response = handlers::handle_packet(udp_request, &tracker.clone()).await; + Self::send_response(&socket.clone(), from, response).await; + } + + async fn send_response(socket: &Arc, to: SocketAddr, response: Response) { + trace!("Sending Response: {response:?} to: {to:?}"); + let buffer = vec![0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(buffer); @@ -257,10 +334,10 @@ impl Udp { let inner = cursor.get_ref(); debug!("Sending {} bytes ...", &inner[..position].len()); - debug!("To: {:?}", &remote_addr); + debug!("To: {:?}", &to); debug!("Payload: {:?}", &inner[..position]); - Udp::send_packet(socket, &remote_addr, &inner[..position]).await; + Self::send_packet(socket, &to, &inner[..position]).await; debug!("{} bytes sent", &inner[..position].len()); } @@ -270,25 +347,20 @@ impl Udp { } } - async fn send_packet(socket: Arc, remote_addr: &SocketAddr, payload: &[u8]) { + async fn send_packet(socket: &Arc, remote_addr: &SocketAddr, payload: &[u8]) { + trace!("Sending Packets: {payload:?} to: {remote_addr:?}"); + // doesn't matter if it reaches or not drop(socket.send_to(payload, remote_addr).await); } - - fn check(binding: &SocketAddr) -> CheckJob { - let binding = *binding; - let info = format!("checking the udp tracker health check at: {binding}"); - - let job = tokio::spawn(async move { check(&binding).await }); - - CheckJob::new(binding, info, job) - } } #[cfg(test)] mod tests { use std::sync::Arc; + use std::time::Duration; + use tokio::time::sleep; use torrust_tracker_test_helpers::configuration::ephemeral_mode_public; use crate::app::Registar; @@ -315,6 +387,8 @@ mod tests { .expect("it should start the server"); let stopped = started.stop().await.expect("it should stop the server"); + sleep(Duration::from_secs(1)).await; + assert_eq!(stopped.state.launcher.bind_to, bind_to); } } diff --git a/src/shared/bit_torrent/udp/client.rs b/src/shared/bit_torrent/udp/client.rs index 5d1e3accb..563063f09 100644 --- a/src/shared/bit_torrent/udp/client.rs +++ b/src/shared/bit_torrent/udp/client.rs @@ -6,6 +6,7 @@ use std::time::Duration; use aquatic_udp_protocol::{ConnectRequest, Request, Response, TransactionId}; use tokio::net::UdpSocket; use tokio::time; +use tracing::debug; use crate::shared::bit_torrent::udp::{source_address, MAX_PACKET_SIZE}; @@ -117,6 +118,8 @@ pub async fn new_udp_tracker_client_connected(remote_address: &str) -> UdpTracke /// /// # Panics pub async fn check(binding: &SocketAddr) -> Result { + debug!("Checking Service (detail): {binding:?}."); + let client = new_udp_tracker_client_connected(binding.to_string().as_str()).await; let connect_request = ConnectRequest { diff --git a/tests/servers/health_check_api/contract.rs b/tests/servers/health_check_api/contract.rs index 6af8673ee..2764775f9 100644 --- a/tests/servers/health_check_api/contract.rs +++ b/tests/servers/health_check_api/contract.rs @@ -77,10 +77,9 @@ async fn health_check_endpoint_should_return_status_err_when_a_service_was_regis let udp = udp::Started::new(&configuration).await; let udp_binding = udp.bind_address(); - let registar = udp.registar.clone(); - udp.server.stop().await.expect("it should stop udp server"); + udp.stop().await; { let config = configuration.health_check_api.clone(); diff --git a/tests/servers/udp/contract.rs b/tests/servers/udp/contract.rs index d5c11082c..ab2b8bf60 100644 --- a/tests/servers/udp/contract.rs +++ b/tests/servers/udp/contract.rs @@ -47,6 +47,8 @@ async fn should_return_a_bad_request_response_when_the_client_sends_an_empty_req let response = Response::from_bytes(&buffer, true).unwrap(); assert!(is_error_response(&response, "bad request")); + + env.stop().await; } mod receiving_a_connection_request { @@ -72,6 +74,8 @@ mod receiving_a_connection_request { let response = client.receive().await; assert!(is_connect_response(&response, TransactionId(123))); + + env.stop().await; } } @@ -119,6 +123,8 @@ mod receiving_an_announce_request { let response = client.receive().await; assert!(is_ipv4_announce_response(&response)); + + env.stop().await; } } @@ -156,5 +162,7 @@ mod receiving_an_scrape_request { let response = client.receive().await; assert!(is_scrape_response(&response)); + + env.stop().await; } } diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index 17c0a0a21..1707d594f 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -68,7 +68,7 @@ impl Environment { config: self.config, tracker: self.tracker, registar: Registar::default(), - server: self.server.stop().await.unwrap(), + server: self.server.stop().await.expect("it stop the udp tracker service"), } } @@ -76,3 +76,21 @@ impl Environment { self.server.state.binding } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::time::sleep; + use torrust_tracker_test_helpers::configuration; + + use crate::servers::udp::Started; + + #[tokio::test] + async fn it_should_make_and_stop_udp_server() { + let env = Started::new(&configuration::ephemeral().into()).await; + sleep(Duration::from_secs(1)).await; + env.stop().await; + sleep(Duration::from_secs(1)).await; + } +}