diff --git a/packages/primitives/src/info_hash.rs b/packages/primitives/src/info_hash.rs
index 1275e7d52..a07cc41a2 100644
--- a/packages/primitives/src/info_hash.rs
+++ b/packages/primitives/src/info_hash.rs
@@ -89,6 +89,13 @@ impl std::convert::From<&DefaultHasher> for InfoHash {
     }
 }
 
+impl std::convert::From<&i32> for InfoHash {
+    fn from(n: &i32) -> InfoHash {
+        let n = n.to_le_bytes();
+        InfoHash([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, n[0], n[1], n[2], n[3]])
+    }
+}
+
 impl std::convert::From<[u8; 20]> for InfoHash {
     fn from(val: [u8; 20]) -> Self {
         InfoHash(val)
diff --git a/packages/primitives/src/pagination.rs b/packages/primitives/src/pagination.rs
index ab7dcfe2b..96b5ad662 100644
--- a/packages/primitives/src/pagination.rs
+++ b/packages/primitives/src/pagination.rs
@@ -1,7 +1,8 @@
+use derive_more::Constructor;
 use serde::Deserialize;
 
 /// A struct to keep information about the page when results are being paginated
-#[derive(Deserialize, Copy, Clone, Debug, PartialEq)]
+#[derive(Deserialize, Copy, Clone, Debug, PartialEq, Constructor)]
 pub struct Pagination {
     /// The page number, starting at 0
     pub offset: u32,
@@ -10,11 +11,6 @@ pub struct Pagination {
 }
 
 impl Pagination {
-    #[must_use]
-    pub fn new(offset: u32, limit: u32) -> Self {
-        Self { offset, limit }
-    }
-
     #[must_use]
     pub fn new_with_options(offset_option: Option<u32>, limit_option: Option<u32>) -> Self {
         let offset = match offset_option {
diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs
index 998a8b4b0..4c39af829 100644
--- a/packages/torrent-repository/src/entry/mod.rs
+++ b/packages/torrent-repository/src/entry/mod.rs
@@ -65,7 +65,7 @@ pub trait EntrySync {
 #[allow(clippy::module_name_repetitions)]
 pub trait EntryAsync {
     fn get_stats(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
-    fn is_good(&self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
+    fn check_good(self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
     fn peers_is_empty(&self) -> impl std::future::Future<Output = bool> + Send;
     fn get_peers_len(&self) -> impl std::future::Future<Output = usize> + Send;
     fn get_peers(&self, limit: Option<usize>) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
diff --git a/packages/torrent-repository/src/entry/mutex_tokio.rs b/packages/torrent-repository/src/entry/mutex_tokio.rs
index b9717a467..34f4a4e92 100644
--- a/packages/torrent-repository/src/entry/mutex_tokio.rs
+++ b/packages/torrent-repository/src/entry/mutex_tokio.rs
@@ -13,7 +13,7 @@ impl EntryAsync for EntryMutexTokio {
         self.lock().await.get_stats()
     }
 
-    async fn is_good(&self, policy: &TrackerPolicy) -> bool {
+    async fn check_good(self, policy: &TrackerPolicy) -> bool {
         self.lock().await.is_good(policy)
     }
 
diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs
index d039be240..5394abb6a 100644
--- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs
+++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs
@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::iter::zip;
 use std::pin::Pin;
 use std::sync::Arc;
 
@@ -124,8 +125,27 @@ where
     }
 
     async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
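+        // Build one boxed future per torrent while the `std` read lock is
+        // held; each future resolves to `Some(infohash)` when the entry is
+        // no longer good. They are only awaited after the guard is dropped,
+        // so the synchronous lock is never held across an `.await`.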
+        let handles: Vec<Pin<Box<dyn Future<Output = Option<InfoHash>> + Send>>>;
+
+        {
+            let db = self.get_torrents();
+
+            handles = zip(db.keys().copied(), db.values().cloned())
+                .map(|(infohash, torrent)| {
+                    torrent
+                        .check_good(policy)
+                        .map(move |good| if good { None } else { Some(infohash) })
+                        .boxed()
+                })
+                .collect::<Vec<_>>();
+        }
+
+        let not_good = join_all(handles).await;
+
         let mut db = self.get_torrents_mut();
 
-        db.retain(|_, e| e.blocking_lock().is_good(policy));
+        for remove in not_good.into_iter().flatten() {
+            drop(db.remove(&remove));
+        }
     }
 }
diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs
index 7f0394179..bc7fd61e8 100644
--- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs
+++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs
@@ -119,6 +119,16 @@ where
 
     async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
         let mut db = self.get_torrents_mut().await;
 
-        db.retain(|_, e| e.blocking_lock().is_good(policy));
+        // First collect the infohashes of the entries that are no longer
+        // good; they cannot be removed from the map while iterating over it.
+        let mut not_good = Vec::<InfoHash>::default();
+
+        for (&infohash, torrent) in db.iter() {
+            if !torrent.clone().check_good(policy).await {
+                not_good.push(infohash);
+            }
+        }
+
+        for remove in not_good {
+            drop(db.remove(&remove));
+        }
     }
 }
diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs
index 3f1b4a78c..3a4b53d2f 100644
--- a/packages/torrent-repository/tests/common/repo.rs
+++ b/packages/torrent-repository/tests/common/repo.rs
@@ -20,7 +20,6 @@ pub(crate) enum Repo {
     TokioMutexTokio(TorrentsRwLockTokioMutexTokio),
 }
 
-#[allow(dead_code)]
 impl Repo {
     pub(crate) async fn get(&self, key: &InfoHash) -> Option<EntrySingle> {
         match self {
diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs
index 83371f413..33264c443 100644
--- a/packages/torrent-repository/tests/common/torrent.rs
+++ b/packages/torrent-repository/tests/common/torrent.rs
@@ -27,7 +27,7 @@ impl Torrent {
         match self {
             Torrent::Single(entry) => entry.is_good(policy),
             Torrent::MutexStd(entry) => entry.is_good(policy),
-            Torrent::MutexTokio(entry) => entry.clone().is_good(policy).await,
+            Torrent::MutexTokio(entry) => entry.clone().check_good(policy).await,
         }
     }
 
diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs
index f6d4bdfb6..7ffe17dd7 100644
--- a/packages/torrent-repository/tests/repository/mod.rs
+++ b/packages/torrent-repository/tests/repository/mod.rs
@@ -1,8 +1,11 @@
+use std::collections::{BTreeMap, HashSet};
 use std::hash::{DefaultHasher, Hash, Hasher};
 
 use rstest::{fixture, rstest};
+use torrust_tracker_configuration::TrackerPolicy;
 use torrust_tracker_primitives::announce_event::AnnounceEvent;
 use torrust_tracker_primitives::info_hash::InfoHash;
+use torrust_tracker_primitives::pagination::Pagination;
 use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents};
 use torrust_tracker_torrent_repository::entry::Entry as _;
 use torrust_tracker_torrent_repository::repository::{RwLockStd, RwLockTokio};
@@ -104,6 +107,39 @@ fn three() -> Entries {
     ]
 }
 
+#[fixture]
+fn many_out_of_order() -> Entries {
+    let mut entries: HashSet<(InfoHash, EntrySingle)> = HashSet::default();
+
+    for i in 0..408 {
+        let mut entry = EntrySingle::default();
+        entry.insert_or_update_peer(&a_started_peer(i));
+
+        entries.insert((InfoHash::from(&i), entry));
+    }
+
+    // We keep the random order from the hash set for the vector.
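+    // (Iteration over a `HashSet` is in arbitrary order, which is what makes
+    // these sequentially numbered infohashes "out of order".)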
+    entries.iter().map(|(i, e)| (*i, e.clone())).collect()
+}
+
+#[fixture]
+fn many_hashed_in_order() -> Entries {
+    let mut entries: BTreeMap<InfoHash, EntrySingle> = BTreeMap::default();
+
+    for i in 0..408 {
+        let mut entry = EntrySingle::default();
+        entry.insert_or_update_peer(&a_started_peer(i));
+
+        let hash: &mut DefaultHasher = &mut DefaultHasher::default();
+        hash.write_i32(i);
+
+        entries.insert(InfoHash::from(&hash.clone()), entry);
+    }
+
+    // We return the entries in order from the B-tree map.
+    entries.iter().map(|(i, e)| (*i, e.clone())).collect()
+}
+
 #[fixture]
 fn persistent_empty() -> PersistentTorrents {
     PersistentTorrents::default()
@@ -141,6 +177,41 @@ async fn make(repo: &Repo, entries: &Entries) {
     }
 }
 
+#[fixture]
+fn paginated_limit_zero() -> Pagination {
+    Pagination::new(0, 0)
+}
+
+#[fixture]
+fn paginated_limit_one() -> Pagination {
+    Pagination::new(0, 1)
+}
+
+#[fixture]
+fn paginated_limit_one_offset_one() -> Pagination {
+    Pagination::new(1, 1)
+}
+
+#[fixture]
+fn policy_none() -> TrackerPolicy {
+    TrackerPolicy::new(false, 0, false)
+}
+
+#[fixture]
+fn policy_persist() -> TrackerPolicy {
+    TrackerPolicy::new(false, 0, true)
+}
+
+#[fixture]
+fn policy_remove() -> TrackerPolicy {
+    TrackerPolicy::new(true, 0, false)
+}
+
+#[fixture]
+fn policy_remove_persist() -> TrackerPolicy {
+    TrackerPolicy::new(true, 0, true)
+}
+
 #[rstest]
 #[case::empty(empty())]
 #[case::default(default())]
 #[case::started(started())]
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_get_a_torrent_entry(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -169,17 +242,77 @@
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
-async fn it_should_get_entries(
+async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
     #[case] entries: Entries,
+    many_out_of_order: Entries,
 ) {
     make(&repo, &entries).await;
 
-    if entries.first().is_some() {
-        assert!(entries.contains(repo.get_paginated(None).await.first().expect("it should have at least one")));
-    } else {
-        assert!(repo.get_paginated(None).await.is_empty());
+    let entries_a = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::<Vec<_>>();
+
+    make(&repo, &many_out_of_order).await;
+
+    let entries_b = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::<Vec<_>>();
+
+    let is_equal = entries_b.iter().take(entries_a.len()).copied().collect::<Vec<_>>() == entries_a;
+
+    let is_sorted = entries_b.windows(2).all(|w| w[0] <= w[1]);
+
+    assert!(
+        is_equal || is_sorted,
+        "The order is neither stable (is_equal: {is_equal}) nor sorted (is_sorted: {is_sorted})."
+    );
+}
+
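+// The pagination assertions below sort the full list of infohashes and index
+// each page against that sorted order.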
+#[rstest]
+#[case::empty(empty())]
+#[case::default(default())]
+#[case::started(started())]
+#[case::completed(completed())]
+#[case::downloaded(downloaded())]
+#[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
+#[tokio::test]
+async fn it_should_get_paginated(
+    #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
+    #[case] entries: Entries,
+    #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination,
+) {
+    make(&repo, &entries).await;
+
+    let mut info_hashes = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::<Vec<_>>();
+    info_hashes.sort();
+
+    match paginated {
+        // It should return an empty page if the limit is zero.
+        Pagination { limit: 0, .. } => assert_eq!(repo.get_paginated(Some(&paginated)).await, vec![]),
+
+        // It should return a single entry if the limit is one.
+        Pagination { limit: 1, offset: 0 } => {
+            if info_hashes.is_empty() {
+                assert_eq!(repo.get_paginated(Some(&paginated)).await.len(), 0);
+            } else {
+                let page = repo.get_paginated(Some(&paginated)).await;
+                assert_eq!(page.len(), 1);
+                assert_eq!(page.first().map(|(i, _)| i), info_hashes.first());
+            }
+        }
+
+        // It should return only the second entry if both the limit and the offset are one.
+        Pagination { limit: 1, offset: 1 } => {
+            if info_hashes.len() > 1 {
+                let page = repo.get_paginated(Some(&paginated)).await;
+                assert_eq!(page.len(), 1);
+                assert_eq!(page[0].0, info_hashes[1]);
+            }
+        }
+        // The other cases are not yet tested.
+        _ => {}
+    }
 }
 
@@ -190,6 +323,8 @@
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_get_metrics(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -220,6 +355,8 @@
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_import_persistent_torrents(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -247,6 +384,8 @@
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_remove_an_entry(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -272,6 +411,8 @@
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_remove_inactive_peers(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -335,3 +476,29 @@ async fn it_should_remove_inactive_peers(
         assert!(!entry.get_peers(None).contains(&peer.into()));
     }
 }
+
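+// Every entry that survives `remove_peerless_torrents` must still be good
+// under the policy that was applied.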
+#[rstest]
+#[case::empty(empty())]
+#[case::default(default())]
+#[case::started(started())]
+#[case::completed(completed())]
+#[case::downloaded(downloaded())]
+#[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
+#[tokio::test]
+async fn it_should_remove_peerless_torrents(
+    #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
+    #[case] entries: Entries,
+    #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy,
+) {
+    make(&repo, &entries).await;
+
+    repo.remove_peerless_torrents(&policy).await;
+
+    let torrents = repo.get_paginated(None).await;
+
+    for (_, entry) in torrents {
+        assert!(entry.is_good(&policy));
+    }
+}