From aa4bfbaf5496de32a02bc1d23779ba200974f2a0 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 12 Apr 2024 15:54:49 +0100 Subject: [PATCH] refactor: segregate command and query for announce request This changes the API of the torrent repository. The method: ``` fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata); ``` is replaced with: ``` fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer); fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option; ``` The performance is not affected. Benchmaring is still using both methods in order to simulate `announce` requests. 1. The interface is simpler (command/query segregation. 2. In the long-term: - Returning swarm metadata in the announce request could be optional. The announce request process would be faster if the tracker does not have to mantain the swarm data. This is not likely to happen becuase the scrape request needs this metadata. - New repository performance improvements could be implemented. This allow decoupling peer lists from swarm metadata. The repository internally can have two data strcutures one for the peer list and another for the swarm metatada. Both using different locks. --- .../benches/helpers/asyn.rs | 34 +++++------ .../benches/helpers/sync.rs | 22 +++++-- packages/torrent-repository/src/entry/mod.rs | 20 ++---- .../torrent-repository/src/entry/mutex_std.rs | 14 ++--- .../src/entry/mutex_tokio.rs | 12 ++-- .../torrent-repository/src/entry/single.rs | 10 +-- .../src/repository/dash_map_mutex_std.rs | 16 ++--- .../torrent-repository/src/repository/mod.rs | 10 ++- .../src/repository/rw_lock_std.rs | 10 ++- .../src/repository/rw_lock_std_mutex_std.rs | 12 +++- .../src/repository/rw_lock_std_mutex_tokio.rs | 16 ++++- .../src/repository/rw_lock_tokio.rs | 11 +++- .../src/repository/rw_lock_tokio_mutex_std.rs | 11 +++- .../repository/rw_lock_tokio_mutex_tokio.rs | 14 ++++- .../src/repository/skip_map_mutex_std.rs | 10 ++- .../torrent-repository/tests/common/repo.rs | 43 +++++++------ .../tests/common/torrent.rs | 22 +++---- .../torrent-repository/tests/entry/mod.rs | 41 +++++++------ .../tests/repository/mod.rs | 40 ++++++++---- src/core/mod.rs | 61 +++++++++++-------- src/core/services/torrent.rs | 38 ++++-------- src/servers/udp/handlers.rs | 12 +--- tests/servers/api/environment.rs | 2 +- tests/servers/http/environment.rs | 2 +- tests/servers/udp/environment.rs | 2 +- 25 files changed, 259 insertions(+), 226 deletions(-) diff --git a/packages/torrent-repository/benches/helpers/asyn.rs b/packages/torrent-repository/benches/helpers/asyn.rs index 80f70cdc..1c6d9d91 100644 --- a/packages/torrent-repository/benches/helpers/asyn.rs +++ b/packages/torrent-repository/benches/helpers/asyn.rs @@ -18,9 +18,9 @@ where let info_hash = InfoHash([0; 20]); - torrent_repository - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) - .await; + torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER).await; + + torrent_repository.get_swarm_metadata(&info_hash).await; } start.elapsed() @@ -37,9 +37,9 @@ where let handles = FuturesUnordered::new(); // Add the torrent/peer to the torrent repository - torrent_repository - .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER) - .await; + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await; + + torrent_repository.get_swarm_metadata(info_hash).await; let start = Instant::now(); @@ -47,9 +47,9 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone - .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER) - .await; + torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER).await; + + torrent_repository_clone.get_swarm_metadata(info_hash).await; if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); @@ -87,9 +87,9 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) - .await; + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await; + + torrent_repository_clone.get_swarm_metadata(&info_hash).await; if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); @@ -123,9 +123,8 @@ where // Add the torrents/peers to the torrent repository for info_hash in &info_hashes { - torrent_repository - .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER) - .await; + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await; + torrent_repository.get_swarm_metadata(info_hash).await; } let start = Instant::now(); @@ -134,9 +133,8 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) - .await; + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await; + torrent_repository_clone.get_swarm_metadata(&info_hash).await; if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); diff --git a/packages/torrent-repository/benches/helpers/sync.rs b/packages/torrent-repository/benches/helpers/sync.rs index 0523f414..63fccfc7 100644 --- a/packages/torrent-repository/benches/helpers/sync.rs +++ b/packages/torrent-repository/benches/helpers/sync.rs @@ -20,7 +20,9 @@ where let info_hash = InfoHash([0; 20]); - torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER); + + torrent_repository.get_swarm_metadata(&info_hash); } start.elapsed() @@ -37,7 +39,9 @@ where let handles = FuturesUnordered::new(); // Add the torrent/peer to the torrent repository - torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER); + + torrent_repository.get_swarm_metadata(info_hash); let start = Instant::now(); @@ -45,7 +49,9 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER); + + torrent_repository_clone.get_swarm_metadata(info_hash); if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); @@ -83,7 +89,9 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER); + + torrent_repository_clone.get_swarm_metadata(&info_hash); if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); @@ -117,7 +125,8 @@ where // Add the torrents/peers to the torrent repository for info_hash in &info_hashes { - torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER); + torrent_repository.get_swarm_metadata(info_hash); } let start = Instant::now(); @@ -126,7 +135,8 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.get_swarm_metadata(&info_hash); if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index 4c39af82..d72ff254 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -15,7 +15,7 @@ pub trait Entry { /// It returns the swarm metadata (statistics) as a struct: /// /// `(seeders, completed, leechers)` - fn get_stats(&self) -> SwarmMetadata; + fn get_swarm_metadata(&self) -> SwarmMetadata; /// Returns True if Still a Valid Entry according to the Tracker Policy fn is_good(&self, policy: &TrackerPolicy) -> bool; @@ -40,10 +40,7 @@ pub trait Entry { /// /// The number of peers that have complete downloading is synchronously updated when peers are updated. /// That's the total torrent downloads counter. - fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool; - - // It preforms a combined operation of `insert_or_update_peer` and `get_stats`. - fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata); + fn upsert_peer(&mut self, peer: &peer::Peer) -> bool; /// It removes peer from the swarm that have not been updated for more than `current_cutoff` seconds fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch); @@ -51,20 +48,19 @@ pub trait Entry { #[allow(clippy::module_name_repetitions)] pub trait EntrySync { - fn get_stats(&self) -> SwarmMetadata; + fn get_swarm_metadata(&self) -> SwarmMetadata; fn is_good(&self, policy: &TrackerPolicy) -> bool; fn peers_is_empty(&self) -> bool; fn get_peers_len(&self) -> usize; fn get_peers(&self, limit: Option) -> Vec>; fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec>; - fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool; - fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata); + fn upsert_peer(&self, peer: &peer::Peer) -> bool; fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch); } #[allow(clippy::module_name_repetitions)] pub trait EntryAsync { - fn get_stats(&self) -> impl std::future::Future + Send; + fn get_swarm_metadata(&self) -> impl std::future::Future + Send; fn check_good(self, policy: &TrackerPolicy) -> impl std::future::Future + Send; fn peers_is_empty(&self) -> impl std::future::Future + Send; fn get_peers_len(&self) -> impl std::future::Future + Send; @@ -74,11 +70,7 @@ pub trait EntryAsync { client: &SocketAddr, limit: Option, ) -> impl std::future::Future>> + Send; - fn insert_or_update_peer(self, peer: &peer::Peer) -> impl std::future::Future + Send; - fn insert_or_update_peer_and_get_stats( - self, - peer: &peer::Peer, - ) -> impl std::future::Future + std::marker::Send; + fn upsert_peer(self, peer: &peer::Peer) -> impl std::future::Future + Send; fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future + Send; } diff --git a/packages/torrent-repository/src/entry/mutex_std.rs b/packages/torrent-repository/src/entry/mutex_std.rs index b4b82390..990d8ab7 100644 --- a/packages/torrent-repository/src/entry/mutex_std.rs +++ b/packages/torrent-repository/src/entry/mutex_std.rs @@ -9,8 +9,8 @@ use super::{Entry, EntrySync}; use crate::{EntryMutexStd, EntrySingle}; impl EntrySync for EntryMutexStd { - fn get_stats(&self) -> SwarmMetadata { - self.lock().expect("it should get a lock").get_stats() + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().expect("it should get a lock").get_swarm_metadata() } fn is_good(&self, policy: &TrackerPolicy) -> bool { @@ -33,14 +33,8 @@ impl EntrySync for EntryMutexStd { self.lock().expect("it should get lock").get_peers_for_client(client, limit) } - fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool { - self.lock().expect("it should lock the entry").insert_or_update_peer(peer) - } - - fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - self.lock() - .expect("it should lock the entry") - .insert_or_update_peer_and_get_stats(peer) + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.lock().expect("it should lock the entry").upsert_peer(peer) } fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { diff --git a/packages/torrent-repository/src/entry/mutex_tokio.rs b/packages/torrent-repository/src/entry/mutex_tokio.rs index 34f4a4e9..c5363e51 100644 --- a/packages/torrent-repository/src/entry/mutex_tokio.rs +++ b/packages/torrent-repository/src/entry/mutex_tokio.rs @@ -9,8 +9,8 @@ use super::{Entry, EntryAsync}; use crate::{EntryMutexTokio, EntrySingle}; impl EntryAsync for EntryMutexTokio { - async fn get_stats(&self) -> SwarmMetadata { - self.lock().await.get_stats() + async fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().await.get_swarm_metadata() } async fn check_good(self, policy: &TrackerPolicy) -> bool { @@ -33,12 +33,8 @@ impl EntryAsync for EntryMutexTokio { self.lock().await.get_peers_for_client(client, limit) } - async fn insert_or_update_peer(self, peer: &peer::Peer) -> bool { - self.lock().await.insert_or_update_peer(peer) - } - - async fn insert_or_update_peer_and_get_stats(self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - self.lock().await.insert_or_update_peer_and_get_stats(peer) + async fn upsert_peer(self, peer: &peer::Peer) -> bool { + self.lock().await.upsert_peer(peer) } async fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) { diff --git a/packages/torrent-repository/src/entry/single.rs b/packages/torrent-repository/src/entry/single.rs index c1041e9a..a38b5402 100644 --- a/packages/torrent-repository/src/entry/single.rs +++ b/packages/torrent-repository/src/entry/single.rs @@ -12,7 +12,7 @@ use crate::EntrySingle; impl Entry for EntrySingle { #[allow(clippy::cast_possible_truncation)] - fn get_stats(&self) -> SwarmMetadata { + fn get_swarm_metadata(&self) -> SwarmMetadata { let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32; let incomplete: u32 = self.peers.len() as u32 - complete; @@ -70,7 +70,7 @@ impl Entry for EntrySingle { } } - fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool { + fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { let mut downloaded_stats_updated: bool = false; match peer::ReadInfo::get_event(peer) { @@ -93,12 +93,6 @@ impl Entry for EntrySingle { downloaded_stats_updated } - fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - let changed = self.insert_or_update_peer(peer); - let stats = self.get_stats(); - (changed, stats) - } - fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { self.peers .retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff); diff --git a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs index 67c47973..b398b09d 100644 --- a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs @@ -23,19 +23,21 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { if let Some(entry) = self.torrents.get(info_hash) { - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); } else { let _unused = self.torrents.insert(*info_hash, Arc::default()); - - match self.torrents.get(info_hash) { - Some(entry) => entry.insert_or_update_peer_and_get_stats(peer), - None => (false, SwarmMetadata::zeroed()), + if let Some(entry) = self.torrents.get(info_hash) { + entry.upsert_peer(peer); } } } + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) + } + fn get(&self, key: &InfoHash) -> Option { let maybe_entry = self.torrents.get(key); maybe_entry.map(|entry| entry.clone()) @@ -45,7 +47,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in &self.torrents { - let stats = entry.value().lock().expect("it should get a lock").get_stats(); + let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/mod.rs b/packages/torrent-repository/src/repository/mod.rs index c7c64c54..f198288f 100644 --- a/packages/torrent-repository/src/repository/mod.rs +++ b/packages/torrent-repository/src/repository/mod.rs @@ -24,7 +24,8 @@ pub trait Repository: Debug + Default + Sized + 'static { fn remove(&self, key: &InfoHash) -> Option; fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch); fn remove_peerless_torrents(&self, policy: &TrackerPolicy); - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata); + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer); + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option; } #[allow(clippy::module_name_repetitions)] @@ -36,9 +37,6 @@ pub trait RepositoryAsync: Debug + Default + Sized + 'static { fn remove(&self, key: &InfoHash) -> impl std::future::Future> + Send; fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future + Send; fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> impl std::future::Future + Send; - fn update_torrent_with_peer_and_get_stats( - &self, - info_hash: &InfoHash, - peer: &peer::Peer, - ) -> impl std::future::Future + Send; + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> impl std::future::Future + Send; + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> impl std::future::Future> + Send; } diff --git a/packages/torrent-repository/src/repository/rw_lock_std.rs b/packages/torrent-repository/src/repository/rw_lock_std.rs index e9074a27..af48428e 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std.rs @@ -47,12 +47,16 @@ impl Repository for TorrentsRwLockStd where EntrySingle: Entry, { - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let mut db = self.get_torrents_mut(); let entry = db.entry(*info_hash).or_insert(EntrySingle::default()); - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.get(info_hash).map(|entry| entry.get_swarm_metadata()) } fn get(&self, key: &InfoHash) -> Option { @@ -64,7 +68,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().values() { - let stats = entry.get_stats(); + let stats = entry.get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs index 0b65234e..74cdc447 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs @@ -33,7 +33,7 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -44,7 +44,13 @@ where entry.clone() }; - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.get_torrents() + .get(info_hash) + .map(super::super::entry::EntrySync::get_swarm_metadata) } fn get(&self, key: &InfoHash) -> Option { @@ -56,7 +62,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().values() { - let stats = entry.lock().expect("it should get a lock").get_stats(); + let stats = entry.lock().expect("it should get a lock").get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs index 5394abb6..83ac02c9 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs @@ -37,7 +37,7 @@ where EntryMutexTokio: EntryAsync, EntrySingle: Entry, { - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -48,8 +48,18 @@ where entry.clone() }; - entry.insert_or_update_peer_and_get_stats(peer).await + entry.upsert_peer(peer).await; } + + async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + let maybe_entry = self.get_torrents().get(info_hash).cloned(); + + match maybe_entry { + Some(entry) => Some(entry.get_swarm_metadata().await), + None => None, + } + } + async fn get(&self, key: &InfoHash) -> Option { let db = self.get_torrents(); db.get(key).cloned() @@ -75,7 +85,7 @@ where let entries: Vec<_> = self.get_torrents().values().cloned().collect(); for entry in entries { - let stats = entry.lock().await.get_stats(); + let stats = entry.lock().await.get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio.rs index d84074ea..b95f1e31 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio.rs @@ -51,13 +51,18 @@ impl RepositoryAsync for TorrentsRwLockTokio where EntrySingle: Entry, { - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let mut db = self.get_torrents_mut().await; let entry = db.entry(*info_hash).or_insert(EntrySingle::default()); - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); } + + async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.get(info_hash).await.map(|entry| entry.get_swarm_metadata()) + } + async fn get(&self, key: &InfoHash) -> Option { let db = self.get_torrents().await; db.get(key).cloned() @@ -81,7 +86,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().await.values() { - let stats = entry.get_stats(); + let stats = entry.get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs index fbbc51a0..bde95994 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs @@ -35,7 +35,7 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -46,8 +46,13 @@ where entry.clone() }; - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); } + + async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.get(info_hash).await.map(|entry| entry.get_swarm_metadata()) + } + async fn get(&self, key: &InfoHash) -> Option { let db = self.get_torrents().await; db.get(key).cloned() @@ -71,7 +76,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().await.values() { - let stats = entry.get_stats(); + let stats = entry.get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs index bc7fd61e..1d002e31 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs @@ -35,7 +35,7 @@ where EntryMutexTokio: EntryAsync, EntrySingle: Entry, { - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -46,8 +46,16 @@ where entry.clone() }; - entry.insert_or_update_peer_and_get_stats(peer).await + entry.upsert_peer(peer).await; } + + async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + match self.get(info_hash).await { + Some(entry) => Some(entry.get_swarm_metadata().await), + None => None, + } + } + async fn get(&self, key: &InfoHash) -> Option { let db = self.get_torrents().await; db.get(key).cloned() @@ -71,7 +79,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().await.values() { - let stats = entry.get_stats().await; + let stats = entry.get_swarm_metadata().await; metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index 0c0127b1..ef3e7e47 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -23,9 +23,13 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); - entry.value().insert_or_update_peer_and_get_stats(peer) + entry.value().upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) } fn get(&self, key: &InfoHash) -> Option { @@ -37,7 +41,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in &self.torrents { - let stats = entry.value().lock().expect("it should get a lock").get_stats(); + let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 5a6eddf9..7c245fe0 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -23,6 +23,32 @@ pub(crate) enum Repo { } impl Repo { + pub(crate) async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + match self { + Repo::RwLockStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::RwLockStdMutexStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::RwLockStdMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, + Repo::RwLockTokio(repo) => repo.upsert_peer(info_hash, peer).await, + Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer).await, + Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, + Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), + } + } + + pub(crate) async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + match self { + Repo::RwLockStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::RwLockStdMutexStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::RwLockStdMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await, + Repo::RwLockTokio(repo) => repo.get_swarm_metadata(info_hash).await, + Repo::RwLockTokioMutexStd(repo) => repo.get_swarm_metadata(info_hash).await, + Repo::RwLockTokioMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await, + Repo::SkipMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::DashMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), + } + } + pub(crate) async fn get(&self, key: &InfoHash) -> Option { match self { Repo::RwLockStd(repo) => repo.get(key), @@ -145,23 +171,6 @@ impl Repo { } } - pub(crate) async fn update_torrent_with_peer_and_get_stats( - &self, - info_hash: &InfoHash, - peer: &peer::Peer, - ) -> (bool, SwarmMetadata) { - match self { - Repo::RwLockStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::RwLockStdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::RwLockStdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::RwLockTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::RwLockTokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::RwLockTokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::SkipMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::DashMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - } - } - pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option { match self { Repo::RwLockStd(repo) => { diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs index 33264c44..c0699479 100644 --- a/packages/torrent-repository/tests/common/torrent.rs +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -17,9 +17,9 @@ pub(crate) enum Torrent { impl Torrent { pub(crate) async fn get_stats(&self) -> SwarmMetadata { match self { - Torrent::Single(entry) => entry.get_stats(), - Torrent::MutexStd(entry) => entry.get_stats(), - Torrent::MutexTokio(entry) => entry.clone().get_stats().await, + Torrent::Single(entry) => entry.get_swarm_metadata(), + Torrent::MutexStd(entry) => entry.get_swarm_metadata(), + Torrent::MutexTokio(entry) => entry.clone().get_swarm_metadata().await, } } @@ -63,19 +63,11 @@ impl Torrent { } } - pub(crate) async fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool { + pub(crate) async fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { match self { - Torrent::Single(entry) => entry.insert_or_update_peer(peer), - Torrent::MutexStd(entry) => entry.insert_or_update_peer(peer), - Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer(peer).await, - } - } - - pub(crate) async fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - match self { - Torrent::Single(entry) => entry.insert_or_update_peer_and_get_stats(peer), - Torrent::MutexStd(entry) => entry.insert_or_update_peer_and_get_stats(peer), - Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer_and_get_stats(peer).await, + Torrent::Single(entry) => entry.upsert_peer(peer), + Torrent::MutexStd(entry) => entry.upsert_peer(peer), + Torrent::MutexTokio(entry) => entry.clone().upsert_peer(peer).await, } } diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/entry/mod.rs index c39bef63..3c564c6f 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/entry/mod.rs @@ -62,34 +62,34 @@ async fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { Makes::Empty => vec![], Makes::Started => { let peer = a_started_peer(1); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; vec![peer] } Makes::Completed => { let peer = a_completed_peer(2); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; vec![peer] } Makes::Downloaded => { let mut peer = a_started_peer(3); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; vec![peer] } Makes::Three => { let peer_1 = a_started_peer(1); - torrent.insert_or_update_peer(&peer_1).await; + torrent.upsert_peer(&peer_1).await; let peer_2 = a_completed_peer(2); - torrent.insert_or_update_peer(&peer_2).await; + torrent.upsert_peer(&peer_2).await; let mut peer_3 = a_started_peer(3); - torrent.insert_or_update_peer(&peer_3).await; + torrent.upsert_peer(&peer_3).await; peer_3.event = AnnounceEvent::Completed; peer_3.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer_3).await; + torrent.upsert_peer(&peer_3).await; vec![peer_1, peer_2, peer_3] } } @@ -182,7 +182,7 @@ async fn it_should_update_a_peer( // Make and insert a new peer. let mut peer = a_started_peer(-1); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; // Get the Inserted Peer by Id. let peers = torrent.get_peers(None).await; @@ -195,7 +195,7 @@ async fn it_should_update_a_peer( // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; // Get the Updated Peer by Id. let peers = torrent.get_peers(None).await; @@ -224,7 +224,7 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( let mut peer = a_started_peer(-1); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; // The started peer should be inserted. let peers = torrent.get_peers(None).await; @@ -237,7 +237,7 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( // Change peer to "Stopped" and insert. peer.event = AnnounceEvent::Stopped; - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; // It should be removed now. let peers = torrent.get_peers(None).await; @@ -270,13 +270,12 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; - let (updated, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; + torrent.upsert_peer(&peer).await; + let stats = torrent.get_stats().await; if is_already_completed { - assert!(!updated); assert_eq!(stats.downloaded, downloaded); } else { - assert!(updated); assert_eq!(stats.downloaded, downloaded + 1); } } @@ -301,7 +300,8 @@ async fn it_should_update_a_peer_as_a_seeder( // Set Bytes Left to Zero peer.left = NumberOfBytes(0); - let (_, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; // Add the peer + torrent.upsert_peer(&peer).await; + let stats = torrent.get_stats().await; if is_already_non_left { // it was already complete @@ -332,7 +332,8 @@ async fn it_should_update_a_peer_as_incomplete( // Set Bytes Left to no Zero peer.left = NumberOfBytes(1); - let (_, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; // Add the peer + torrent.upsert_peer(&peer).await; + let stats = torrent.get_stats().await; if completed_already { // now it is incomplete @@ -368,7 +369,7 @@ async fn it_should_get_peers_excluding_the_client_socket( // set the address to the socket. peer.peer_addr = socket; - torrent.insert_or_update_peer(&peer).await; // Add peer + torrent.upsert_peer(&peer).await; // Add peer // It should not include the peer that has the same socket. assert!(!torrent.get_peers_for_client(&socket, None).await.contains(&peer.into())); @@ -391,7 +392,7 @@ async fn it_should_limit_the_number_of_peers_returned( for peer_number in 1..=74 + 1 { let mut peer = a_started_peer(1); peer.peer_id = peer::Id::from(peer_number); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; } let peers = torrent.get_peers(Some(TORRENT_PEERS_LIMIT)).await; @@ -422,7 +423,7 @@ async fn it_should_remove_inactive_peers_beyond_cutoff( peer.updated = now.sub(EXPIRE); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; assert_eq!(torrent.get_peers_len().await, peers.len() + 1); diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index a6784bf5..fde34467 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -6,6 +6,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::announce_event::AnnounceEvent; use torrust_tracker_primitives::info_hash::InfoHash; use torrust_tracker_primitives::pagination::Pagination; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents}; use torrust_tracker_torrent_repository::entry::Entry as _; use torrust_tracker_torrent_repository::repository::dash_map_mutex_std::XacrimonDashMap; @@ -72,14 +73,14 @@ fn default() -> Entries { #[fixture] fn started() -> Entries { let mut torrent = EntrySingle::default(); - torrent.insert_or_update_peer(&a_started_peer(1)); + torrent.upsert_peer(&a_started_peer(1)); vec![(InfoHash::default(), torrent)] } #[fixture] fn completed() -> Entries { let mut torrent = EntrySingle::default(); - torrent.insert_or_update_peer(&a_completed_peer(2)); + torrent.upsert_peer(&a_completed_peer(2)); vec![(InfoHash::default(), torrent)] } @@ -87,10 +88,10 @@ fn completed() -> Entries { fn downloaded() -> Entries { let mut torrent = EntrySingle::default(); let mut peer = a_started_peer(3); - torrent.insert_or_update_peer(&peer); + torrent.upsert_peer(&peer); peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer); + torrent.upsert_peer(&peer); vec![(InfoHash::default(), torrent)] } @@ -98,21 +99,21 @@ fn downloaded() -> Entries { fn three() -> Entries { let mut started = EntrySingle::default(); let started_h = &mut DefaultHasher::default(); - started.insert_or_update_peer(&a_started_peer(1)); + started.upsert_peer(&a_started_peer(1)); started.hash(started_h); let mut completed = EntrySingle::default(); let completed_h = &mut DefaultHasher::default(); - completed.insert_or_update_peer(&a_completed_peer(2)); + completed.upsert_peer(&a_completed_peer(2)); completed.hash(completed_h); let mut downloaded = EntrySingle::default(); let downloaded_h = &mut DefaultHasher::default(); let mut downloaded_peer = a_started_peer(3); - downloaded.insert_or_update_peer(&downloaded_peer); + downloaded.upsert_peer(&downloaded_peer); downloaded_peer.event = AnnounceEvent::Completed; downloaded_peer.left = NumberOfBytes(0); - downloaded.insert_or_update_peer(&downloaded_peer); + downloaded.upsert_peer(&downloaded_peer); downloaded.hash(downloaded_h); vec![ @@ -128,7 +129,7 @@ fn many_out_of_order() -> Entries { for i in 0..408 { let mut entry = EntrySingle::default(); - entry.insert_or_update_peer(&a_started_peer(i)); + entry.upsert_peer(&a_started_peer(i)); entries.insert((InfoHash::from(&i), entry)); } @@ -143,7 +144,7 @@ fn many_hashed_in_order() -> Entries { for i in 0..408 { let mut entry = EntrySingle::default(); - entry.insert_or_update_peer(&a_started_peer(i)); + entry.upsert_peer(&a_started_peer(i)); let hash: &mut DefaultHasher = &mut DefaultHasher::default(); hash.write_i32(i); @@ -390,7 +391,7 @@ async fn it_should_get_metrics( let mut metrics = TorrentsMetrics::default(); for (_, torrent) in entries { - let stats = torrent.get_stats(); + let stats = torrent.get_swarm_metadata(); metrics.torrents += 1; metrics.incomplete += u64::from(stats.incomplete); @@ -537,10 +538,25 @@ async fn it_should_remove_inactive_peers( // Insert the infohash and peer into the repository // and verify there is an extra torrent entry. { - repo.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + repo.upsert_peer(&info_hash, &peer).await; assert_eq!(repo.get_metrics().await.torrents, entries.len() as u64 + 1); } + // Insert the infohash and peer into the repository + // and verify the swarm metadata was updated. + { + repo.upsert_peer(&info_hash, &peer).await; + let stats = repo.get_swarm_metadata(&info_hash).await; + assert_eq!( + stats, + Some(SwarmMetadata { + downloaded: 0, + complete: 1, + incomplete: 0 + }) + ); + } + // Verify that this new peer was inserted into the repository. { let entry = repo.get(&info_hash).await.expect("it_should_get_some"); diff --git a/src/core/mod.rs b/src/core/mod.rs index 6628426c..83813a86 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -626,10 +626,9 @@ impl Tracker { peer.change_ip(&assign_ip_address_to_peer(remote_client_ip, self.external_ip)); debug!("After: {peer:?}"); - // we should update the torrent and get the stats before we get the peer list. - let stats = self.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + let stats = self.upsert_peer_and_get_stats(info_hash, peer).await; - let peers = self.get_torrent_peers_for_peer(info_hash, peer); + let peers = self.get_peers_for(info_hash, peer); AnnounceData { peers, @@ -660,7 +659,7 @@ impl Tracker { /// It returns the data for a `scrape` response. fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata { match self.torrents.get(info_hash) { - Some(torrent_entry) => torrent_entry.get_stats(), + Some(torrent_entry) => torrent_entry.get_swarm_metadata(), None => SwarmMetadata::default(), } } @@ -681,7 +680,7 @@ impl Tracker { Ok(()) } - fn get_torrent_peers_for_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> Vec> { + fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer) -> Vec> { match self.torrents.get(info_hash) { None => vec![], Some(entry) => entry.get_peers_for_client(&peer.peer_addr, Some(TORRENT_PEERS_LIMIT)), @@ -703,20 +702,36 @@ impl Tracker { /// needed for a `announce` request response. /// /// # Context: Tracker - pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> SwarmMetadata { - // code-review: consider splitting the function in two (command and query segregation). - // `update_torrent_with_peer` and `get_stats` + pub async fn upsert_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> SwarmMetadata { + let swarm_metadata_before = match self.torrents.get_swarm_metadata(info_hash) { + Some(swarm_metadata) => swarm_metadata, + None => SwarmMetadata::zeroed(), + }; - let (stats_updated, stats) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer); + self.torrents.upsert_peer(info_hash, peer); - if self.policy.persistent_torrent_completed_stat && stats_updated { - let completed = stats.downloaded; + let swarm_metadata_after = match self.torrents.get_swarm_metadata(info_hash) { + Some(swarm_metadata) => swarm_metadata, + None => SwarmMetadata::zeroed(), + }; + + if swarm_metadata_before != swarm_metadata_after { + self.persist_stats(info_hash, &swarm_metadata_after).await; + } + + swarm_metadata_after + } + + /// It stores the torrents stats into the database (if persistency is enabled). + /// + /// # Context: Tracker + async fn persist_stats(&self, info_hash: &InfoHash, swarm_metadata: &SwarmMetadata) { + if self.policy.persistent_torrent_completed_stat { + let completed = swarm_metadata.downloaded; let info_hash = *info_hash; drop(self.database.save_persistent_torrent(&info_hash, completed).await); } - - stats } /// It calculates and returns the general `Tracker` @@ -1130,7 +1145,7 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + tracker.upsert_peer_and_get_stats(&info_hash, &peer).await; let peers = tracker.get_torrent_peers(&info_hash); @@ -1144,9 +1159,9 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + tracker.upsert_peer_and_get_stats(&info_hash, &peer).await; - let peers = tracker.get_torrent_peers_for_peer(&info_hash, &peer); + let peers = tracker.get_peers_for(&info_hash, &peer); assert_eq!(peers, vec![]); } @@ -1155,9 +1170,7 @@ mod tests { async fn it_should_return_the_torrent_metrics() { let tracker = public_tracker(); - tracker - .update_torrent_with_peer_and_get_stats(&sample_info_hash(), &leecher()) - .await; + tracker.upsert_peer_and_get_stats(&sample_info_hash(), &leecher()).await; let torrent_metrics = tracker.get_torrents_metrics(); @@ -1178,9 +1191,7 @@ mod tests { let start_time = std::time::Instant::now(); for i in 0..1_000_000 { - tracker - .update_torrent_with_peer_and_get_stats(&gen_seeded_infohash(&i), &leecher()) - .await; + tracker.upsert_peer_and_get_stats(&gen_seeded_infohash(&i), &leecher()).await; } let result_a = start_time.elapsed(); @@ -1704,11 +1715,11 @@ mod tests { let mut peer = sample_peer(); peer.event = AnnounceEvent::Started; - let swarm_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + let swarm_stats = tracker.upsert_peer_and_get_stats(&info_hash, &peer).await; assert_eq!(swarm_stats.downloaded, 0); peer.event = AnnounceEvent::Completed; - let swarm_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + let swarm_stats = tracker.upsert_peer_and_get_stats(&info_hash, &peer).await; assert_eq!(swarm_stats.downloaded, 1); // Remove the newly updated torrent from memory @@ -1719,7 +1730,7 @@ mod tests { let torrent_entry = tracker.torrents.get(&info_hash).expect("it should be able to get entry"); // It persists the number of completed peers. - assert_eq!(torrent_entry.get_stats().downloaded, 1); + assert_eq!(torrent_entry.get_swarm_metadata().downloaded, 1); // It does not persist the peers assert!(torrent_entry.peers_is_empty()); diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs index ce44af3a..9cba5de2 100644 --- a/src/core/services/torrent.rs +++ b/src/core/services/torrent.rs @@ -50,7 +50,7 @@ pub async fn get_torrent_info(tracker: Arc, info_hash: &InfoHash) -> Op let torrent_entry = torrent_entry_option?; - let stats = torrent_entry.get_stats(); + let stats = torrent_entry.get_swarm_metadata(); let peers = torrent_entry.get_peers(None); @@ -70,7 +70,7 @@ pub async fn get_torrents_page(tracker: Arc, pagination: Option<&Pagina let mut basic_infos: Vec = vec![]; for (info_hash, torrent_entry) in tracker.torrents.get_paginated(pagination) { - let stats = torrent_entry.get_stats(); + let stats = torrent_entry.get_swarm_metadata(); basic_infos.push(BasicInfo { info_hash, @@ -88,7 +88,7 @@ pub async fn get_torrents(tracker: Arc, info_hashes: &[InfoHash]) -> Ve let mut basic_infos: Vec = vec![]; for info_hash in info_hashes { - if let Some(stats) = tracker.torrents.get(info_hash).map(|t| t.get_stats()) { + if let Some(stats) = tracker.torrents.get(info_hash).map(|t| t.get_swarm_metadata()) { basic_infos.push(BasicInfo { info_hash: *info_hash, seeders: u64::from(stats.complete), @@ -156,9 +156,7 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash, &sample_peer()).await; let torrent_info = get_torrent_info(tracker.clone(), &info_hash).await.unwrap(); @@ -208,9 +206,7 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash, &sample_peer()).await; let torrents = get_torrents_page(tracker.clone(), Some(&Pagination::default())).await; @@ -234,12 +230,8 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash1, &sample_peer()).await; + tracker.upsert_peer_and_get_stats(&info_hash2, &sample_peer()).await; let offset = 0; let limit = 1; @@ -258,12 +250,8 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash1, &sample_peer()).await; + tracker.upsert_peer_and_get_stats(&info_hash2, &sample_peer()).await; let offset = 1; let limit = 4000; @@ -288,15 +276,11 @@ mod tests { let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash1 = InfoHash::from_str(&hash1).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash1, &sample_peer()).await; let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash2, &sample_peer()).await; let torrents = get_torrents_page(tracker.clone(), Some(&Pagination::default())).await; diff --git a/src/servers/udp/handlers.rs b/src/servers/udp/handlers.rs index 2d5038ec..122e666a 100644 --- a/src/servers/udp/handlers.rs +++ b/src/servers/udp/handlers.rs @@ -718,9 +718,7 @@ mod tests { .with_peer_address(SocketAddr::new(IpAddr::V6(client_ip_v6), client_port)) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv6) - .await; + tracker.upsert_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv6).await; } async fn announce_a_new_peer_using_ipv4(tracker: Arc) -> Response { @@ -944,9 +942,7 @@ mod tests { .with_peer_address(SocketAddr::new(IpAddr::V4(client_ip_v4), client_port)) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv4) - .await; + tracker.upsert_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv4).await; } async fn announce_a_new_peer_using_ipv6(tracker: Arc) -> Response { @@ -1119,9 +1115,7 @@ mod tests { .with_number_of_bytes_left(0) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer) - .await; + tracker.upsert_peer_and_get_stats(&info_hash.0.into(), &peer).await; } fn build_scrape_request(remote_addr: &SocketAddr, info_hash: &InfoHash) -> ScrapeRequest { diff --git a/tests/servers/api/environment.rs b/tests/servers/api/environment.rs index 8d91f3ae..dec4ccff 100644 --- a/tests/servers/api/environment.rs +++ b/tests/servers/api/environment.rs @@ -23,7 +23,7 @@ pub struct Environment { impl Environment { /// Add a torrent to the tracker pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.upsert_peer_and_get_stats(info_hash, peer).await; } } diff --git a/tests/servers/http/environment.rs b/tests/servers/http/environment.rs index 5638713a..f00da293 100644 --- a/tests/servers/http/environment.rs +++ b/tests/servers/http/environment.rs @@ -20,7 +20,7 @@ pub struct Environment { impl Environment { /// Add a torrent to the tracker pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.upsert_peer_and_get_stats(info_hash, peer).await; } } diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index 12f4aeb9..6ced1dbb 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -20,7 +20,7 @@ impl Environment { /// Add a torrent to the tracker #[allow(dead_code)] pub async fn add_torrent(&self, info_hash: &InfoHash, peer: &peer::Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.upsert_peer_and_get_stats(info_hash, peer).await; } }