diff --git a/Cargo.lock b/Cargo.lock index 83afae727..71140b9f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4450,6 +4450,7 @@ dependencies = [ "torrust-tracker-configuration", "torrust-tracker-primitives", "torrust-tracker-test-helpers", + "torrust-udp-tracker-server", "tower 0.5.2", "tower-http", "tracing", @@ -4525,6 +4526,7 @@ dependencies = [ "torrust-tracker-configuration", "torrust-tracker-primitives", "torrust-tracker-test-helpers", + "torrust-udp-tracker-server", ] [[package]] diff --git a/packages/axum-tracker-api-server/Cargo.toml b/packages/axum-tracker-api-server/Cargo.toml index 480ee2a54..e1deb9b8a 100644 --- a/packages/axum-tracker-api-server/Cargo.toml +++ b/packages/axum-tracker-api-server/Cargo.toml @@ -38,6 +38,7 @@ torrust-tracker-api-core = { version = "3.0.0-develop", path = "../tracker-api-c torrust-tracker-clock = { version = "3.0.0-develop", path = "../clock" } torrust-tracker-configuration = { version = "3.0.0-develop", path = "../configuration" } torrust-tracker-primitives = { version = "3.0.0-develop", path = "../primitives" } +torrust-udp-tracker-server = { version = "3.0.0-develop", path = "../udp-tracker-server" } tower = { version = "0", features = ["timeout"] } tower-http = { version = "0", features = ["compression-full", "cors", "propagate-header", "request-id", "trace"] } tracing = "0" diff --git a/packages/axum-tracker-api-server/src/environment.rs b/packages/axum-tracker-api-server/src/environment.rs index f6d6fb4e4..7390bc659 100644 --- a/packages/axum-tracker-api-server/src/environment.rs +++ b/packages/axum-tracker-api-server/src/environment.rs @@ -12,6 +12,7 @@ use torrust_tracker_api_client::connection_info::{ConnectionInfo, Origin}; use torrust_tracker_api_core::container::TrackerHttpApiCoreContainer; use torrust_tracker_configuration::{logging, Configuration}; use torrust_tracker_primitives::peer; +use torrust_udp_tracker_server::container::UdpTrackerServerContainer; use crate::server::{ApiServer, Launcher, Running, Stopped}; @@ -175,11 +176,13 @@ impl EnvContainer { let http_tracker_core_container = HttpTrackerCoreContainer::initialize_from(&tracker_core_container, &http_tracker_config); let udp_tracker_core_container = UdpTrackerCoreContainer::initialize_from(&tracker_core_container, &udp_tracker_config); + let udp_tracker_server_container = UdpTrackerServerContainer::initialize(&core_config); let tracker_http_api_core_container = TrackerHttpApiCoreContainer::initialize_from( &tracker_core_container, &http_tracker_core_container, &udp_tracker_core_container, + &udp_tracker_server_container, &http_api_config, ); diff --git a/packages/axum-tracker-api-server/src/v1/context/stats/handlers.rs b/packages/axum-tracker-api-server/src/v1/context/stats/handlers.rs index e0149cb23..5e23211a6 100644 --- a/packages/axum-tracker-api-server/src/v1/context/stats/handlers.rs +++ b/packages/axum-tracker-api-server/src/v1/context/stats/handlers.rs @@ -44,10 +44,18 @@ pub async fn get_stats_handler( Arc>, Arc, Arc, + Arc, )>, params: Query, ) -> Response { - let metrics = get_metrics(state.0.clone(), state.1.clone(), state.2.clone(), state.3.clone()).await; + let metrics = get_metrics( + state.0.clone(), + state.1.clone(), + state.2.clone(), + state.3.clone(), + state.4.clone(), + ) + .await; match params.0.format { Some(format) => match format { diff --git a/packages/axum-tracker-api-server/src/v1/context/stats/routes.rs b/packages/axum-tracker-api-server/src/v1/context/stats/routes.rs index e73de8625..6caaf13bf 100644 --- a/packages/axum-tracker-api-server/src/v1/context/stats/routes.rs +++ b/packages/axum-tracker-api-server/src/v1/context/stats/routes.rs @@ -19,7 +19,8 @@ pub fn add(prefix: &str, router: Router, http_api_container: &Arc>, - pub udp_stats_repository: Arc, + pub udp_core_stats_repository: Arc, + + // todo: replace with UdpTrackerServerContainer + pub udp_server_stats_repository: Arc, pub http_api_config: Arc, } @@ -39,11 +43,13 @@ impl TrackerHttpApiCoreContainer { let tracker_core_container = Arc::new(TrackerCoreContainer::initialize(core_config)); let http_tracker_core_container = HttpTrackerCoreContainer::initialize_from(&tracker_core_container, http_tracker_config); let udp_tracker_core_container = UdpTrackerCoreContainer::initialize_from(&tracker_core_container, udp_tracker_config); + let udp_tracker_server_container = UdpTrackerServerContainer::initialize(core_config); Self::initialize_from( &tracker_core_container, &http_tracker_core_container, &udp_tracker_core_container, + &udp_tracker_server_container, http_api_config, ) } @@ -53,6 +59,7 @@ impl TrackerHttpApiCoreContainer { tracker_core_container: &Arc, http_tracker_core_container: &Arc, udp_tracker_core_container: &Arc, + udp_tracker_server_container: &Arc, http_api_config: &Arc, ) -> Arc { Arc::new(TrackerHttpApiCoreContainer { @@ -64,7 +71,9 @@ impl TrackerHttpApiCoreContainer { http_stats_repository: http_tracker_core_container.http_stats_repository.clone(), ban_service: udp_tracker_core_container.ban_service.clone(), - udp_stats_repository: udp_tracker_core_container.udp_stats_repository.clone(), + udp_core_stats_repository: udp_tracker_core_container.udp_core_stats_repository.clone(), + + udp_server_stats_repository: udp_tracker_server_container.udp_server_stats_repository.clone(), http_api_config: http_api_config.clone(), }) diff --git a/packages/tracker-api-core/src/statistics/services.rs b/packages/tracker-api-core/src/statistics/services.rs index 178c8ca0f..c4dfcf533 100644 --- a/packages/tracker-api-core/src/statistics/services.rs +++ b/packages/tracker-api-core/src/statistics/services.rs @@ -2,9 +2,10 @@ use std::sync::Arc; use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository; use bittorrent_udp_tracker_core::services::banning::BanService; -use bittorrent_udp_tracker_core::{self, statistics}; +use bittorrent_udp_tracker_core::{self, statistics as udp_core_statistics}; use tokio::sync::RwLock; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; +use torrust_udp_tracker_server::statistics as udp_server_statistics; use crate::statistics::metrics::Metrics; @@ -27,12 +28,14 @@ pub async fn get_metrics( in_memory_torrent_repository: Arc, ban_service: Arc>, http_stats_repository: Arc, - udp_stats_repository: Arc, + udp_core_stats_repository: Arc, + udp_server_stats_repository: Arc, ) -> TrackerMetrics { let torrents_metrics = in_memory_torrent_repository.get_torrents_metrics(); let udp_banned_ips_total = ban_service.read().await.get_banned_ips_total(); let http_stats = http_stats_repository.get_stats().await; - let udp_stats = udp_stats_repository.get_stats().await; + let udp_core_stats = udp_core_stats_repository.get_stats().await; + let udp_server_stats = udp_server_stats_repository.get_stats().await; TrackerMetrics { torrents_metrics, @@ -46,26 +49,26 @@ pub async fn get_metrics( tcp6_announces_handled: http_stats.tcp6_announces_handled, tcp6_scrapes_handled: http_stats.tcp6_scrapes_handled, // UDP - udp_requests_aborted: udp_stats.udp_requests_aborted, - udp_requests_banned: udp_stats.udp_requests_banned, + udp_requests_aborted: udp_server_stats.udp_requests_aborted, + udp_requests_banned: udp_server_stats.udp_requests_banned, udp_banned_ips_total: udp_banned_ips_total as u64, - udp_avg_connect_processing_time_ns: udp_stats.udp_avg_connect_processing_time_ns, - udp_avg_announce_processing_time_ns: udp_stats.udp_avg_announce_processing_time_ns, - udp_avg_scrape_processing_time_ns: udp_stats.udp_avg_scrape_processing_time_ns, + udp_avg_connect_processing_time_ns: udp_server_stats.udp_avg_connect_processing_time_ns, + udp_avg_announce_processing_time_ns: udp_server_stats.udp_avg_announce_processing_time_ns, + udp_avg_scrape_processing_time_ns: udp_server_stats.udp_avg_scrape_processing_time_ns, // UDPv4 - udp4_requests: udp_stats.udp4_requests, - udp4_connections_handled: udp_stats.udp4_connections_handled, - udp4_announces_handled: udp_stats.udp4_announces_handled, - udp4_scrapes_handled: udp_stats.udp4_scrapes_handled, - udp4_responses: udp_stats.udp4_responses, - udp4_errors_handled: udp_stats.udp4_errors_handled, + udp4_requests: udp_server_stats.udp4_requests, + udp4_connections_handled: udp_core_stats.udp4_connections_handled, + udp4_announces_handled: udp_core_stats.udp4_announces_handled, + udp4_scrapes_handled: udp_core_stats.udp4_scrapes_handled, + udp4_responses: udp_server_stats.udp4_responses, + udp4_errors_handled: udp_server_stats.udp4_errors_handled, // UDPv6 - udp6_requests: udp_stats.udp6_requests, - udp6_connections_handled: udp_stats.udp6_connections_handled, - udp6_announces_handled: udp_stats.udp6_announces_handled, - udp6_scrapes_handled: udp_stats.udp6_scrapes_handled, - udp6_responses: udp_stats.udp6_responses, - udp6_errors_handled: udp_stats.udp6_errors_handled, + udp6_requests: udp_server_stats.udp6_requests, + udp6_connections_handled: udp_core_stats.udp6_connections_handled, + udp6_announces_handled: udp_core_stats.udp6_announces_handled, + udp6_scrapes_handled: udp_core_stats.udp6_scrapes_handled, + udp6_responses: udp_server_stats.udp6_responses, + udp6_errors_handled: udp_server_stats.udp6_errors_handled, }, } } @@ -97,21 +100,27 @@ mod tests { let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); - // HTTP stats + // HTTP core stats let (_http_stats_event_sender, http_stats_repository) = bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics); let http_stats_repository = Arc::new(http_stats_repository); - // UDP stats + // UDP core stats let (_udp_stats_event_sender, udp_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics); let udp_stats_repository = Arc::new(udp_stats_repository); + // UDP server stats + let (_udp_server_stats_event_sender, udp_server_stats_repository) = + torrust_udp_tracker_server::statistics::setup::factory(config.core.tracker_usage_statistics); + let udp_server_stats_repository = Arc::new(udp_server_stats_repository); + let tracker_metrics = get_metrics( in_memory_torrent_repository.clone(), ban_service.clone(), http_stats_repository.clone(), udp_stats_repository.clone(), + udp_server_stats_repository.clone(), ) .await; diff --git a/packages/udp-tracker-core/src/container.rs b/packages/udp-tracker-core/src/container.rs index 62378e0af..1467134c5 100644 --- a/packages/udp-tracker-core/src/container.rs +++ b/packages/udp-tracker-core/src/container.rs @@ -18,8 +18,8 @@ pub struct UdpTrackerCoreContainer { pub whitelist_authorization: Arc, pub udp_tracker_config: Arc, - pub udp_stats_event_sender: Arc>>, - pub udp_stats_repository: Arc, + pub udp_core_stats_event_sender: Arc>>, + pub udp_core_stats_repository: Arc, pub ban_service: Arc>, } @@ -35,10 +35,10 @@ impl UdpTrackerCoreContainer { tracker_core_container: &Arc, udp_tracker_config: &Arc, ) -> Arc { - let (udp_stats_event_sender, udp_stats_repository) = + let (udp_core_stats_event_sender, udp_core_stats_repository) = statistics::setup::factory(tracker_core_container.core_config.tracker_usage_statistics); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); - let udp_stats_repository = Arc::new(udp_stats_repository); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + let udp_core_stats_repository = Arc::new(udp_core_stats_repository); let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); @@ -49,8 +49,8 @@ impl UdpTrackerCoreContainer { whitelist_authorization: tracker_core_container.whitelist_authorization.clone(), udp_tracker_config: udp_tracker_config.clone(), - udp_stats_event_sender: udp_stats_event_sender.clone(), - udp_stats_repository: udp_stats_repository.clone(), + udp_core_stats_event_sender: udp_core_stats_event_sender.clone(), + udp_core_stats_repository: udp_core_stats_repository.clone(), ban_service: ban_service.clone(), }) } diff --git a/packages/udp-tracker-core/src/services/connect.rs b/packages/udp-tracker-core/src/services/connect.rs index 9cb419bbc..3354595e5 100644 --- a/packages/udp-tracker-core/src/services/connect.rs +++ b/packages/udp-tracker-core/src/services/connect.rs @@ -55,10 +55,10 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() { - let (udp_stats_event_sender, _udp_stats_repository) = statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); - let response = handle_connect(sample_ipv4_remote_addr(), &udp_stats_event_sender, sample_issue_time()).await; + let response = handle_connect(sample_ipv4_remote_addr(), &udp_core_stats_event_sender, sample_issue_time()).await; assert_eq!( response, @@ -68,10 +68,10 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id() { - let (udp_stats_event_sender, _udp_stats_repository) = statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); - let response = handle_connect(sample_ipv4_remote_addr(), &udp_stats_event_sender, sample_issue_time()).await; + let response = handle_connect(sample_ipv4_remote_addr(), &udp_core_stats_event_sender, sample_issue_time()).await; assert_eq!( response, @@ -81,10 +81,10 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id_ipv6() { - let (udp_stats_event_sender, _udp_stats_repository) = statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = statistics::setup::factory(false); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); - let response = handle_connect(sample_ipv6_remote_addr(), &udp_stats_event_sender, sample_issue_time()).await; + let response = handle_connect(sample_ipv6_remote_addr(), &udp_core_stats_event_sender, sample_issue_time()).await; assert_eq!( response, diff --git a/packages/udp-tracker-core/src/statistics/event/handler.rs b/packages/udp-tracker-core/src/statistics/event/handler.rs index 91be32ad1..096059b91 100644 --- a/packages/udp-tracker-core/src/statistics/event/handler.rs +++ b/packages/udp-tracker-core/src/statistics/event/handler.rs @@ -1,20 +1,9 @@ -use crate::statistics::event::{Event, UdpResponseKind}; +use crate::statistics::event::Event; use crate::statistics::repository::Repository; pub async fn handle_event(event: Event, stats_repository: &Repository) { match event { - // UDP - Event::UdpRequestAborted => { - stats_repository.increase_udp_requests_aborted().await; - } - Event::UdpRequestBanned => { - stats_repository.increase_udp_requests_banned().await; - } - // UDP4 - Event::Udp4Request => { - stats_repository.increase_udp4_requests().await; - } Event::Udp4Connect => { stats_repository.increase_udp4_connections().await; } @@ -24,39 +13,8 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { Event::Udp4Scrape => { stats_repository.increase_udp4_scrapes().await; } - Event::Udp4Response { - kind, - req_processing_time, - } => { - stats_repository.increase_udp4_responses().await; - - match kind { - UdpResponseKind::Connect => { - stats_repository - .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) - .await; - } - UdpResponseKind::Announce => { - stats_repository - .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) - .await; - } - UdpResponseKind::Scrape => { - stats_repository - .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) - .await; - } - UdpResponseKind::Error => {} - } - } - Event::Udp4Error => { - stats_repository.increase_udp4_errors().await; - } // UDP6 - Event::Udp6Request => { - stats_repository.increase_udp6_requests().await; - } Event::Udp6Connect => { stats_repository.increase_udp6_connections().await; } @@ -66,15 +24,6 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { Event::Udp6Scrape => { stats_repository.increase_udp6_scrapes().await; } - Event::Udp6Response { - kind: _, - req_processing_time: _, - } => { - stats_repository.increase_udp6_responses().await; - } - Event::Udp6Error => { - stats_repository.increase_udp6_errors().await; - } } tracing::debug!("stats: {:?}", stats_repository.get_stats().await); @@ -151,100 +100,4 @@ mod tests { assert_eq!(stats.udp6_scrapes_handled, 1); } - - #[tokio::test] - async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() { - let stats_repository = Repository::new(); - - handle_event(Event::UdpRequestAborted, &stats_repository).await; - let stats = stats_repository.get_stats().await; - assert_eq!(stats.udp_requests_aborted, 1); - } - #[tokio::test] - async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() { - let stats_repository = Repository::new(); - - handle_event(Event::UdpRequestBanned, &stats_repository).await; - let stats = stats_repository.get_stats().await; - assert_eq!(stats.udp_requests_banned, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_requests_counter_when_it_receives_a_udp4_request_event() { - let stats_repository = Repository::new(); - - handle_event(Event::Udp4Request, &stats_repository).await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_requests, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::Udp4Response { - kind: crate::statistics::event::UdpResponseKind::Announce, - req_processing_time: std::time::Duration::from_secs(1), - }, - &stats_repository, - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_responses, 1); - } - - #[tokio::test] - async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() { - let stats_repository = Repository::new(); - - handle_event(Event::Udp4Error, &stats_repository).await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp4_errors_handled, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_requests_counter_when_it_receives_a_udp6_request_event() { - let stats_repository = Repository::new(); - - handle_event(Event::Udp6Request, &stats_repository).await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_requests, 1); - } - - #[tokio::test] - async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() { - let stats_repository = Repository::new(); - - handle_event( - Event::Udp6Response { - kind: crate::statistics::event::UdpResponseKind::Announce, - req_processing_time: std::time::Duration::from_secs(1), - }, - &stats_repository, - ) - .await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_responses, 1); - } - #[tokio::test] - async fn should_increase_the_udp6_errors_counter_when_it_receives_a_udp6_error_event() { - let stats_repository = Repository::new(); - - handle_event(Event::Udp6Error, &stats_repository).await; - - let stats = stats_repository.get_stats().await; - - assert_eq!(stats.udp6_errors_handled, 1); - } } diff --git a/packages/udp-tracker-core/src/statistics/event/mod.rs b/packages/udp-tracker-core/src/statistics/event/mod.rs index 6a5343933..bfc733657 100644 --- a/packages/udp-tracker-core/src/statistics/event/mod.rs +++ b/packages/udp-tracker-core/src/statistics/event/mod.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - pub mod handler; pub mod listener; pub mod sender; @@ -16,32 +14,10 @@ pub mod sender; pub enum Event { // code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 } // Attributes are enums too. - UdpRequestAborted, - UdpRequestBanned, - Udp4Request, Udp4Connect, Udp4Announce, Udp4Scrape, - Udp4Response { - kind: UdpResponseKind, - req_processing_time: Duration, - }, - Udp4Error, - Udp6Request, Udp6Connect, Udp6Announce, Udp6Scrape, - Udp6Response { - kind: UdpResponseKind, - req_processing_time: Duration, - }, - Udp6Error, -} - -#[derive(Debug, PartialEq, Eq)] -pub enum UdpResponseKind { - Connect, - Announce, - Scrape, - Error, } diff --git a/packages/udp-tracker-core/src/statistics/metrics.rs b/packages/udp-tracker-core/src/statistics/metrics.rs index 23357aab6..1b3805288 100644 --- a/packages/udp-tracker-core/src/statistics/metrics.rs +++ b/packages/udp-tracker-core/src/statistics/metrics.rs @@ -8,29 +8,6 @@ /// and also for each IP version used by the peers: IPv4 and IPv6. #[derive(Debug, PartialEq, Default)] pub struct Metrics { - // UDP - /// Total number of UDP (UDP tracker) requests aborted. - pub udp_requests_aborted: u64, - - /// Total number of UDP (UDP tracker) requests banned. - pub udp_requests_banned: u64, - - /// Total number of banned IPs. - pub udp_banned_ips_total: u64, - - /// Average rounded time spent processing UDP connect requests. - pub udp_avg_connect_processing_time_ns: u64, - - /// Average rounded time spent processing UDP announce requests. - pub udp_avg_announce_processing_time_ns: u64, - - /// Average rounded time spent processing UDP scrape requests. - pub udp_avg_scrape_processing_time_ns: u64, - - // UDPv4 - /// Total number of UDP (UDP tracker) requests from IPv4 peers. - pub udp4_requests: u64, - /// Total number of UDP (UDP tracker) connections from IPv4 peers. pub udp4_connections_handled: u64, @@ -40,16 +17,6 @@ pub struct Metrics { /// Total number of UDP (UDP tracker) `scrape` requests from IPv4 peers. pub udp4_scrapes_handled: u64, - /// Total number of UDP (UDP tracker) responses from IPv4 peers. - pub udp4_responses: u64, - - /// Total number of UDP (UDP tracker) `error` requests from IPv4 peers. - pub udp4_errors_handled: u64, - - // UDPv6 - /// Total number of UDP (UDP tracker) requests from IPv6 peers. - pub udp6_requests: u64, - /// Total number of UDP (UDP tracker) `connection` requests from IPv6 peers. pub udp6_connections_handled: u64, @@ -58,10 +25,4 @@ pub struct Metrics { /// Total number of UDP (UDP tracker) `scrape` requests from IPv6 peers. pub udp6_scrapes_handled: u64, - - /// Total number of UDP (UDP tracker) responses from IPv6 peers. - pub udp6_responses: u64, - - /// Total number of UDP (UDP tracker) `error` requests from IPv6 peers. - pub udp6_errors_handled: u64, } diff --git a/packages/udp-tracker-core/src/statistics/repository.rs b/packages/udp-tracker-core/src/statistics/repository.rs index 22e793036..f7609e5c2 100644 --- a/packages/udp-tracker-core/src/statistics/repository.rs +++ b/packages/udp-tracker-core/src/statistics/repository.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::time::Duration; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -29,24 +28,6 @@ impl Repository { self.stats.read().await } - pub async fn increase_udp_requests_aborted(&self) { - let mut stats_lock = self.stats.write().await; - stats_lock.udp_requests_aborted += 1; - drop(stats_lock); - } - - pub async fn increase_udp_requests_banned(&self) { - let mut stats_lock = self.stats.write().await; - stats_lock.udp_requests_banned += 1; - drop(stats_lock); - } - - pub async fn increase_udp4_requests(&self) { - let mut stats_lock = self.stats.write().await; - stats_lock.udp4_requests += 1; - drop(stats_lock); - } - pub async fn increase_udp4_connections(&self) { let mut stats_lock = self.stats.write().await; stats_lock.udp4_connections_handled += 1; @@ -65,82 +46,6 @@ impl Repository { drop(stats_lock); } - pub async fn increase_udp4_responses(&self) { - let mut stats_lock = self.stats.write().await; - stats_lock.udp4_responses += 1; - drop(stats_lock); - } - - pub async fn increase_udp4_errors(&self) { - let mut stats_lock = self.stats.write().await; - stats_lock.udp4_errors_handled += 1; - drop(stats_lock); - } - - #[allow(clippy::cast_precision_loss)] - #[allow(clippy::cast_possible_truncation)] - #[allow(clippy::cast_sign_loss)] - pub async fn recalculate_udp_avg_connect_processing_time_ns(&self, req_processing_time: Duration) { - let mut stats_lock = self.stats.write().await; - - let req_processing_time = req_processing_time.as_nanos() as f64; - let udp_connections_handled = (stats_lock.udp4_connections_handled + stats_lock.udp6_connections_handled) as f64; - - let previous_avg = stats_lock.udp_avg_connect_processing_time_ns; - - // Moving average: https://en.wikipedia.org/wiki/Moving_average - let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_connections_handled; - - stats_lock.udp_avg_connect_processing_time_ns = new_avg.ceil() as u64; - - drop(stats_lock); - } - - #[allow(clippy::cast_precision_loss)] - #[allow(clippy::cast_possible_truncation)] - #[allow(clippy::cast_sign_loss)] - pub async fn recalculate_udp_avg_announce_processing_time_ns(&self, req_processing_time: Duration) { - let mut stats_lock = self.stats.write().await; - - let req_processing_time = req_processing_time.as_nanos() as f64; - - let udp_announces_handled = (stats_lock.udp4_announces_handled + stats_lock.udp6_announces_handled) as f64; - - let previous_avg = stats_lock.udp_avg_announce_processing_time_ns; - - // Moving average: https://en.wikipedia.org/wiki/Moving_average - let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_announces_handled; - - stats_lock.udp_avg_announce_processing_time_ns = new_avg.ceil() as u64; - - drop(stats_lock); - } - - #[allow(clippy::cast_precision_loss)] - #[allow(clippy::cast_possible_truncation)] - #[allow(clippy::cast_sign_loss)] - pub async fn recalculate_udp_avg_scrape_processing_time_ns(&self, req_processing_time: Duration) { - let mut stats_lock = self.stats.write().await; - - let req_processing_time = req_processing_time.as_nanos() as f64; - let udp_scrapes_handled = (stats_lock.udp4_scrapes_handled + stats_lock.udp6_scrapes_handled) as f64; - - let previous_avg = stats_lock.udp_avg_scrape_processing_time_ns; - - // Moving average: https://en.wikipedia.org/wiki/Moving_average - let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_scrapes_handled; - - stats_lock.udp_avg_scrape_processing_time_ns = new_avg.ceil() as u64; - - drop(stats_lock); - } - - pub async fn increase_udp6_requests(&self) { - let mut stats_lock = self.stats.write().await; - stats_lock.udp6_requests += 1; - drop(stats_lock); - } - pub async fn increase_udp6_connections(&self) { let mut stats_lock = self.stats.write().await; stats_lock.udp6_connections_handled += 1; @@ -158,16 +63,4 @@ impl Repository { stats_lock.udp6_scrapes_handled += 1; drop(stats_lock); } - - pub async fn increase_udp6_responses(&self) { - let mut stats_lock = self.stats.write().await; - stats_lock.udp6_responses += 1; - drop(stats_lock); - } - - pub async fn increase_udp6_errors(&self) { - let mut stats_lock = self.stats.write().await; - stats_lock.udp6_errors_handled += 1; - drop(stats_lock); - } } diff --git a/packages/udp-tracker-core/src/statistics/services.rs b/packages/udp-tracker-core/src/statistics/services.rs index 486aaac06..7ffa127e6 100644 --- a/packages/udp-tracker-core/src/statistics/services.rs +++ b/packages/udp-tracker-core/src/statistics/services.rs @@ -39,10 +39,8 @@ use std::sync::Arc; use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository; -use tokio::sync::RwLock; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use crate::services::banning::BanService; use crate::statistics::metrics::Metrics; use crate::statistics::repository::Repository; @@ -63,37 +61,22 @@ pub struct TrackerMetrics { /// It returns all the [`TrackerMetrics`] pub async fn get_metrics( in_memory_torrent_repository: Arc, - ban_service: Arc>, stats_repository: Arc, ) -> TrackerMetrics { let torrents_metrics = in_memory_torrent_repository.get_torrents_metrics(); let stats = stats_repository.get_stats().await; - let udp_banned_ips_total = ban_service.read().await.get_banned_ips_total(); TrackerMetrics { torrents_metrics, protocol_metrics: Metrics { - // UDP - udp_requests_aborted: stats.udp_requests_aborted, - udp_requests_banned: stats.udp_requests_banned, - udp_banned_ips_total: udp_banned_ips_total as u64, - udp_avg_connect_processing_time_ns: stats.udp_avg_connect_processing_time_ns, - udp_avg_announce_processing_time_ns: stats.udp_avg_announce_processing_time_ns, - udp_avg_scrape_processing_time_ns: stats.udp_avg_scrape_processing_time_ns, // UDPv4 - udp4_requests: stats.udp4_requests, udp4_connections_handled: stats.udp4_connections_handled, udp4_announces_handled: stats.udp4_announces_handled, udp4_scrapes_handled: stats.udp4_scrapes_handled, - udp4_responses: stats.udp4_responses, - udp4_errors_handled: stats.udp4_errors_handled, // UDPv6 - udp6_requests: stats.udp6_requests, udp6_connections_handled: stats.udp6_connections_handled, udp6_announces_handled: stats.udp6_announces_handled, udp6_scrapes_handled: stats.udp6_scrapes_handled, - udp6_responses: stats.udp6_responses, - udp6_errors_handled: stats.udp6_errors_handled, }, } } @@ -104,14 +87,12 @@ mod tests { use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository; use bittorrent_tracker_core::{self}; - use tokio::sync::RwLock; use torrust_tracker_configuration::Configuration; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; use torrust_tracker_test_helpers::configuration; - use crate::services::banning::BanService; + use crate::statistics; use crate::statistics::services::{get_metrics, TrackerMetrics}; - use crate::{statistics, MAX_CONNECTION_ID_ERRORS_PER_IP}; pub fn tracker_configuration() -> Configuration { configuration::ephemeral() @@ -122,17 +103,12 @@ mod tests { let config = tracker_configuration(); let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); - let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); - let (_udp_stats_event_sender, udp_stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics); - let udp_stats_repository = Arc::new(udp_stats_repository); + let (_udp_core_stats_event_sender, udp_core_stats_repository) = + crate::statistics::setup::factory(config.core.tracker_usage_statistics); + let udp_core_stats_repository = Arc::new(udp_core_stats_repository); - let tracker_metrics = get_metrics( - in_memory_torrent_repository.clone(), - ban_service.clone(), - udp_stats_repository.clone(), - ) - .await; + let tracker_metrics = get_metrics(in_memory_torrent_repository.clone(), udp_core_stats_repository.clone()).await; assert_eq!( tracker_metrics, diff --git a/packages/udp-tracker-server/src/container.rs b/packages/udp-tracker-server/src/container.rs new file mode 100644 index 000000000..36ad0e671 --- /dev/null +++ b/packages/udp-tracker-server/src/container.rs @@ -0,0 +1,25 @@ +use std::sync::Arc; + +use torrust_tracker_configuration::Core; + +use crate::statistics; + +pub struct UdpTrackerServerContainer { + pub udp_server_stats_event_sender: Arc>>, + pub udp_server_stats_repository: Arc, +} + +impl UdpTrackerServerContainer { + #[must_use] + pub fn initialize(core_config: &Arc) -> Arc { + let (udp_server_stats_event_sender, udp_server_stats_repository) = + statistics::setup::factory(core_config.tracker_usage_statistics); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); + let udp_server_stats_repository = Arc::new(udp_server_stats_repository); + + Arc::new(Self { + udp_server_stats_event_sender: udp_server_stats_event_sender.clone(), + udp_server_stats_repository: udp_server_stats_repository.clone(), + }) + } +} diff --git a/packages/udp-tracker-server/src/environment.rs b/packages/udp-tracker-server/src/environment.rs index 0ab3bdea1..c6ec98290 100644 --- a/packages/udp-tracker-server/src/environment.rs +++ b/packages/udp-tracker-server/src/environment.rs @@ -8,6 +8,7 @@ use torrust_server_lib::registar::Registar; use torrust_tracker_configuration::{logging, Configuration, DEFAULT_TIMEOUT}; use torrust_tracker_primitives::peer; +use crate::container::UdpTrackerServerContainer; use crate::server::spawner::Spawner; use crate::server::states::{Running, Stopped}; use crate::server::Server; @@ -71,6 +72,7 @@ impl Environment { .server .start( self.container.udp_tracker_core_container.clone(), + self.container.udp_tracker_server_container.clone(), self.registar.give_form(), cookie_lifetime, ) @@ -115,6 +117,7 @@ impl Environment { pub struct EnvContainer { pub tracker_core_container: Arc, pub udp_tracker_core_container: Arc, + pub udp_tracker_server_container: Arc, } impl EnvContainer { @@ -129,10 +132,12 @@ impl EnvContainer { let tracker_core_container = Arc::new(TrackerCoreContainer::initialize(&core_config)); let udp_tracker_core_container = UdpTrackerCoreContainer::initialize_from(&tracker_core_container, &udp_tracker_config); + let udp_tracker_server_container = UdpTrackerServerContainer::initialize(&core_config); Self { tracker_core_container, udp_tracker_core_container, + udp_tracker_server_container, } } } diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index 7e3b8e7dd..97ce6ba4a 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -10,13 +10,15 @@ use aquatic_udp_protocol::{ use bittorrent_primitives::info_hash::InfoHash; use bittorrent_tracker_core::announce_handler::AnnounceHandler; use bittorrent_tracker_core::whitelist; -use bittorrent_udp_tracker_core::{services, statistics}; +use bittorrent_udp_tracker_core::{services, statistics as core_statistics}; use torrust_tracker_configuration::Core; use torrust_tracker_primitives::core::AnnounceData; use tracing::{instrument, Level}; use zerocopy::network_endian::I32; use crate::error::Error; +use crate::statistics as server_statistics; +use crate::statistics::event::UdpResponseKind; /// It handles the `Announce` request. /// @@ -24,14 +26,15 @@ use crate::error::Error; /// /// If a error happens in the `handle_announce` function, it will just return the `ServerError`. #[allow(clippy::too_many_arguments)] -#[instrument(fields(transaction_id, connection_id, info_hash), skip(announce_handler, whitelist_authorization, opt_udp_stats_event_sender), ret(level = Level::TRACE))] +#[instrument(fields(transaction_id, connection_id, info_hash), skip(announce_handler, whitelist_authorization, opt_udp_core_stats_event_sender, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_announce( remote_addr: SocketAddr, request: &AnnounceRequest, core_config: &Arc, announce_handler: &Arc, whitelist_authorization: &Arc, - opt_udp_stats_event_sender: &Arc>>, + opt_udp_core_stats_event_sender: &Arc>>, + opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, ) -> Result { tracing::Span::current() @@ -41,12 +44,31 @@ pub async fn handle_announce( tracing::trace!("handle announce"); + if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { + match remote_addr.ip() { + IpAddr::V4(_) => { + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::Udp4Request { + kind: UdpResponseKind::Announce, + }) + .await; + } + IpAddr::V6(_) => { + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::Udp6Request { + kind: UdpResponseKind::Announce, + }) + .await; + } + } + } + let announce_data = services::announce::handle_announce( remote_addr, request, announce_handler, whitelist_authorization, - opt_udp_stats_event_sender, + opt_udp_core_stats_event_sender, cookie_valid_range, ) .await @@ -205,7 +227,7 @@ mod tests { use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository; use bittorrent_tracker_core::whitelist; use bittorrent_udp_tracker_core::connection_cookie::{gen_remote_fingerprint, make}; - use bittorrent_udp_tracker_core::statistics; + use bittorrent_udp_tracker_core::statistics as core_statistics; use mockall::predicate::eq; use torrust_tracker_configuration::Core; @@ -214,12 +236,15 @@ mod tests { use crate::handlers::tests::{ initialize_core_tracker_services_for_default_tracker_configuration, initialize_core_tracker_services_for_public_tracker, sample_cookie_valid_range, sample_ipv4_socket_address, - sample_issue_time, MockUdpStatsEventSender, TorrentPeerBuilder, + sample_issue_time, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, TorrentPeerBuilder, }; + use crate::statistics as server_statistics; + use crate::statistics::event::UdpResponseKind; #[tokio::test] async fn an_announced_peer_should_be_added_to_the_tracker() { - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); let client_ip = Ipv4Addr::new(126, 0, 0, 1); let client_port = 8080; @@ -242,7 +267,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -263,7 +289,8 @@ mod tests { #[tokio::test] async fn the_announced_peer_should_not_be_included_in_the_response() { - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); @@ -277,7 +304,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -304,7 +332,8 @@ mod tests { // From the BEP 15 (https://www.bittorrent.org/beps/bep_0015.html): // "Do note that most trackers will only honor the IP address field under limited circumstances." - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); let info_hash = AquaticInfoHash([0u8; 20]); let peer_id = AquaticPeerId([255u8; 20]); @@ -330,7 +359,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -364,9 +394,12 @@ mod tests { announce_handler: Arc, whitelist_authorization: Arc, ) -> Response { - let (udp_stats_event_sender, _udp_stats_repository) = + let (udp_core_stats_event_sender, _udp_core_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + + let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); let request = AnnounceRequestBuilder::default() @@ -379,7 +412,8 @@ mod tests { &core_config, &announce_handler, &whitelist_authorization, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -388,7 +422,8 @@ mod tests { #[tokio::test] async fn when_the_announce_request_comes_from_a_client_using_ipv4_the_response_should_not_include_peers_using_ipv6() { - let (core_tracker_services, _core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, _core_udp_tracker_services, _server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); add_a_torrent_peer_using_ipv6(&core_tracker_services.in_memory_torrent_repository); @@ -410,16 +445,27 @@ mod tests { #[tokio::test] async fn should_send_the_upd4_announce_event() { - let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new(); - udp_stats_event_sender_mock + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); + udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp4Announce)) + .with(eq(core_statistics::event::Event::Udp4Announce)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let udp_stats_event_sender: Arc>> = - Arc::new(Some(Box::new(udp_stats_event_sender_mock))); + let udp_core_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); - let (core_tracker_services, _core_udp_tracker_services) = + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); + udp_server_stats_event_sender_mock + .expect_send_event() + .with(eq(server_statistics::event::Event::Udp4Request { + kind: UdpResponseKind::Announce, + })) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let udp_server_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); + + let (core_tracker_services, _core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); handle_announce( @@ -428,7 +474,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -451,7 +498,7 @@ mod tests { #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration_if_defined() { - let (core_tracker_services, core_udp_tracker_services) = + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); let client_ip = Ipv4Addr::new(127, 0, 0, 1); @@ -475,7 +522,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -512,7 +560,7 @@ mod tests { use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository; use bittorrent_tracker_core::whitelist; use bittorrent_udp_tracker_core::connection_cookie::{gen_remote_fingerprint, make}; - use bittorrent_udp_tracker_core::statistics; + use bittorrent_udp_tracker_core::statistics as core_statistics; use mockall::predicate::eq; use torrust_tracker_configuration::Core; @@ -521,12 +569,15 @@ mod tests { use crate::handlers::tests::{ initialize_core_tracker_services_for_default_tracker_configuration, initialize_core_tracker_services_for_public_tracker, sample_cookie_valid_range, sample_ipv6_remote_addr, - sample_issue_time, MockUdpStatsEventSender, TorrentPeerBuilder, + sample_issue_time, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, TorrentPeerBuilder, }; + use crate::statistics as server_statistics; + use crate::statistics::event::UdpResponseKind; #[tokio::test] async fn an_announced_peer_should_be_added_to_the_tracker() { - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); @@ -550,7 +601,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -571,7 +623,8 @@ mod tests { #[tokio::test] async fn the_announced_peer_should_not_be_included_in_the_response() { - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); @@ -588,7 +641,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -615,7 +669,8 @@ mod tests { // From the BEP 15 (https://www.bittorrent.org/beps/bep_0015.html): // "Do note that most trackers will only honor the IP address field under limited circumstances." - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_service) = + initialize_core_tracker_services_for_public_tracker(); let info_hash = AquaticInfoHash([0u8; 20]); let peer_id = AquaticPeerId([255u8; 20]); @@ -641,7 +696,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_service.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -675,9 +731,12 @@ mod tests { announce_handler: Arc, whitelist_authorization: Arc, ) -> Response { - let (udp_stats_event_sender, _udp_stats_repository) = + let (udp_core_stats_event_sender, _udp_core_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + + let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); @@ -693,7 +752,8 @@ mod tests { &core_config, &announce_handler, &whitelist_authorization, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -702,7 +762,8 @@ mod tests { #[tokio::test] async fn when_the_announce_request_comes_from_a_client_using_ipv6_the_response_should_not_include_peers_using_ipv4() { - let (core_tracker_services, _core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, _core_udp_tracker_services, _server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); add_a_torrent_peer_using_ipv4(&core_tracker_services.in_memory_torrent_repository); @@ -724,16 +785,27 @@ mod tests { #[tokio::test] async fn should_send_the_upd6_announce_event() { - let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new(); - udp_stats_event_sender_mock + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); + udp_core_stats_event_sender_mock + .expect_send_event() + .with(eq(core_statistics::event::Event::Udp6Announce)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let udp_core_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); + + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); + udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp6Announce)) + .with(eq(server_statistics::event::Event::Udp6Request { + kind: UdpResponseKind::Announce, + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let udp_stats_event_sender: Arc>> = - Arc::new(Some(Box::new(udp_stats_event_sender_mock))); + let udp_server_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); - let (core_tracker_services, _core_udp_tracker_services) = + let (core_tracker_services, _core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); let remote_addr = sample_ipv6_remote_addr(); @@ -748,7 +820,8 @@ mod tests { &core_tracker_services.core_config, &core_tracker_services.announce_handler, &core_tracker_services.whitelist_authorization, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -768,14 +841,17 @@ mod tests { use bittorrent_tracker_core::whitelist::authorization::WhitelistAuthorization; use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist; use bittorrent_udp_tracker_core::connection_cookie::{gen_remote_fingerprint, make}; - use bittorrent_udp_tracker_core::{self, statistics}; + use bittorrent_udp_tracker_core::{self, statistics as core_statistics}; use mockall::predicate::eq; use crate::handlers::announce::tests::announce_request::AnnounceRequestBuilder; use crate::handlers::handle_announce; use crate::handlers::tests::{ - sample_cookie_valid_range, sample_issue_time, MockUdpStatsEventSender, TrackerConfigurationBuilder, + sample_cookie_valid_range, sample_issue_time, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, + TrackerConfigurationBuilder, }; + use crate::statistics as server_statistics; + use crate::statistics::event::UdpResponseKind; #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { @@ -788,14 +864,25 @@ mod tests { let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); let db_torrent_repository = Arc::new(DatabasePersistentTorrentRepository::new(&database)); - let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new(); - udp_stats_event_sender_mock + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); + udp_core_stats_event_sender_mock + .expect_send_event() + .with(eq(core_statistics::event::Event::Udp6Announce)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let udp_core_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); + + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); + udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp6Announce)) + .with(eq(server_statistics::event::Event::Udp6Request { + kind: UdpResponseKind::Announce, + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let udp_stats_event_sender: Arc>> = - Arc::new(Some(Box::new(udp_stats_event_sender_mock))); + let udp_server_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let announce_handler = Arc::new(AnnounceHandler::new( &config.core, @@ -832,7 +919,8 @@ mod tests { &core_config, &announce_handler, &whitelist_authorization, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await diff --git a/packages/udp-tracker-server/src/handlers/connect.rs b/packages/udp-tracker-server/src/handlers/connect.rs index d1c3a05d8..be6dc45d4 100644 --- a/packages/udp-tracker-server/src/handlers/connect.rs +++ b/packages/udp-tracker-server/src/handlers/connect.rs @@ -1,23 +1,46 @@ //! UDP tracker connect handler. -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, ConnectionId, Response}; -use bittorrent_udp_tracker_core::{services, statistics}; +use bittorrent_udp_tracker_core::{services, statistics as core_statistics}; use tracing::{instrument, Level}; +use crate::statistics as server_statistics; +use crate::statistics::event::UdpResponseKind; + /// It handles the `Connect` request. -#[instrument(fields(transaction_id), skip(opt_udp_stats_event_sender), ret(level = Level::TRACE))] +#[instrument(fields(transaction_id), skip(opt_udp_core_stats_event_sender, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_connect( remote_addr: SocketAddr, request: &ConnectRequest, - opt_udp_stats_event_sender: &Arc>>, + opt_udp_core_stats_event_sender: &Arc>>, + opt_udp_server_stats_event_sender: &Arc>>, cookie_issue_time: f64, ) -> Response { tracing::Span::current().record("transaction_id", request.transaction_id.0.to_string()); tracing::trace!("handle connect"); - let connection_id = services::connect::handle_connect(remote_addr, opt_udp_stats_event_sender, cookie_issue_time).await; + if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { + match remote_addr.ip() { + IpAddr::V4(_) => { + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::Udp4Request { + kind: UdpResponseKind::Connect, + }) + .await; + } + IpAddr::V6(_) => { + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::Udp6Request { + kind: UdpResponseKind::Connect, + }) + .await; + } + } + } + + let connection_id = services::connect::handle_connect(remote_addr, opt_udp_core_stats_event_sender, cookie_issue_time).await; build_response(*request, connection_id) } @@ -41,14 +64,16 @@ mod tests { use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId}; use bittorrent_udp_tracker_core::connection_cookie::make; - use bittorrent_udp_tracker_core::statistics; + use bittorrent_udp_tracker_core::statistics as core_statistics; use mockall::predicate::eq; use crate::handlers::handle_connect; use crate::handlers::tests::{ sample_ipv4_remote_addr, sample_ipv4_remote_addr_fingerprint, sample_ipv4_socket_address, sample_ipv6_remote_addr, - sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpStatsEventSender, + sample_ipv6_remote_addr_fingerprint, sample_issue_time, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, }; + use crate::statistics as server_statistics; + use crate::statistics::event::UdpResponseKind; fn sample_connect_request() -> ConnectRequest { ConnectRequest { @@ -58,8 +83,12 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() { - let (udp_stats_event_sender, _udp_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = + bittorrent_udp_tracker_core::statistics::setup::factory(false); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + + let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); let request = ConnectRequest { transaction_id: TransactionId(0i32.into()), @@ -68,7 +97,8 @@ mod tests { let response = handle_connect( sample_ipv4_remote_addr(), &request, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_issue_time(), ) .await; @@ -84,8 +114,12 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id() { - let (udp_stats_event_sender, _udp_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = + bittorrent_udp_tracker_core::statistics::setup::factory(false); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + + let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); let request = ConnectRequest { transaction_id: TransactionId(0i32.into()), @@ -94,7 +128,8 @@ mod tests { let response = handle_connect( sample_ipv4_remote_addr(), &request, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_issue_time(), ) .await; @@ -110,8 +145,12 @@ mod tests { #[tokio::test] async fn a_connect_response_should_contain_a_new_connection_id_ipv6() { - let (udp_stats_event_sender, _udp_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = + bittorrent_udp_tracker_core::statistics::setup::factory(false); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + + let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); let request = ConnectRequest { transaction_id: TransactionId(0i32.into()), @@ -120,7 +159,8 @@ mod tests { let response = handle_connect( sample_ipv6_remote_addr(), &request, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_issue_time(), ) .await; @@ -136,21 +176,33 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { - let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new(); - udp_stats_event_sender_mock + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); + udp_core_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp4Connect)) + .with(eq(core_statistics::event::Event::Udp4Connect)) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let udp_stats_event_sender: Arc>> = - Arc::new(Some(Box::new(udp_stats_event_sender_mock))); + let udp_core_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); + + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); + udp_server_stats_event_sender_mock + .expect_send_event() + .with(eq(server_statistics::event::Event::Udp4Request { + kind: UdpResponseKind::Connect, + })) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let udp_server_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let client_socket_address = sample_ipv4_socket_address(); handle_connect( client_socket_address, &sample_connect_request(), - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_issue_time(), ) .await; @@ -158,19 +210,31 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { - let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new(); - udp_stats_event_sender_mock + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); + udp_core_stats_event_sender_mock + .expect_send_event() + .with(eq(core_statistics::event::Event::Udp6Connect)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let udp_core_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); + + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); + udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp6Connect)) + .with(eq(server_statistics::event::Event::Udp6Request { + kind: UdpResponseKind::Connect, + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let udp_stats_event_sender: Arc>> = - Arc::new(Some(Box::new(udp_stats_event_sender_mock))); + let udp_server_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); handle_connect( sample_ipv6_remote_addr(), &sample_connect_request(), - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_issue_time(), ) .await; diff --git a/packages/udp-tracker-server/src/handlers/error.rs b/packages/udp-tracker-server/src/handlers/error.rs index 4f2457126..e4bd382da 100644 --- a/packages/udp-tracker-server/src/handlers/error.rs +++ b/packages/udp-tracker-server/src/handlers/error.rs @@ -5,20 +5,21 @@ use std::sync::Arc; use aquatic_udp_protocol::{ErrorResponse, RequestParseError, Response, TransactionId}; use bittorrent_udp_tracker_core::connection_cookie::{check, gen_remote_fingerprint}; -use bittorrent_udp_tracker_core::{self, statistics, UDP_TRACKER_LOG_TARGET}; +use bittorrent_udp_tracker_core::{self, UDP_TRACKER_LOG_TARGET}; use tracing::{instrument, Level}; use uuid::Uuid; use zerocopy::network_endian::I32; use crate::error::Error; +use crate::statistics as server_statistics; #[allow(clippy::too_many_arguments)] -#[instrument(fields(transaction_id), skip(opt_udp_stats_event_sender), ret(level = Level::TRACE))] +#[instrument(fields(transaction_id), skip(opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_error( remote_addr: SocketAddr, local_addr: SocketAddr, request_id: Uuid, - opt_udp_stats_event_sender: &Arc>>, + opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, e: &Error, transaction_id: Option, @@ -55,13 +56,17 @@ pub async fn handle_error( }; if e.1.is_some() { - if let Some(udp_stats_event_sender) = opt_udp_stats_event_sender.as_deref() { + if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { match remote_addr { SocketAddr::V4(_) => { - udp_stats_event_sender.send_event(statistics::event::Event::Udp4Error).await; + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::Udp4Error) + .await; } SocketAddr::V6(_) => { - udp_stats_event_sender.send_event(statistics::event::Event::Udp6Error).await; + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::Udp6Error) + .await; } } } diff --git a/packages/udp-tracker-server/src/handlers/mod.rs b/packages/udp-tracker-server/src/handlers/mod.rs index 5d7fdb3b3..fd0536b8b 100644 --- a/packages/udp-tracker-server/src/handlers/mod.rs +++ b/packages/udp-tracker-server/src/handlers/mod.rs @@ -22,6 +22,7 @@ use tracing::{instrument, Level}; use uuid::Uuid; use super::RawRequest; +use crate::container::UdpTrackerServerContainer; use crate::error::Error; use crate::CurrentClock; @@ -52,10 +53,11 @@ impl CookieTimeValues { /// - Delegating the request to the correct handler depending on the request type. /// /// It will return an `Error` response if the request is invalid. -#[instrument(fields(request_id), skip(udp_request, udp_tracker_container, cookie_time_values), ret(level = Level::TRACE))] +#[instrument(fields(request_id), skip(udp_request, udp_tracker_core_container, udp_tracker_server_container, cookie_time_values), ret(level = Level::TRACE))] pub(crate) async fn handle_packet( udp_request: RawRequest, - udp_tracker_container: Arc, + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, local_addr: SocketAddr, cookie_time_values: CookieTimeValues, ) -> Response { @@ -71,7 +73,8 @@ pub(crate) async fn handle_packet( Ok(request) => match handle_request( request, udp_request.from, - udp_tracker_container.clone(), + udp_tracker_core_container.clone(), + udp_tracker_server_container.clone(), cookie_time_values.clone(), ) .await @@ -83,7 +86,7 @@ pub(crate) async fn handle_packet( } = error { // code-review: should we include `RequestParseError` and `BadRequest`? - let mut ban_service = udp_tracker_container.ban_service.write().await; + let mut ban_service = udp_tracker_core_container.ban_service.write().await; ban_service.increase_counter(&udp_request.from.ip()); } @@ -91,7 +94,7 @@ pub(crate) async fn handle_packet( udp_request.from, local_addr, request_id, - &udp_tracker_container.udp_stats_event_sender, + &udp_tracker_server_container.udp_server_stats_event_sender, cookie_time_values.valid_range.clone(), &error, Some(transaction_id), @@ -104,7 +107,7 @@ pub(crate) async fn handle_packet( udp_request.from, local_addr, request_id, - &udp_tracker_container.udp_stats_event_sender, + &udp_tracker_server_container.udp_server_stats_event_sender, cookie_time_values.valid_range.clone(), &e, None, @@ -124,11 +127,18 @@ pub(crate) async fn handle_packet( /// # Errors /// /// If a error happens in the `handle_request` function, it will just return the `ServerError`. -#[instrument(skip(request, remote_addr, udp_tracker_container, cookie_time_values))] +#[instrument(skip( + request, + remote_addr, + udp_tracker_core_container, + udp_tracker_server_container, + cookie_time_values +))] pub async fn handle_request( request: Request, remote_addr: SocketAddr, - udp_tracker_container: Arc, + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, cookie_time_values: CookieTimeValues, ) -> Result { tracing::trace!("handle request"); @@ -137,7 +147,8 @@ pub async fn handle_request( Request::Connect(connect_request) => Ok(handle_connect( remote_addr, &connect_request, - &udp_tracker_container.udp_stats_event_sender, + &udp_tracker_core_container.udp_core_stats_event_sender, + &udp_tracker_server_container.udp_server_stats_event_sender, cookie_time_values.issue_time, ) .await), @@ -145,10 +156,11 @@ pub async fn handle_request( handle_announce( remote_addr, &announce_request, - &udp_tracker_container.core_config, - &udp_tracker_container.announce_handler, - &udp_tracker_container.whitelist_authorization, - &udp_tracker_container.udp_stats_event_sender, + &udp_tracker_core_container.core_config, + &udp_tracker_core_container.announce_handler, + &udp_tracker_core_container.whitelist_authorization, + &udp_tracker_core_container.udp_core_stats_event_sender, + &udp_tracker_server_container.udp_server_stats_event_sender, cookie_time_values.valid_range, ) .await @@ -157,8 +169,9 @@ pub async fn handle_request( handle_scrape( remote_addr, &scrape_request, - &udp_tracker_container.scrape_handler, - &udp_tracker_container.udp_stats_event_sender, + &udp_tracker_core_container.scrape_handler, + &udp_tracker_core_container.udp_core_stats_event_sender, + &udp_tracker_server_container.udp_server_stats_event_sender, cookie_time_values.valid_range, ) .await @@ -183,7 +196,7 @@ pub(crate) mod tests { use bittorrent_tracker_core::whitelist::authorization::WhitelistAuthorization; use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist; use bittorrent_udp_tracker_core::connection_cookie::gen_remote_fingerprint; - use bittorrent_udp_tracker_core::{self, statistics}; + use bittorrent_udp_tracker_core::{self, statistics as core_statistics}; use futures::future::BoxFuture; use mockall::mock; use tokio::sync::mpsc::error::SendError; @@ -192,7 +205,7 @@ pub(crate) mod tests { use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use torrust_tracker_test_helpers::configuration; - use crate::CurrentClock; + use crate::{statistics as server_statistics, CurrentClock}; pub(crate) struct CoreTrackerServices { pub core_config: Arc, @@ -204,7 +217,11 @@ pub(crate) mod tests { } pub(crate) struct CoreUdpTrackerServices { - pub udp_stats_event_sender: Arc>>, + pub udp_core_stats_event_sender: Arc>>, + } + + pub(crate) struct ServerUdpTrackerServices { + pub udp_server_stats_event_sender: Arc>>, } fn default_testing_tracker_configuration() -> Configuration { @@ -212,19 +229,23 @@ pub(crate) mod tests { } pub(crate) fn initialize_core_tracker_services_for_default_tracker_configuration( - ) -> (CoreTrackerServices, CoreUdpTrackerServices) { + ) -> (CoreTrackerServices, CoreUdpTrackerServices, ServerUdpTrackerServices) { initialize_core_tracker_services(&default_testing_tracker_configuration()) } - pub(crate) fn initialize_core_tracker_services_for_public_tracker() -> (CoreTrackerServices, CoreUdpTrackerServices) { + pub(crate) fn initialize_core_tracker_services_for_public_tracker( + ) -> (CoreTrackerServices, CoreUdpTrackerServices, ServerUdpTrackerServices) { initialize_core_tracker_services(&configuration::ephemeral_public()) } - pub(crate) fn initialize_core_tracker_services_for_listed_tracker() -> (CoreTrackerServices, CoreUdpTrackerServices) { + pub(crate) fn initialize_core_tracker_services_for_listed_tracker( + ) -> (CoreTrackerServices, CoreUdpTrackerServices, ServerUdpTrackerServices) { initialize_core_tracker_services(&configuration::ephemeral_listed()) } - fn initialize_core_tracker_services(config: &Configuration) -> (CoreTrackerServices, CoreUdpTrackerServices) { + fn initialize_core_tracker_services( + config: &Configuration, + ) -> (CoreTrackerServices, CoreUdpTrackerServices, ServerUdpTrackerServices) { let core_config = Arc::new(config.core.clone()); let database = initialize_database(&config.core); let in_memory_whitelist = Arc::new(InMemoryWhitelist::default()); @@ -239,8 +260,12 @@ pub(crate) mod tests { )); let scrape_handler = Arc::new(ScrapeHandler::new(&whitelist_authorization, &in_memory_torrent_repository)); - let (udp_stats_event_sender, _udp_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = + bittorrent_udp_tracker_core::statistics::setup::factory(false); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + + let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); ( CoreTrackerServices { @@ -251,7 +276,12 @@ pub(crate) mod tests { in_memory_whitelist, whitelist_authorization, }, - CoreUdpTrackerServices { udp_stats_event_sender }, + CoreUdpTrackerServices { + udp_core_stats_event_sender, + }, + ServerUdpTrackerServices { + udp_server_stats_event_sender, + }, ) } @@ -356,9 +386,16 @@ pub(crate) mod tests { } mock! { - pub(crate) UdpStatsEventSender {} - impl statistics::event::sender::Sender for UdpStatsEventSender { - fn send_event(&self, event: statistics::event::Event) -> BoxFuture<'static,Option > > > ; + pub(crate) UdpCoreStatsEventSender {} + impl core_statistics::event::sender::Sender for UdpCoreStatsEventSender { + fn send_event(&self, event: core_statistics::event::Event) -> BoxFuture<'static,Option > > > ; + } + } + + mock! { + pub(crate) UdpServerStatsEventSender {} + impl server_statistics::event::sender::Sender for UdpServerStatsEventSender { + fn send_event(&self, event: server_statistics::event::Event) -> BoxFuture<'static,Option > > > ; } } } diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index de98b5f6d..248f0ca12 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -1,5 +1,5 @@ //! UDP tracker scrape handler. -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::ops::Range; use std::sync::Arc; @@ -7,25 +7,27 @@ use aquatic_udp_protocol::{ NumberOfDownloads, NumberOfPeers, Response, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId, }; use bittorrent_tracker_core::scrape_handler::ScrapeHandler; -use bittorrent_udp_tracker_core::statistics::{self}; -use bittorrent_udp_tracker_core::{self, services}; +use bittorrent_udp_tracker_core::{self, services, statistics as core_statistics}; use torrust_tracker_primitives::core::ScrapeData; use tracing::{instrument, Level}; use zerocopy::network_endian::I32; use crate::error::Error; +use crate::statistics as server_statistics; +use crate::statistics::event::UdpResponseKind; /// It handles the `Scrape` request. /// /// # Errors /// /// This function does not ever return an error. -#[instrument(fields(transaction_id, connection_id), skip(scrape_handler, opt_udp_stats_event_sender), ret(level = Level::TRACE))] +#[instrument(fields(transaction_id, connection_id), skip(scrape_handler, opt_udp_core_stats_event_sender, opt_udp_server_stats_event_sender), ret(level = Level::TRACE))] pub async fn handle_scrape( remote_addr: SocketAddr, request: &ScrapeRequest, scrape_handler: &Arc, - opt_udp_stats_event_sender: &Arc>>, + opt_udp_core_stats_event_sender: &Arc>>, + opt_udp_server_stats_event_sender: &Arc>>, cookie_valid_range: Range, ) -> Result { tracing::Span::current() @@ -34,11 +36,30 @@ pub async fn handle_scrape( tracing::trace!("handle scrape"); + if let Some(udp_server_stats_event_sender) = opt_udp_server_stats_event_sender.as_deref() { + match remote_addr.ip() { + IpAddr::V4(_) => { + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::Udp4Request { + kind: UdpResponseKind::Scrape, + }) + .await; + } + IpAddr::V6(_) => { + udp_server_stats_event_sender + .send_event(server_statistics::event::Event::Udp6Request { + kind: UdpResponseKind::Scrape, + }) + .await; + } + } + } + let scrape_data = services::scrape::handle_scrape( remote_addr, request, scrape_handler, - opt_udp_stats_event_sender, + opt_udp_core_stats_event_sender, cookie_valid_range, ) .await @@ -104,7 +125,8 @@ mod tests { #[tokio::test] async fn should_return_no_stats_when_the_tracker_does_not_have_any_torrent() { - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); let remote_addr = sample_ipv4_remote_addr(); @@ -121,7 +143,8 @@ mod tests { remote_addr, &request, &core_tracker_services.scrape_handler, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -168,8 +191,12 @@ mod tests { in_memory_torrent_repository: Arc, scrape_handler: Arc, ) -> Response { - let (udp_stats_event_sender, _udp_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(false); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); + let (udp_core_stats_event_sender, _udp_core_stats_repository) = + bittorrent_udp_tracker_core::statistics::setup::factory(false); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + + let (udp_server_stats_event_sender, _udp_server_stats_repository) = crate::statistics::setup::factory(false); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); let remote_addr = sample_ipv4_remote_addr(); let info_hash = InfoHash([0u8; 20]); @@ -182,7 +209,8 @@ mod tests { remote_addr, &request, &scrape_handler, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -204,7 +232,8 @@ mod tests { #[tokio::test] async fn should_return_torrent_statistics_when_the_tracker_has_the_requested_torrent() { - let (core_tracker_services, _core_udp_tracker_services) = initialize_core_tracker_services_for_public_tracker(); + let (core_tracker_services, _core_udp_tracker_services, _server_udp_tracker_services) = + initialize_core_tracker_services_for_public_tracker(); let torrent_stats = match_scrape_response( add_a_sample_seeder_and_scrape( @@ -237,7 +266,8 @@ mod tests { #[tokio::test] async fn should_return_the_torrent_statistics_when_the_requested_torrent_is_whitelisted() { - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_listed_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = + initialize_core_tracker_services_for_listed_tracker(); let remote_addr = sample_ipv4_remote_addr(); let info_hash = InfoHash([0u8; 20]); @@ -258,7 +288,8 @@ mod tests { remote_addr, &request, &core_tracker_services.scrape_handler, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -277,7 +308,8 @@ mod tests { #[tokio::test] async fn should_return_zeroed_statistics_when_the_requested_torrent_is_not_whitelisted() { - let (core_tracker_services, core_udp_tracker_services) = initialize_core_tracker_services_for_listed_tracker(); + let (core_tracker_services, core_udp_tracker_services, server_udp_tracker_services) = + initialize_core_tracker_services_for_listed_tracker(); let remote_addr = sample_ipv4_remote_addr(); let info_hash = InfoHash([0u8; 20]); @@ -296,7 +328,8 @@ mod tests { remote_addr, &request, &core_tracker_services.scrape_handler, - &core_udp_tracker_services.udp_stats_event_sender, + &core_udp_tracker_services.udp_core_stats_event_sender, + &server_udp_tracker_services.udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -325,37 +358,50 @@ mod tests { use std::future; use std::sync::Arc; - use bittorrent_udp_tracker_core::statistics; + use bittorrent_udp_tracker_core::statistics as core_statistics; use mockall::predicate::eq; use super::sample_scrape_request; use crate::handlers::handle_scrape; use crate::handlers::tests::{ initialize_core_tracker_services_for_default_tracker_configuration, sample_cookie_valid_range, - sample_ipv4_remote_addr, MockUdpStatsEventSender, + sample_ipv4_remote_addr, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, }; + use crate::statistics as server_statistics; #[tokio::test] async fn should_send_the_upd4_scrape_event() { - let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new(); - udp_stats_event_sender_mock + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); + udp_core_stats_event_sender_mock + .expect_send_event() + .with(eq(core_statistics::event::Event::Udp4Scrape)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let udp_core_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); + + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); + udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp4Scrape)) + .with(eq(server_statistics::event::Event::Udp4Request { + kind: server_statistics::event::UdpResponseKind::Scrape, + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let udp_stats_event_sender: Arc>> = - Arc::new(Some(Box::new(udp_stats_event_sender_mock))); + let udp_server_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let remote_addr = sample_ipv4_remote_addr(); - let (core_tracker_services, _core_udp_tracker_services) = + let (core_tracker_services, _core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); handle_scrape( remote_addr, &sample_scrape_request(&remote_addr), &core_tracker_services.scrape_handler, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await @@ -367,37 +413,50 @@ mod tests { use std::future; use std::sync::Arc; - use bittorrent_udp_tracker_core::statistics; + use bittorrent_udp_tracker_core::statistics as core_statistics; use mockall::predicate::eq; use super::sample_scrape_request; use crate::handlers::handle_scrape; use crate::handlers::tests::{ initialize_core_tracker_services_for_default_tracker_configuration, sample_cookie_valid_range, - sample_ipv6_remote_addr, MockUdpStatsEventSender, + sample_ipv6_remote_addr, MockUdpCoreStatsEventSender, MockUdpServerStatsEventSender, }; + use crate::statistics as server_statistics; #[tokio::test] async fn should_send_the_upd6_scrape_event() { - let mut udp_stats_event_sender_mock = MockUdpStatsEventSender::new(); - udp_stats_event_sender_mock + let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new(); + udp_core_stats_event_sender_mock + .expect_send_event() + .with(eq(core_statistics::event::Event::Udp6Scrape)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let udp_core_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_core_stats_event_sender_mock))); + + let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new(); + udp_server_stats_event_sender_mock .expect_send_event() - .with(eq(statistics::event::Event::Udp6Scrape)) + .with(eq(server_statistics::event::Event::Udp6Request { + kind: server_statistics::event::UdpResponseKind::Scrape, + })) .times(1) .returning(|_| Box::pin(future::ready(Some(Ok(()))))); - let udp_stats_event_sender: Arc>> = - Arc::new(Some(Box::new(udp_stats_event_sender_mock))); + let udp_server_stats_event_sender: Arc>> = + Arc::new(Some(Box::new(udp_server_stats_event_sender_mock))); let remote_addr = sample_ipv6_remote_addr(); - let (core_tracker_services, _core_udp_tracker_services) = + let (core_tracker_services, _core_udp_tracker_services, _server_udp_tracker_services) = initialize_core_tracker_services_for_default_tracker_configuration(); handle_scrape( remote_addr, &sample_scrape_request(&remote_addr), &core_tracker_services.scrape_handler, - &udp_stats_event_sender, + &udp_core_stats_event_sender, + &udp_server_stats_event_sender, sample_cookie_valid_range(), ) .await diff --git a/packages/udp-tracker-server/src/lib.rs b/packages/udp-tracker-server/src/lib.rs index 8e3cf503b..9e013bf81 100644 --- a/packages/udp-tracker-server/src/lib.rs +++ b/packages/udp-tracker-server/src/lib.rs @@ -634,10 +634,12 @@ //! documentation by [Arvid Norberg](https://github.com/arvidn) was very //! supportive in the development of this documentation. Some descriptions were //! taken from the [libtorrent](https://www.rasterbar.com/products/libtorrent/udp_tracker_protocol.html). +pub mod container; pub mod environment; pub mod error; pub mod handlers; pub mod server; +pub mod statistics; use std::net::SocketAddr; diff --git a/packages/udp-tracker-server/src/server/launcher.rs b/packages/udp-tracker-server/src/server/launcher.rs index 12d9c740c..acd214ab0 100644 --- a/packages/udp-tracker-server/src/server/launcher.rs +++ b/packages/udp-tracker-server/src/server/launcher.rs @@ -4,7 +4,7 @@ use std::time::Duration; use bittorrent_tracker_client::udp::client::check; use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer; -use bittorrent_udp_tracker_core::{self, statistics, UDP_TRACKER_LOG_TARGET}; +use bittorrent_udp_tracker_core::{self, UDP_TRACKER_LOG_TARGET}; use derive_more::Constructor; use futures_util::StreamExt; use tokio::select; @@ -16,9 +16,11 @@ use torrust_server_lib::signals::{shutdown_signal_with_message, Halted, Started} use tracing::instrument; use super::request_buffer::ActiveRequests; +use crate::container::UdpTrackerServerContainer; use crate::server::bound_socket::BoundSocket; use crate::server::processor::Processor; use crate::server::receiver::Receiver; +use crate::statistics; const IP_BANS_RESET_INTERVAL_IN_SECS: u64 = 3600; @@ -34,9 +36,10 @@ impl Launcher { /// It panics if unable to bind to udp socket, and get the address from the udp socket. /// It panics if unable to send address of socket. /// It panics if the udp server is loaded when the tracker is private. - #[instrument(skip(udp_tracker_container, bind_to, tx_start, rx_halt))] + #[instrument(skip(udp_tracker_core_container, udp_tracker_server_container, bind_to, tx_start, rx_halt))] pub async fn run_with_graceful_shutdown( - udp_tracker_container: Arc, + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, bind_to: SocketAddr, cookie_lifetime: Duration, tx_start: oneshot::Sender, @@ -44,7 +47,7 @@ impl Launcher { ) { tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting on: {bind_to}"); - if udp_tracker_container.core_config.private { + if udp_tracker_core_container.core_config.private { tracing::error!("udp services cannot be used for private trackers"); panic!("it should not use udp if using authentication"); } @@ -74,7 +77,13 @@ impl Launcher { let local_addr = local_udp_url.clone(); tokio::task::spawn(async move { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)"); - let () = Self::run_udp_server_main(receiver, udp_tracker_container, cookie_lifetime).await; + let () = Self::run_udp_server_main( + receiver, + udp_tracker_core_container, + udp_tracker_server_container, + cookie_lifetime, + ) + .await; }) }; @@ -111,10 +120,11 @@ impl Launcher { ServiceHealthCheckJob::new(binding, info, job) } - #[instrument(skip(receiver, udp_tracker_container))] + #[instrument(skip(receiver, udp_tracker_core_container, udp_tracker_server_container))] async fn run_udp_server_main( mut receiver: Receiver, - udp_tracker_container: Arc, + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, cookie_lifetime: Duration, ) { let active_requests = &mut ActiveRequests::default(); @@ -125,7 +135,7 @@ impl Launcher { let cookie_lifetime = cookie_lifetime.as_secs_f64(); - let ban_cleaner = udp_tracker_container.ban_service.clone(); + let ban_cleaner = udp_tracker_core_container.ban_service.clone(); tokio::spawn(async move { let mut cleaner_interval = interval(Duration::from_secs(IP_BANS_RESET_INTERVAL_IN_SECS)); @@ -157,22 +167,29 @@ impl Launcher { } }; - if let Some(udp_stats_event_sender) = udp_tracker_container.udp_stats_event_sender.as_deref() { + if let Some(udp_server_stats_event_sender) = udp_tracker_server_container.udp_server_stats_event_sender.as_deref() + { match req.from.ip() { IpAddr::V4(_) => { - udp_stats_event_sender.send_event(statistics::event::Event::Udp4Request).await; + udp_server_stats_event_sender + .send_event(statistics::event::Event::Udp4IncomingRequest) + .await; } IpAddr::V6(_) => { - udp_stats_event_sender.send_event(statistics::event::Event::Udp6Request).await; + udp_server_stats_event_sender + .send_event(statistics::event::Event::Udp6IncomingRequest) + .await; } } } - if udp_tracker_container.ban_service.read().await.is_banned(&req.from.ip()) { + if udp_tracker_core_container.ban_service.read().await.is_banned(&req.from.ip()) { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop continue: (banned ip)"); - if let Some(udp_stats_event_sender) = udp_tracker_container.udp_stats_event_sender.as_deref() { - udp_stats_event_sender + if let Some(udp_server_stats_event_sender) = + udp_tracker_server_container.udp_server_stats_event_sender.as_deref() + { + udp_server_stats_event_sender .send_event(statistics::event::Event::UdpRequestBanned) .await; } @@ -180,7 +197,12 @@ impl Launcher { continue; } - let processor = Processor::new(receiver.socket.clone(), udp_tracker_container.clone(), cookie_lifetime); + let processor = Processor::new( + receiver.socket.clone(), + udp_tracker_core_container.clone(), + udp_tracker_server_container.clone(), + cookie_lifetime, + ); /* We spawn the new task even if the active requests buffer is full. This could seem counterintuitive because we are accepting @@ -204,8 +226,10 @@ impl Launcher { if old_request_aborted { // Evicted task from active requests buffer was aborted. - if let Some(udp_stats_event_sender) = udp_tracker_container.udp_stats_event_sender.as_deref() { - udp_stats_event_sender + if let Some(udp_server_stats_event_sender) = + udp_tracker_server_container.udp_server_stats_event_sender.as_deref() + { + udp_server_stats_event_sender .send_event(statistics::event::Event::UdpRequestAborted) .await; } diff --git a/packages/udp-tracker-server/src/server/mod.rs b/packages/udp-tracker-server/src/server/mod.rs index 1ab79b6fe..f70e28b27 100644 --- a/packages/udp-tracker-server/src/server/mod.rs +++ b/packages/udp-tracker-server/src/server/mod.rs @@ -64,6 +64,7 @@ mod tests { use super::spawner::Spawner; use super::Server; + use crate::container::UdpTrackerServerContainer; fn initialize_global_services(configuration: &Configuration) { initialize_static(); @@ -97,10 +98,16 @@ mod tests { let stopped = Server::new(Spawner::new(bind_to)); - let udp_tracker_container = UdpTrackerCoreContainer::initialize(&core_config, &udp_tracker_config); + let udp_tracker_core_container = UdpTrackerCoreContainer::initialize(&core_config, &udp_tracker_config); + let udp_tracker_server_container = UdpTrackerServerContainer::initialize(&core_config); let started = stopped - .start(udp_tracker_container, register.give_form(), config.cookie_lifetime) + .start( + udp_tracker_core_container, + udp_tracker_server_container, + register.give_form(), + config.cookie_lifetime, + ) .await .expect("it should start the server"); @@ -131,11 +138,13 @@ mod tests { let stopped = Server::new(Spawner::new(bind_to)); - let udp_tracker_container = UdpTrackerCoreContainer::initialize(&core_config, &udp_tracker_config); + let udp_tracker_core_container = UdpTrackerCoreContainer::initialize(&core_config, &udp_tracker_config); + let udp_tracker_server_container = UdpTrackerServerContainer::initialize(&core_config); let started = stopped .start( - udp_tracker_container, + udp_tracker_core_container, + udp_tracker_server_container, register.give_form(), udp_tracker_config.cookie_lifetime, ) diff --git a/packages/udp-tracker-server/src/server/processor.rs b/packages/udp-tracker-server/src/server/processor.rs index a933fdd17..44b543571 100644 --- a/packages/udp-tracker-server/src/server/processor.rs +++ b/packages/udp-tracker-server/src/server/processor.rs @@ -5,25 +5,33 @@ use std::time::Duration; use aquatic_udp_protocol::Response; use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer; -use bittorrent_udp_tracker_core::{self, statistics}; +use bittorrent_udp_tracker_core::{self}; use tokio::time::Instant; use tracing::{instrument, Level}; use super::bound_socket::BoundSocket; +use crate::container::UdpTrackerServerContainer; use crate::handlers::CookieTimeValues; -use crate::{handlers, RawRequest}; +use crate::{handlers, statistics, RawRequest}; pub struct Processor { socket: Arc, - udp_tracker_container: Arc, + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, cookie_lifetime: f64, } impl Processor { - pub fn new(socket: Arc, udp_tracker_container: Arc, cookie_lifetime: f64) -> Self { + pub fn new( + socket: Arc, + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, + cookie_lifetime: f64, + ) -> Self { Self { socket, - udp_tracker_container, + udp_tracker_core_container, + udp_tracker_server_container, cookie_lifetime, } } @@ -36,7 +44,8 @@ impl Processor { let response = handlers::handle_packet( request, - self.udp_tracker_container.clone(), + self.udp_tracker_core_container.clone(), + self.udp_tracker_server_container.clone(), self.socket.address(), CookieTimeValues::new(self.cookie_lifetime), ) @@ -81,10 +90,12 @@ impl Processor { tracing::debug!(%bytes_count, %sent_bytes, "sent {response_type}"); } - if let Some(udp_stats_event_sender) = self.udp_tracker_container.udp_stats_event_sender.as_deref() { + if let Some(udp_server_stats_event_sender) = + self.udp_tracker_server_container.udp_server_stats_event_sender.as_deref() + { match target.ip() { IpAddr::V4(_) => { - udp_stats_event_sender + udp_server_stats_event_sender .send_event(statistics::event::Event::Udp4Response { kind: udp_response_kind, req_processing_time, @@ -92,7 +103,7 @@ impl Processor { .await; } IpAddr::V6(_) => { - udp_stats_event_sender + udp_server_stats_event_sender .send_event(statistics::event::Event::Udp6Response { kind: udp_response_kind, req_processing_time, diff --git a/packages/udp-tracker-server/src/server/spawner.rs b/packages/udp-tracker-server/src/server/spawner.rs index 6c1f9a48e..46916f6ae 100644 --- a/packages/udp-tracker-server/src/server/spawner.rs +++ b/packages/udp-tracker-server/src/server/spawner.rs @@ -11,6 +11,7 @@ use tokio::task::JoinHandle; use torrust_server_lib::signals::{Halted, Started}; use super::launcher::Launcher; +use crate::container::UdpTrackerServerContainer; #[derive(Constructor, Copy, Clone, Debug, Display)] #[display("(with socket): {bind_to}")] @@ -27,7 +28,8 @@ impl Spawner { #[must_use] pub fn spawn_launcher( &self, - udp_tracker_container: Arc, + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, cookie_lifetime: Duration, tx_start: oneshot::Sender, rx_halt: oneshot::Receiver, @@ -35,8 +37,15 @@ impl Spawner { let spawner = Self::new(self.bind_to); tokio::spawn(async move { - Launcher::run_with_graceful_shutdown(udp_tracker_container, spawner.bind_to, cookie_lifetime, tx_start, rx_halt) - .await; + Launcher::run_with_graceful_shutdown( + udp_tracker_core_container, + udp_tracker_server_container, + spawner.bind_to, + cookie_lifetime, + tx_start, + rx_halt, + ) + .await; spawner }) } diff --git a/packages/udp-tracker-server/src/server/states.rs b/packages/udp-tracker-server/src/server/states.rs index fc700ea40..4d1c97167 100644 --- a/packages/udp-tracker-server/src/server/states.rs +++ b/packages/udp-tracker-server/src/server/states.rs @@ -14,6 +14,7 @@ use tracing::{instrument, Level}; use super::spawner::Spawner; use super::{Server, UdpError}; +use crate::container::UdpTrackerServerContainer; use crate::server::launcher::Launcher; /// A UDP server instance controller with no UDP instance running. @@ -60,10 +61,11 @@ impl Server { /// # Panics /// /// It panics if unable to receive the bound socket address from service. - #[instrument(skip(self, udp_tracker_container, form), err, ret(Display, level = Level::INFO))] + #[instrument(skip(self, udp_tracker_core_container, udp_tracker_server_container, form), err, ret(Display, level = Level::INFO))] pub async fn start( self, - udp_tracker_container: Arc, + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, form: ServiceRegistrationForm, cookie_lifetime: Duration, ) -> Result, std::io::Error> { @@ -73,10 +75,13 @@ impl Server { assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); // May need to wrap in a task to about a tokio bug. - let task = self - .state - .spawner - .spawn_launcher(udp_tracker_container, cookie_lifetime, tx_start, rx_halt); + let task = self.state.spawner.spawn_launcher( + udp_tracker_core_container, + udp_tracker_server_container, + cookie_lifetime, + tx_start, + rx_halt, + ); let local_addr = rx_start.await.expect("it should be able to start the service").address; diff --git a/packages/udp-tracker-server/src/statistics/event/handler.rs b/packages/udp-tracker-server/src/statistics/event/handler.rs new file mode 100644 index 000000000..b3b86e20a --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/handler.rs @@ -0,0 +1,190 @@ +use crate::statistics::event::{Event, UdpResponseKind}; +use crate::statistics::repository::Repository; + +pub async fn handle_event(event: Event, stats_repository: &Repository) { + match event { + // UDP + Event::UdpRequestAborted => { + stats_repository.increase_udp_requests_aborted().await; + } + Event::UdpRequestBanned => { + stats_repository.increase_udp_requests_banned().await; + } + + // UDP4 + Event::Udp4IncomingRequest => { + stats_repository.increase_udp4_requests().await; + } + Event::Udp4Request { kind } => match kind { + UdpResponseKind::Connect => { + stats_repository.increase_udp4_connections().await; + } + UdpResponseKind::Announce => { + stats_repository.increase_udp4_announces().await; + } + UdpResponseKind::Scrape => { + stats_repository.increase_udp4_scrapes().await; + } + UdpResponseKind::Error => {} + }, + Event::Udp4Response { + kind, + req_processing_time, + } => { + stats_repository.increase_udp4_responses().await; + + match kind { + UdpResponseKind::Connect => { + stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + } + UdpResponseKind::Announce => { + stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + } + UdpResponseKind::Scrape => { + stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + } + UdpResponseKind::Error => {} + } + } + Event::Udp4Error => { + stats_repository.increase_udp4_errors().await; + } + + // UDP6 + Event::Udp6IncomingRequest => { + stats_repository.increase_udp6_requests().await; + } + Event::Udp6Request { kind } => match kind { + UdpResponseKind::Connect => { + stats_repository.increase_udp6_connections().await; + } + UdpResponseKind::Announce => { + stats_repository.increase_udp6_announces().await; + } + UdpResponseKind::Scrape => { + stats_repository.increase_udp6_scrapes().await; + } + UdpResponseKind::Error => {} + }, + Event::Udp6Response { + kind: _, + req_processing_time: _, + } => { + stats_repository.increase_udp6_responses().await; + } + Event::Udp6Error => { + stats_repository.increase_udp6_errors().await; + } + } + + tracing::debug!("stats: {:?}", stats_repository.get_stats().await); +} + +#[cfg(test)] +mod tests { + use crate::statistics::event::handler::handle_event; + use crate::statistics::event::Event; + use crate::statistics::repository::Repository; + + #[tokio::test] + async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() { + let stats_repository = Repository::new(); + + handle_event(Event::UdpRequestAborted, &stats_repository).await; + let stats = stats_repository.get_stats().await; + assert_eq!(stats.udp_requests_aborted, 1); + } + #[tokio::test] + async fn should_increase_the_udp_ban_counter_when_it_receives_a_udp_banned_event() { + let stats_repository = Repository::new(); + + handle_event(Event::UdpRequestBanned, &stats_repository).await; + let stats = stats_repository.get_stats().await; + assert_eq!(stats.udp_requests_banned, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_requests_counter_when_it_receives_a_udp4_request_event() { + let stats_repository = Repository::new(); + + handle_event(Event::Udp4IncomingRequest, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_requests, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_responses_counter_when_it_receives_a_udp4_response_event() { + let stats_repository = Repository::new(); + + handle_event( + Event::Udp4Response { + kind: crate::statistics::event::UdpResponseKind::Announce, + req_processing_time: std::time::Duration::from_secs(1), + }, + &stats_repository, + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_responses, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() { + let stats_repository = Repository::new(); + + handle_event(Event::Udp4Error, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_errors_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_requests_counter_when_it_receives_a_udp6_request_event() { + let stats_repository = Repository::new(); + + handle_event(Event::Udp6IncomingRequest, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_requests, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_response_counter_when_it_receives_a_udp6_response_event() { + let stats_repository = Repository::new(); + + handle_event( + Event::Udp6Response { + kind: crate::statistics::event::UdpResponseKind::Announce, + req_processing_time: std::time::Duration::from_secs(1), + }, + &stats_repository, + ) + .await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_responses, 1); + } + #[tokio::test] + async fn should_increase_the_udp6_errors_counter_when_it_receives_a_udp6_error_event() { + let stats_repository = Repository::new(); + + handle_event(Event::Udp6Error, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_errors_handled, 1); + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/listener.rs b/packages/udp-tracker-server/src/statistics/event/listener.rs new file mode 100644 index 000000000..f1a2e25de --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/listener.rs @@ -0,0 +1,11 @@ +use tokio::sync::mpsc; + +use super::handler::handle_event; +use super::Event; +use crate::statistics::repository::Repository; + +pub async fn dispatch_events(mut receiver: mpsc::Receiver, stats_repository: Repository) { + while let Some(event) = receiver.recv().await { + handle_event(event, &stats_repository).await; + } +} diff --git a/packages/udp-tracker-server/src/statistics/event/mod.rs b/packages/udp-tracker-server/src/statistics/event/mod.rs new file mode 100644 index 000000000..6a48b9449 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/mod.rs @@ -0,0 +1,51 @@ +use std::time::Duration; + +pub mod handler; +pub mod listener; +pub mod sender; + +/// An statistics event. It is used to collect tracker metrics. +/// +/// - `Tcp` prefix means the event was triggered by the HTTP tracker +/// - `Udp` prefix means the event was triggered by the UDP tracker +/// - `4` or `6` prefixes means the IP version used by the peer +/// - Finally the event suffix is the type of request: `announce`, `scrape` or `connection` +/// +/// > NOTE: HTTP trackers do not use `connection` requests. +#[derive(Debug, PartialEq, Eq)] +pub enum Event { + // code-review: consider one single event for request type with data: Event::Announce { scheme: HTTPorUDP, ip_version: V4orV6 } + // Attributes are enums too. + UdpRequestAborted, + UdpRequestBanned, + + // UDP4 + Udp4IncomingRequest, + Udp4Request { + kind: UdpResponseKind, + }, + Udp4Response { + kind: UdpResponseKind, + req_processing_time: Duration, + }, + Udp4Error, + + // UDP6 + Udp6IncomingRequest, + Udp6Request { + kind: UdpResponseKind, + }, + Udp6Response { + kind: UdpResponseKind, + req_processing_time: Duration, + }, + Udp6Error, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum UdpResponseKind { + Connect, + Announce, + Scrape, + Error, +} diff --git a/packages/udp-tracker-server/src/statistics/event/sender.rs b/packages/udp-tracker-server/src/statistics/event/sender.rs new file mode 100644 index 000000000..ca4b4e210 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/event/sender.rs @@ -0,0 +1,29 @@ +use futures::future::BoxFuture; +use futures::FutureExt; +#[cfg(test)] +use mockall::{automock, predicate::str}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::SendError; + +use super::Event; + +/// A trait to allow sending statistics events +#[cfg_attr(test, automock)] +pub trait Sender: Sync + Send { + fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>>; +} + +/// An [`statistics::EventSender`](crate::statistics::event::sender::Sender) implementation. +/// +/// It uses a channel sender to send the statistic events. The channel is created by a +/// [`statistics::Keeper`](crate::statistics::keeper::Keeper) +#[allow(clippy::module_name_repetitions)] +pub struct ChannelSender { + pub(crate) sender: mpsc::Sender, +} + +impl Sender for ChannelSender { + fn send_event(&self, event: Event) -> BoxFuture<'_, Option>>> { + async move { Some(self.sender.send(event).await) }.boxed() + } +} diff --git a/packages/udp-tracker-server/src/statistics/keeper.rs b/packages/udp-tracker-server/src/statistics/keeper.rs new file mode 100644 index 000000000..ae80e7970 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/keeper.rs @@ -0,0 +1,77 @@ +use tokio::sync::mpsc; + +use super::event::listener::dispatch_events; +use super::event::sender::{ChannelSender, Sender}; +use super::event::Event; +use super::repository::Repository; + +const CHANNEL_BUFFER_SIZE: usize = 65_535; + +/// The service responsible for keeping tracker metrics (listening to statistics events and handle them). +/// +/// It actively listen to new statistics events. When it receives a new event +/// it accordingly increases the counters. +pub struct Keeper { + pub repository: Repository, +} + +impl Default for Keeper { + fn default() -> Self { + Self::new() + } +} + +impl Keeper { + #[must_use] + pub fn new() -> Self { + Self { + repository: Repository::new(), + } + } + + #[must_use] + pub fn new_active_instance() -> (Box, Repository) { + let mut stats_tracker = Self::new(); + + let stats_event_sender = stats_tracker.run_event_listener(); + + (stats_event_sender, stats_tracker.repository) + } + + pub fn run_event_listener(&mut self) -> Box { + let (sender, receiver) = mpsc::channel::(CHANNEL_BUFFER_SIZE); + + let stats_repository = self.repository.clone(); + + tokio::spawn(async move { dispatch_events(receiver, stats_repository).await }); + + Box::new(ChannelSender { sender }) + } +} + +#[cfg(test)] +mod tests { + use crate::statistics::event::Event; + use crate::statistics::keeper::Keeper; + use crate::statistics::metrics::Metrics; + + #[tokio::test] + async fn should_contain_the_tracker_statistics() { + let stats_tracker = Keeper::new(); + + let stats = stats_tracker.repository.get_stats().await; + + assert_eq!(stats.udp4_requests, Metrics::default().udp4_requests); + } + + #[tokio::test] + async fn should_create_an_event_sender_to_send_statistical_events() { + let mut stats_tracker = Keeper::new(); + + let event_sender = stats_tracker.run_event_listener(); + + let result = event_sender.send_event(Event::Udp4IncomingRequest).await; + + assert!(result.is_some()); + } +} diff --git a/packages/udp-tracker-server/src/statistics/metrics.rs b/packages/udp-tracker-server/src/statistics/metrics.rs new file mode 100644 index 000000000..cce618d74 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/metrics.rs @@ -0,0 +1,60 @@ +/// Metrics collected by the UDP tracker server. +#[derive(Debug, PartialEq, Default)] +pub struct Metrics { + // UDP + /// Total number of UDP (UDP tracker) requests aborted. + pub udp_requests_aborted: u64, + + /// Total number of UDP (UDP tracker) requests banned. + pub udp_requests_banned: u64, + + /// Total number of banned IPs. + pub udp_banned_ips_total: u64, + + /// Average rounded time spent processing UDP connect requests. + pub udp_avg_connect_processing_time_ns: u64, + + /// Average rounded time spent processing UDP announce requests. + pub udp_avg_announce_processing_time_ns: u64, + + /// Average rounded time spent processing UDP scrape requests. + pub udp_avg_scrape_processing_time_ns: u64, + + // UDPv4 + /// Total number of UDP (UDP tracker) requests from IPv4 peers. + pub udp4_requests: u64, + + /// Total number of UDP (UDP tracker) connections from IPv4 peers. + pub udp4_connections_handled: u64, + + /// Total number of UDP (UDP tracker) `announce` requests from IPv4 peers. + pub udp4_announces_handled: u64, + + /// Total number of UDP (UDP tracker) `scrape` requests from IPv4 peers. + pub udp4_scrapes_handled: u64, + + /// Total number of UDP (UDP tracker) responses from IPv4 peers. + pub udp4_responses: u64, + + /// Total number of UDP (UDP tracker) `error` requests from IPv4 peers. + pub udp4_errors_handled: u64, + + // UDPv6 + /// Total number of UDP (UDP tracker) requests from IPv6 peers. + pub udp6_requests: u64, + + /// Total number of UDP (UDP tracker) `connection` requests from IPv6 peers. + pub udp6_connections_handled: u64, + + /// Total number of UDP (UDP tracker) `announce` requests from IPv6 peers. + pub udp6_announces_handled: u64, + + /// Total number of UDP (UDP tracker) `scrape` requests from IPv6 peers. + pub udp6_scrapes_handled: u64, + + /// Total number of UDP (UDP tracker) responses from IPv6 peers. + pub udp6_responses: u64, + + /// Total number of UDP (UDP tracker) `error` requests from IPv6 peers. + pub udp6_errors_handled: u64, +} diff --git a/packages/udp-tracker-server/src/statistics/mod.rs b/packages/udp-tracker-server/src/statistics/mod.rs new file mode 100644 index 000000000..939a41061 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/mod.rs @@ -0,0 +1,6 @@ +pub mod event; +pub mod keeper; +pub mod metrics; +pub mod repository; +pub mod services; +pub mod setup; diff --git a/packages/udp-tracker-server/src/statistics/repository.rs b/packages/udp-tracker-server/src/statistics/repository.rs new file mode 100644 index 000000000..22e793036 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/repository.rs @@ -0,0 +1,173 @@ +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::{RwLock, RwLockReadGuard}; + +use super::metrics::Metrics; + +/// A repository for the tracker metrics. +#[derive(Clone)] +pub struct Repository { + pub stats: Arc>, +} + +impl Default for Repository { + fn default() -> Self { + Self::new() + } +} + +impl Repository { + #[must_use] + pub fn new() -> Self { + Self { + stats: Arc::new(RwLock::new(Metrics::default())), + } + } + + pub async fn get_stats(&self) -> RwLockReadGuard<'_, Metrics> { + self.stats.read().await + } + + pub async fn increase_udp_requests_aborted(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp_requests_aborted += 1; + drop(stats_lock); + } + + pub async fn increase_udp_requests_banned(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp_requests_banned += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_requests(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_requests += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_scrapes_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_responses(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_responses += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_errors(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_errors_handled += 1; + drop(stats_lock); + } + + #[allow(clippy::cast_precision_loss)] + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_sign_loss)] + pub async fn recalculate_udp_avg_connect_processing_time_ns(&self, req_processing_time: Duration) { + let mut stats_lock = self.stats.write().await; + + let req_processing_time = req_processing_time.as_nanos() as f64; + let udp_connections_handled = (stats_lock.udp4_connections_handled + stats_lock.udp6_connections_handled) as f64; + + let previous_avg = stats_lock.udp_avg_connect_processing_time_ns; + + // Moving average: https://en.wikipedia.org/wiki/Moving_average + let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_connections_handled; + + stats_lock.udp_avg_connect_processing_time_ns = new_avg.ceil() as u64; + + drop(stats_lock); + } + + #[allow(clippy::cast_precision_loss)] + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_sign_loss)] + pub async fn recalculate_udp_avg_announce_processing_time_ns(&self, req_processing_time: Duration) { + let mut stats_lock = self.stats.write().await; + + let req_processing_time = req_processing_time.as_nanos() as f64; + + let udp_announces_handled = (stats_lock.udp4_announces_handled + stats_lock.udp6_announces_handled) as f64; + + let previous_avg = stats_lock.udp_avg_announce_processing_time_ns; + + // Moving average: https://en.wikipedia.org/wiki/Moving_average + let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_announces_handled; + + stats_lock.udp_avg_announce_processing_time_ns = new_avg.ceil() as u64; + + drop(stats_lock); + } + + #[allow(clippy::cast_precision_loss)] + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_sign_loss)] + pub async fn recalculate_udp_avg_scrape_processing_time_ns(&self, req_processing_time: Duration) { + let mut stats_lock = self.stats.write().await; + + let req_processing_time = req_processing_time.as_nanos() as f64; + let udp_scrapes_handled = (stats_lock.udp4_scrapes_handled + stats_lock.udp6_scrapes_handled) as f64; + + let previous_avg = stats_lock.udp_avg_scrape_processing_time_ns; + + // Moving average: https://en.wikipedia.org/wiki/Moving_average + let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_scrapes_handled; + + stats_lock.udp_avg_scrape_processing_time_ns = new_avg.ceil() as u64; + + drop(stats_lock); + } + + pub async fn increase_udp6_requests(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_requests += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_scrapes_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_responses(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_responses += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_errors(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_errors_handled += 1; + drop(stats_lock); + } +} diff --git a/packages/udp-tracker-server/src/statistics/services.rs b/packages/udp-tracker-server/src/statistics/services.rs new file mode 100644 index 000000000..92ee14f50 --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/services.rs @@ -0,0 +1,147 @@ +//! Statistics services. +//! +//! It includes: +//! +//! - A [`factory`](crate::statistics::setup::factory) function to build the structs needed to collect the tracker metrics. +//! - A [`get_metrics`] service to get the tracker [`metrics`](crate::statistics::metrics::Metrics). +//! +//! Tracker metrics are collected using a Publisher-Subscribe pattern. +//! +//! The factory function builds two structs: +//! +//! - An statistics event [`Sender`](crate::statistics::event::sender::Sender) +//! - An statistics [`Repository`] +//! +//! ```text +//! let (stats_event_sender, stats_repository) = factory(tracker_usage_statistics); +//! ``` +//! +//! The statistics repository is responsible for storing the metrics in memory. +//! The statistics event sender allows sending events related to metrics. +//! There is an event listener that is receiving all the events and processing them with an event handler. +//! Then, the event handler updates the metrics depending on the received event. +//! +//! For example, if you send the event [`Event::Udp4Connect`](crate::statistics::event::Event::Udp4Connect): +//! +//! ```text +//! let result = event_sender.send_event(Event::Udp4Connect).await; +//! ``` +//! +//! Eventually the counter for UDP connections from IPv4 peers will be increased. +//! +//! ```rust,no_run +//! pub struct Metrics { +//! // ... +//! pub udp4_connections_handled: u64, // This will be incremented +//! // ... +//! } +//! ``` +use std::sync::Arc; + +use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository; +use bittorrent_udp_tracker_core::services::banning::BanService; +use tokio::sync::RwLock; +use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; + +use crate::statistics::metrics::Metrics; +use crate::statistics::repository::Repository; + +/// All the metrics collected by the tracker. +#[derive(Debug, PartialEq)] +pub struct TrackerMetrics { + /// Domain level metrics. + /// + /// General metrics for all torrents (number of seeders, leechers, etcetera) + pub torrents_metrics: TorrentsMetrics, + + /// Application level metrics. Usage statistics/metrics. + /// + /// Metrics about how the tracker is been used (number of udp announce requests, etcetera) + pub protocol_metrics: Metrics, +} + +/// It returns all the [`TrackerMetrics`] +pub async fn get_metrics( + in_memory_torrent_repository: Arc, + ban_service: Arc>, + stats_repository: Arc, +) -> TrackerMetrics { + let torrents_metrics = in_memory_torrent_repository.get_torrents_metrics(); + let stats = stats_repository.get_stats().await; + let udp_banned_ips_total = ban_service.read().await.get_banned_ips_total(); + + TrackerMetrics { + torrents_metrics, + protocol_metrics: Metrics { + // UDP + udp_requests_aborted: stats.udp_requests_aborted, + udp_requests_banned: stats.udp_requests_banned, + udp_banned_ips_total: udp_banned_ips_total as u64, + udp_avg_connect_processing_time_ns: stats.udp_avg_connect_processing_time_ns, + udp_avg_announce_processing_time_ns: stats.udp_avg_announce_processing_time_ns, + udp_avg_scrape_processing_time_ns: stats.udp_avg_scrape_processing_time_ns, + // UDPv4 + udp4_requests: stats.udp4_requests, + udp4_connections_handled: stats.udp4_connections_handled, + udp4_announces_handled: stats.udp4_announces_handled, + udp4_scrapes_handled: stats.udp4_scrapes_handled, + udp4_responses: stats.udp4_responses, + udp4_errors_handled: stats.udp4_errors_handled, + // UDPv6 + udp6_requests: stats.udp6_requests, + udp6_connections_handled: stats.udp6_connections_handled, + udp6_announces_handled: stats.udp6_announces_handled, + udp6_scrapes_handled: stats.udp6_scrapes_handled, + udp6_responses: stats.udp6_responses, + udp6_errors_handled: stats.udp6_errors_handled, + }, + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepository; + use bittorrent_tracker_core::{self}; + use bittorrent_udp_tracker_core::services::banning::BanService; + use bittorrent_udp_tracker_core::MAX_CONNECTION_ID_ERRORS_PER_IP; + use tokio::sync::RwLock; + use torrust_tracker_configuration::Configuration; + use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; + use torrust_tracker_test_helpers::configuration; + + use crate::statistics; + use crate::statistics::services::{get_metrics, TrackerMetrics}; + + pub fn tracker_configuration() -> Configuration { + configuration::ephemeral() + } + + #[tokio::test] + async fn the_statistics_service_should_return_the_tracker_metrics() { + let config = tracker_configuration(); + + let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + + let (_udp_server_stats_event_sender, udp_server_stats_repository) = + statistics::setup::factory(config.core.tracker_usage_statistics); + let udp_server_stats_repository = Arc::new(udp_server_stats_repository); + + let tracker_metrics = get_metrics( + in_memory_torrent_repository.clone(), + ban_service.clone(), + udp_server_stats_repository.clone(), + ) + .await; + + assert_eq!( + tracker_metrics, + TrackerMetrics { + torrents_metrics: TorrentsMetrics::default(), + protocol_metrics: statistics::metrics::Metrics::default(), + } + ); + } +} diff --git a/packages/udp-tracker-server/src/statistics/setup.rs b/packages/udp-tracker-server/src/statistics/setup.rs new file mode 100644 index 000000000..d3114a75e --- /dev/null +++ b/packages/udp-tracker-server/src/statistics/setup.rs @@ -0,0 +1,54 @@ +//! Setup for the tracker statistics. +//! +//! The [`factory`] function builds the structs needed for handling the tracker metrics. +use crate::statistics; + +/// It builds the structs needed for handling the tracker metrics. +/// +/// It returns: +/// +/// - An statistics event [`Sender`](crate::statistics::event::sender::Sender) that allows you to send events related to statistics. +/// - An statistics [`Repository`](crate::statistics::repository::Repository) which is an in-memory repository for the tracker metrics. +/// +/// When the input argument `tracker_usage_statistics`is false the setup does not run the event listeners, consequently the statistics +/// events are sent are received but not dispatched to the handler. +#[must_use] +pub fn factory( + tracker_usage_statistics: bool, +) -> ( + Option>, + statistics::repository::Repository, +) { + let mut stats_event_sender = None; + + let mut stats_tracker = statistics::keeper::Keeper::new(); + + if tracker_usage_statistics { + stats_event_sender = Some(stats_tracker.run_event_listener()); + } + + (stats_event_sender, stats_tracker.repository) +} + +#[cfg(test)] +mod test { + use super::factory; + + #[tokio::test] + async fn should_not_send_any_event_when_statistics_are_disabled() { + let tracker_usage_statistics = false; + + let (stats_event_sender, _stats_repository) = factory(tracker_usage_statistics); + + assert!(stats_event_sender.is_none()); + } + + #[tokio::test] + async fn should_send_events_when_statistics_are_enabled() { + let tracker_usage_statistics = true; + + let (stats_event_sender, _stats_repository) = factory(tracker_usage_statistics); + + assert!(stats_event_sender.is_some()); + } +} diff --git a/packages/udp-tracker-server/tests/server/contract.rs b/packages/udp-tracker-server/tests/server/contract.rs index d2da552a2..4cb23621d 100644 --- a/packages/udp-tracker-server/tests/server/contract.rs +++ b/packages/udp-tracker-server/tests/server/contract.rs @@ -267,8 +267,8 @@ mod receiving_an_announce_request { let udp_requests_banned_before = env .container - .udp_tracker_core_container - .udp_stats_repository + .udp_tracker_server_container + .udp_server_stats_repository .get_stats() .await .udp_requests_banned; @@ -283,8 +283,8 @@ mod receiving_an_announce_request { let udp_requests_banned_after = env .container - .udp_tracker_core_container - .udp_stats_repository + .udp_tracker_server_container + .udp_server_stats_repository .get_stats() .await .udp_requests_banned; diff --git a/src/app.rs b/src/app.rs index 27ffe7a4a..5458ea600 100644 --- a/src/app.rs +++ b/src/app.rs @@ -79,8 +79,11 @@ pub async fn start(config: &Configuration, app_container: &Arc) -> } else { let udp_tracker_config = Arc::new(udp_tracker_config.clone()); let udp_tracker_container = Arc::new(app_container.udp_tracker_container(&udp_tracker_config)); + let udp_tracker_server_container = Arc::new(app_container.udp_tracker_server_container()); - jobs.push(udp_tracker::start_job(udp_tracker_container, registar.give_form()).await); + jobs.push( + udp_tracker::start_job(udp_tracker_container, udp_tracker_server_container, registar.give_form()).await, + ); } } } else { diff --git a/src/bootstrap/jobs/udp_tracker.rs b/src/bootstrap/jobs/udp_tracker.rs index 0276de1d3..2723ad9ab 100644 --- a/src/bootstrap/jobs/udp_tracker.rs +++ b/src/bootstrap/jobs/udp_tracker.rs @@ -12,6 +12,7 @@ use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer; use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET; use tokio::task::JoinHandle; use torrust_server_lib::registar::ServiceRegistrationForm; +use torrust_udp_tracker_server::container::UdpTrackerServerContainer; use torrust_udp_tracker_server::server::spawner::Spawner; use torrust_udp_tracker_server::server::Server; use tracing::instrument; @@ -27,13 +28,22 @@ use tracing::instrument; /// It will panic if the task did not finish successfully. #[must_use] #[allow(clippy::async_yields_async)] -#[instrument(skip(udp_tracker_container, form))] -pub async fn start_job(udp_tracker_container: Arc, form: ServiceRegistrationForm) -> JoinHandle<()> { - let bind_to = udp_tracker_container.udp_tracker_config.bind_address; - let cookie_lifetime = udp_tracker_container.udp_tracker_config.cookie_lifetime; +#[instrument(skip(udp_tracker_core_container, udp_tracker_server_container, form))] +pub async fn start_job( + udp_tracker_core_container: Arc, + udp_tracker_server_container: Arc, + form: ServiceRegistrationForm, +) -> JoinHandle<()> { + let bind_to = udp_tracker_core_container.udp_tracker_config.bind_address; + let cookie_lifetime = udp_tracker_core_container.udp_tracker_config.cookie_lifetime; let server = Server::new(Spawner::new(bind_to)) - .start(udp_tracker_container, form, cookie_lifetime) + .start( + udp_tracker_core_container, + udp_tracker_server_container, + form, + cookie_lifetime, + ) .await .expect("it should be able to start the udp tracker"); diff --git a/src/container.rs b/src/container.rs index 6f6d9013d..b10ac9ae0 100644 --- a/src/container.rs +++ b/src/container.rs @@ -19,6 +19,7 @@ use bittorrent_udp_tracker_core::{self, MAX_CONNECTION_ID_ERRORS_PER_IP}; use tokio::sync::RwLock; use torrust_tracker_api_core::container::TrackerHttpApiCoreContainer; use torrust_tracker_configuration::{Configuration, Core, HttpApi, HttpTracker, UdpTracker}; +use torrust_udp_tracker_server::container::UdpTrackerServerContainer; use tracing::instrument; pub struct AppContainer { @@ -38,12 +39,16 @@ pub struct AppContainer { // UDP Tracker Core Services pub ban_service: Arc>, - pub udp_stats_event_sender: Arc>>, + pub udp_core_stats_event_sender: Arc>>, + pub udp_core_stats_repository: Arc, // HTTP Tracker Core Services pub http_stats_event_sender: Arc>>, pub http_stats_repository: Arc, - pub udp_stats_repository: Arc, + + // UDP Tracker Server Services + pub udp_server_stats_event_sender: Arc>>, + pub udp_server_stats_repository: Arc, } impl AppContainer { @@ -53,20 +58,26 @@ impl AppContainer { let tracker_core_container = TrackerCoreContainer::initialize(&core_config); - // HTTP stats + // HTTP core stats let (http_stats_event_sender, http_stats_repository) = bittorrent_http_tracker_core::statistics::setup::factory(configuration.core.tracker_usage_statistics); let http_stats_event_sender = Arc::new(http_stats_event_sender); let http_stats_repository = Arc::new(http_stats_repository); - // UDP stats - let (udp_stats_event_sender, udp_stats_repository) = + // UDP core stats + let (udp_core_stats_event_sender, udp_core_stats_repository) = bittorrent_udp_tracker_core::statistics::setup::factory(configuration.core.tracker_usage_statistics); - let udp_stats_event_sender = Arc::new(udp_stats_event_sender); - let udp_stats_repository = Arc::new(udp_stats_repository); + let udp_core_stats_event_sender = Arc::new(udp_core_stats_event_sender); + let udp_core_stats_repository = Arc::new(udp_core_stats_repository); let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + // UDP server stats + let (udp_server_stats_event_sender, udp_server_stats_repository) = + torrust_udp_tracker_server::statistics::setup::factory(configuration.core.tracker_usage_statistics); + let udp_server_stats_event_sender = Arc::new(udp_server_stats_event_sender); + let udp_server_stats_repository = Arc::new(udp_server_stats_repository); + AppContainer { core_config, database: tracker_core_container.database, @@ -82,9 +93,11 @@ impl AppContainer { torrents_manager: tracker_core_container.torrents_manager, ban_service, http_stats_event_sender, - udp_stats_event_sender, + udp_core_stats_event_sender, http_stats_repository, - udp_stats_repository, + udp_core_stats_repository, + udp_server_stats_event_sender, + udp_server_stats_repository, } } @@ -112,8 +125,8 @@ impl AppContainer { whitelist_authorization: self.whitelist_authorization.clone(), udp_tracker_config: udp_tracker_config.clone(), - udp_stats_event_sender: self.udp_stats_event_sender.clone(), - udp_stats_repository: self.udp_stats_repository.clone(), + udp_core_stats_event_sender: self.udp_core_stats_event_sender.clone(), + udp_core_stats_repository: self.udp_core_stats_repository.clone(), ban_service: self.ban_service.clone(), } } @@ -128,7 +141,16 @@ impl AppContainer { whitelist_manager: self.whitelist_manager.clone(), ban_service: self.ban_service.clone(), http_stats_repository: self.http_stats_repository.clone(), - udp_stats_repository: self.udp_stats_repository.clone(), + udp_core_stats_repository: self.udp_core_stats_repository.clone(), + udp_server_stats_repository: self.udp_server_stats_repository.clone(), + } + } + + #[must_use] + pub fn udp_tracker_server_container(&self) -> UdpTrackerServerContainer { + UdpTrackerServerContainer { + udp_server_stats_event_sender: self.udp_server_stats_event_sender.clone(), + udp_server_stats_repository: self.udp_server_stats_repository.clone(), } } }