dev: finish repo tests
da2ce7 committed Mar 25, 2024
1 parent 1b36562 commit d81b693
Showing 9 changed files with 216 additions and 17 deletions.
7 changes: 7 additions & 0 deletions packages/primitives/src/info_hash.rs
@@ -89,6 +89,13 @@ impl std::convert::From<&DefaultHasher> for InfoHash {
     }
 }
 
+impl std::convert::From<&i32> for InfoHash {
+    fn from(n: &i32) -> InfoHash {
+        let n = n.to_le_bytes();
+        InfoHash([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, n[0], n[1], n[2], n[3]])
+    }
+}
+
 impl std::convert::From<[u8; 20]> for InfoHash {
     fn from(val: [u8; 20]) -> Self {
         InfoHash(val)
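The new `From<&i32>` conversion writes the integer's little-endian bytes into the last four bytes of an otherwise zeroed 20-byte hash, giving the test fixtures further down a cheap, deterministic `InfoHash` per index. A minimal sketch of the resulting layout, assuming the inner `[u8; 20]` is reachable as tuple field `0`:

use torrust_tracker_primitives::info_hash::InfoHash;

// Sketch: the integer lands in bytes 16..20, little-endian.
fn layout_check() {
    let n: i32 = 1;
    let hash = InfoHash::from(&n);
    assert_eq!(hash.0[..16], [0u8; 16]); // the leading 16 bytes stay zero
    assert_eq!(hash.0[16..], n.to_le_bytes()); // 1i32 -> [1, 0, 0, 0]
}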
8 changes: 2 additions & 6 deletions packages/primitives/src/pagination.rs
@@ -1,7 +1,8 @@
+use derive_more::Constructor;
 use serde::Deserialize;
 
 /// A struct to keep information about the page when results are being paginated
-#[derive(Deserialize, Copy, Clone, Debug, PartialEq)]
+#[derive(Deserialize, Copy, Clone, Debug, PartialEq, Constructor)]
 pub struct Pagination {
     /// The page number, starting at 0
     pub offset: u32,
@@ -10,11 +11,6 @@ pub struct Pagination {
 }
 
 impl Pagination {
-    #[must_use]
-    pub fn new(offset: u32, limit: u32) -> Self {
-        Self { offset, limit }
-    }
-
     #[must_use]
     pub fn new_with_options(offset_option: Option<u32>, limit_option: Option<u32>) -> Self {
         let offset = match offset_option {
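`derive_more::Constructor` derives the same member-wise `new(offset, limit)` that the hand-written block provided (roughly, minus the `#[must_use]` attribute), so removing the manual constructor is pure deduplication and call sites keep compiling:

// The derive expands to approximately the removed code:
//
// impl Pagination {
//     pub fn new(offset: u32, limit: u32) -> Self {
//         Self { offset, limit }
//     }
// }
let page = Pagination::new(0, 10); // unchanged call site: offset 0, limit 10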
2 changes: 1 addition & 1 deletion packages/torrent-repository/src/entry/mod.rs
@@ -65,7 +65,7 @@ pub trait EntrySync {
 #[allow(clippy::module_name_repetitions)]
 pub trait EntryAsync {
     fn get_stats(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
-    fn is_good(&self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
+    fn check_good(self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
     fn peers_is_empty(&self) -> impl std::future::Future<Output = bool> + Send;
     fn get_peers_len(&self) -> impl std::future::Future<Output = usize> + Send;
     fn get_peers(&self, limit: Option<usize>) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
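Besides the rename, `check_good` now takes `self` by value, while the synchronous `EntrySync::is_good` keeps its borrowed receiver. Passing the handle by value lets an `Arc`-backed entry move a cheap clone into the returned future instead of borrowing from the caller, which is what the repository code further down relies on. The call-site pattern, as used later in this commit:

// Before: `entry.is_good(policy).await` borrowed the entry.
// After: a clone is consumed, leaving the original handle free:
let ok = entry.clone().check_good(policy).await;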
2 changes: 1 addition & 1 deletion packages/torrent-repository/src/entry/mutex_tokio.rs
@@ -13,7 +13,7 @@ impl EntryAsync for EntryMutexTokio {
         self.lock().await.get_stats()
     }
 
-    async fn is_good(&self, policy: &TrackerPolicy) -> bool {
+    async fn check_good(self, policy: &TrackerPolicy) -> bool {
         self.lock().await.is_good(policy)
     }
 
@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::iter::zip;
 use std::pin::Pin;
 use std::sync::Arc;
 
@@ -124,8 +125,27 @@ where
     }
 
     async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
+        let handles: Vec<Pin<Box<dyn Future<Output = Option<InfoHash>> + Send>>>;
+
+        {
+            let db = self.get_torrents();
+
+            handles = zip(db.keys().copied(), db.values().cloned())
+                .map(|(infohash, torrent)| {
+                    torrent
+                        .check_good(policy)
+                        .map(move |good| if good { None } else { Some(infohash) })
+                        .boxed()
+                })
+                .collect::<Vec<_>>();
+        }
+
+        let not_good = join_all(handles).await;
+
         let mut db = self.get_torrents_mut();
 
-        db.retain(|_, e| e.blocking_lock().is_good(policy));
+        for remove in not_good.into_iter().flatten() {
+            drop(db.remove(&remove));
+        }
     }
 }
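This is the core change for this repository implementation: it avoids `blocking_lock` inside `retain` (tokio's blocking lock must not be called from within an async context) by splitting the pass into three steps: build one boxed future per torrent while the read guard is held, await them all after the guard is dropped, then take the write guard once and remove the failures. A reduced sketch of the same pattern over a plain `HashMap`, with a hypothetical async predicate `check` standing in for `check_good`:

use std::collections::HashMap;
use std::hash::Hash;

use futures::future::join_all;
use futures::FutureExt;

// Hypothetical async predicate standing in for `check_good`.
async fn check(v: i32) -> bool {
    v > 0
}

async fn remove_failing<K: Clone + Eq + Hash + Send + 'static>(map: &mut HashMap<K, i32>) {
    // Step 1: collect one future per entry while borrowing the map immutably.
    let checks: Vec<_> = map
        .iter()
        .map(|(k, &v)| {
            let k = k.clone();
            async move { if check(v).await { None } else { Some(k) } }.boxed()
        })
        .collect();

    // Step 2: the borrow has ended; await every check concurrently.
    let failed = join_all(checks).await;

    // Step 3: apply the removals in a single mutable pass.
    for key in failed.into_iter().flatten() {
        map.remove(&key);
    }
}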
@@ -119,6 +119,16 @@ where
     async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
         let mut db = self.get_torrents_mut().await;
 
-        db.retain(|_, e| e.blocking_lock().is_good(policy));
+        let mut not_good = Vec::<InfoHash>::default();
+
+        for (&infohash, torrent) in db.iter() {
+            if !torrent.clone().check_good(policy).await {
+                not_good.push(infohash);
+            }
+        }
+
+        for remove in not_good {
+            drop(db.remove(&remove));
+        }
     }
 }
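In this fully async variant, no boxing is needed: the guard from `get_torrents_mut().await` can be held across `.await` points, so the loop awaits `check_good` inline, collects the doomed infohashes, and removes them in a second pass. Two passes are still required because the map cannot be mutated while it is being iterated.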
1 change: 0 additions & 1 deletion packages/torrent-repository/tests/common/repo.rs
@@ -20,7 +20,6 @@ pub(crate) enum Repo {
     TokioMutexTokio(TorrentsRwLockTokioMutexTokio),
 }
 
-#[allow(dead_code)]
 impl Repo {
     pub(crate) async fn get(&self, key: &InfoHash) -> Option<EntrySingle> {
         match self {
2 changes: 1 addition & 1 deletion packages/torrent-repository/tests/common/torrent.rs
@@ -27,7 +27,7 @@ impl Torrent {
         match self {
             Torrent::Single(entry) => entry.is_good(policy),
             Torrent::MutexStd(entry) => entry.is_good(policy),
-            Torrent::MutexTokio(entry) => entry.clone().is_good(policy).await,
+            Torrent::MutexTokio(entry) => entry.clone().check_good(policy).await,
         }
     }
 
177 changes: 172 additions & 5 deletions packages/torrent-repository/tests/repository/mod.rs
@@ -1,8 +1,11 @@
 use std::collections::{BTreeMap, HashSet};
+use std::hash::{DefaultHasher, Hash, Hasher};
 
 use rstest::{fixture, rstest};
+use torrust_tracker_configuration::TrackerPolicy;
 use torrust_tracker_primitives::announce_event::AnnounceEvent;
 use torrust_tracker_primitives::info_hash::InfoHash;
+use torrust_tracker_primitives::pagination::Pagination;
 use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents};
 use torrust_tracker_torrent_repository::entry::Entry as _;
 use torrust_tracker_torrent_repository::repository::{RwLockStd, RwLockTokio};
@@ -104,6 +107,39 @@ fn three() -> Entries {
     ]
 }
 
+#[fixture]
+fn many_out_of_order() -> Entries {
+    let mut entries: HashSet<(InfoHash, EntrySingle)> = HashSet::default();
+
+    for i in 0..408 {
+        let mut entry = EntrySingle::default();
+        entry.insert_or_update_peer(&a_started_peer(i));
+
+        entries.insert((InfoHash::from(&i), entry));
+    }
+
+    // We keep the random order of the hash set for the vector.
+    entries.iter().map(|(i, e)| (*i, e.clone())).collect()
+}
+
+#[fixture]
+fn many_hashed_in_order() -> Entries {
+    let mut entries: BTreeMap<InfoHash, EntrySingle> = BTreeMap::default();
+
+    for i in 0..408 {
+        let mut entry = EntrySingle::default();
+        entry.insert_or_update_peer(&a_started_peer(i));
+
+        let hash: &mut DefaultHasher = &mut DefaultHasher::default();
+        hash.write_i32(i);
+
+        entries.insert(InfoHash::from(&hash.clone()), entry);
+    }
+
+    // We return the entries in order from the B-tree map.
+    entries.iter().map(|(i, e)| (*i, e.clone())).collect()
+}
+
 #[fixture]
 fn persistent_empty() -> PersistentTorrents {
     PersistentTorrents::default()
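The two new fixtures probe ordering: `many_out_of_order` inherits the arbitrary iteration order of a `HashSet`, while `many_hashed_in_order` comes back sorted because a `BTreeMap` iterates its keys (here, the `InfoHash` itself) in ascending order. A small sketch of the property being relied on:

use std::collections::BTreeMap;

use torrust_tracker_primitives::info_hash::InfoHash;

// A BTreeMap yields keys in ascending order regardless of insertion
// order, so collecting from it produces byte-wise sorted InfoHashes.
fn sorted_regardless_of_insertion_order() {
    let mut map = BTreeMap::new();
    map.insert(InfoHash::from(&2i32), "b");
    map.insert(InfoHash::from(&1i32), "a");

    let keys: Vec<InfoHash> = map.keys().copied().collect();
    assert!(keys.windows(2).all(|w| w[0] <= w[1]));
}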
@@ -141,13 +177,50 @@ async fn make(repo: &Repo, entries: &Entries) {
     }
 }
 
+#[fixture]
+fn paginated_limit_zero() -> Pagination {
+    Pagination::new(0, 0)
+}
+
+#[fixture]
+fn paginated_limit_one() -> Pagination {
+    Pagination::new(0, 1)
+}
+
+#[fixture]
+fn paginated_limit_one_offset_one() -> Pagination {
+    Pagination::new(1, 1)
+}
+
+#[fixture]
+fn policy_none() -> TrackerPolicy {
+    TrackerPolicy::new(false, 0, false)
+}
+
+#[fixture]
+fn policy_persist() -> TrackerPolicy {
+    TrackerPolicy::new(false, 0, true)
+}
+
+#[fixture]
+fn policy_remove() -> TrackerPolicy {
+    TrackerPolicy::new(true, 0, false)
+}
+
+#[fixture]
+fn policy_remove_persist() -> TrackerPolicy {
+    TrackerPolicy::new(true, 0, true)
+}
+
 #[rstest]
 #[case::empty(empty())]
 #[case::default(default())]
 #[case::started(started())]
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_get_a_torrent_entry(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -169,17 +242,77 @@ async fn it_should_get_a_torrent_entry(
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
-async fn it_should_get_entries(
+async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
     #[case] entries: Entries,
+    many_out_of_order: Entries,
 ) {
     make(&repo, &entries).await;
 
-    if entries.first().is_some() {
-        assert!(entries.contains(repo.get_paginated(None).await.first().expect("it should have at least one")));
-    } else {
-        assert!(repo.get_paginated(None).await.is_empty());
+    let entries_a = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::<Vec<_>>();
+
+    make(&repo, &many_out_of_order).await;
+
+    let entries_b = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::<Vec<_>>();
+
+    let is_equal = entries_b.iter().take(entries_a.len()).copied().collect::<Vec<_>>() == entries_a;
+
+    let is_sorted = entries_b.windows(2).all(|w| w[0] <= w[1]);
+
+    assert!(
+        is_equal || is_sorted,
+        "The order is neither stable (prefix match: {is_equal}) nor sorted ({is_sorted})."
+    );
+}
+
+#[rstest]
+#[case::empty(empty())]
+#[case::default(default())]
+#[case::started(started())]
+#[case::completed(completed())]
+#[case::downloaded(downloaded())]
+#[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
+#[tokio::test]
+async fn it_should_get_paginated(
+    #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
+    #[case] entries: Entries,
+    #[values(paginated_limit_zero(), paginated_limit_one(), paginated_limit_one_offset_one())] paginated: Pagination,
+) {
+    make(&repo, &entries).await;
+
+    let mut info_hashes = repo.get_paginated(None).await.iter().map(|(i, _)| *i).collect::<Vec<_>>();
+    info_hashes.sort();
+
+    match paginated {
+        // it should return empty if the limit is zero.
+        Pagination { limit: 0, .. } => assert_eq!(repo.get_paginated(Some(&paginated)).await, vec![]),
+
+        // it should return a single entry if the limit is one.
+        Pagination { limit: 1, offset: 0 } => {
+            if info_hashes.is_empty() {
+                assert_eq!(repo.get_paginated(Some(&paginated)).await.len(), 0);
+            } else {
+                let page = repo.get_paginated(Some(&paginated)).await;
+                assert_eq!(page.len(), 1);
+                assert_eq!(page.first().map(|(i, _)| i), info_hashes.first());
+            }
+        }
+
+        // it should return only the second entry if both the limit and the offset are one.
+        Pagination { limit: 1, offset: 1 } => {
+            if info_hashes.len() > 1 {
+                let page = repo.get_paginated(Some(&paginated)).await;
+                assert_eq!(page.len(), 1);
+                assert_eq!(page[0].0, info_hashes[1]);
+            }
+        }
+        // the other cases are not yet tested.
+        _ => {}
     }
 }
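The stable-or-sorted assertion accepts either legitimate backend behavior: a store whose earlier snapshot remains a prefix of the later one (stable insertion order), or one that always yields keys in ascending order, as the `BTreeMap`-backed repositories do. Condensed to a predicate (a sketch, not code from the commit):

// Passes when `b` either starts with `a` (stable) or is sorted ascending.
fn stable_or_sorted<T: Ord>(a: &[T], b: &[T]) -> bool {
    b.starts_with(a) || b.windows(2).all(|w| w[0] <= w[1])
}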

@@ -190,6 +323,8 @@ async fn it_should_get_entries(
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_get_metrics(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -220,6 +355,8 @@ async fn it_should_get_metrics(
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_import_persistent_torrents(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -247,6 +384,8 @@ async fn it_should_import_persistent_torrents(
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_remove_an_entry(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -272,6 +411,8 @@ async fn it_should_remove_an_entry(
 #[case::completed(completed())]
 #[case::downloaded(downloaded())]
 #[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
 #[tokio::test]
 async fn it_should_remove_inactive_peers(
     #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
@@ -335,3 +476,29 @@ async fn it_should_remove_inactive_peers(
         assert!(!entry.get_peers(None).contains(&peer.into()));
     }
 }
+
+#[rstest]
+#[case::empty(empty())]
+#[case::default(default())]
+#[case::started(started())]
+#[case::completed(completed())]
+#[case::downloaded(downloaded())]
+#[case::three(three())]
+#[case::out_of_order(many_out_of_order())]
+#[case::in_order(many_hashed_in_order())]
+#[tokio::test]
+async fn it_should_remove_peerless_torrents(
+    #[values(standard(), standard_mutex(), standard_tokio(), tokio_std(), tokio_mutex(), tokio_tokio())] repo: Repo,
+    #[case] entries: Entries,
+    #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy,
+) {
+    make(&repo, &entries).await;
+
+    repo.remove_peerless_torrents(&policy).await;
+
+    let torrents = repo.get_paginated(None).await;
+
+    for (_, entry) in torrents {
+        assert!(entry.is_good(&policy));
+    }
+}
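A final consistency check: after `remove_peerless_torrents` runs, every surviving entry must still report `is_good` under the same policy, whatever combination of flags that policy carries. Judging by the fixture names alone, the three `TrackerPolicy::new` arguments appear to be the remove-peerless flag, a peer timeout, and the persist flag; that mapping is an assumption, since the parameter order is not shown in this diff.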
