Skip to content

Commit

Permalink
dev: add some torrent repo tests
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Mar 20, 2024
1 parent b0d19b6 commit 29b6cb8
Show file tree
Hide file tree
Showing 21 changed files with 510 additions and 135 deletions.
2 changes: 1 addition & 1 deletion packages/primitives/src/announce_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};

/// Announce events. Described on the
/// [BEP 3. The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html)
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Serialize, Deserialize)]
#[derive(Hash, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum AnnounceEvent {
/// The peer has started downloading the torrent.
Started,
Expand Down
2 changes: 1 addition & 1 deletion packages/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub enum IPVersion {
}

/// Number of bytes downloaded, uploaded or pending to download (left) by the peer.
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Serialize, Deserialize)]
#[derive(Hash, Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct NumberOfBytes(pub i64);

/// The database management system used by the tracker.
Expand Down
2 changes: 1 addition & 1 deletion packages/primitives/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{ser_unix_time_value, DurationSinceUnixEpoch, IPVersion, NumberOfByte
/// event: AnnounceEvent::Started,
/// };
/// ```
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Copy)]
#[derive(Debug, Clone, Serialize, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Peer {
/// ID used by the downloader peer
pub peer_id: Id,
Expand Down
12 changes: 6 additions & 6 deletions packages/primitives/src/torrent_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ use std::ops::AddAssign;
#[derive(Copy, Clone, Debug, PartialEq, Default)]
pub struct TorrentsMetrics {
/// Total number of seeders for all torrents
pub seeders: u64,
pub complete: u64,
/// Total number of peers that have ever completed downloading for all torrents.
pub completed: u64,
pub downloaded: u64,
/// Total number of leechers for all torrents.
pub leechers: u64,
pub incomplete: u64,
/// Total number of torrents.
pub torrents: u64,
}

impl AddAssign for TorrentsMetrics {
fn add_assign(&mut self, rhs: Self) {
self.seeders += rhs.seeders;
self.completed += rhs.completed;
self.leechers += rhs.leechers;
self.complete += rhs.complete;
self.downloaded += rhs.downloaded;
self.incomplete += rhs.incomplete;
self.torrents += rhs.torrents;
}
}
2 changes: 1 addition & 1 deletion packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub trait EntryAsync {
/// This is the tracker entry for a given torrent and contains the swarm data,
/// that's the list of all the peers trying to download the same torrent.
/// The tracker keeps one entry like this for every torrent.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct Torrent {
/// The swarm: a network of peers that are all trying to download the torrent associated to this entry
// #[serde(skip)]
Expand Down
8 changes: 7 additions & 1 deletion packages/torrent-repository/src/entry/mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntrySync};
use crate::EntryMutexStd;
use crate::{EntryMutexStd, EntrySingle};

impl EntrySync for EntryMutexStd {
fn get_stats(&self) -> SwarmMetadata {
Expand Down Expand Up @@ -48,3 +48,9 @@ impl EntrySync for EntryMutexStd {
.remove_inactive_peers(current_cutoff);
}
}

impl From<EntrySingle> for EntryMutexStd {
fn from(entry: EntrySingle) -> Self {
Arc::new(std::sync::Mutex::new(entry))
}
}
8 changes: 7 additions & 1 deletion packages/torrent-repository/src/entry/mutex_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntryAsync};
use crate::EntryMutexTokio;
use crate::{EntryMutexTokio, EntrySingle};

impl EntryAsync for EntryMutexTokio {
async fn get_stats(&self) -> SwarmMetadata {
Expand Down Expand Up @@ -44,3 +44,9 @@ impl EntryAsync for EntryMutexTokio {
self.lock().await.remove_inactive_peers(current_cutoff);
}
}

impl From<EntrySingle> for EntryMutexTokio {
fn from(entry: EntrySingle) -> Self {
Arc::new(tokio::sync::Mutex::new(entry))
}
}
38 changes: 32 additions & 6 deletions packages/torrent-repository/src/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ pub mod rw_lock_tokio;
pub mod rw_lock_tokio_mutex_std;
pub mod rw_lock_tokio_mutex_tokio;

pub trait Repository<T>: Default + 'static {
use std::fmt::Debug;

pub trait Repository<T>: Debug + Default + Sized + 'static {
fn get(&self, key: &InfoHash) -> Option<T>;
fn get_metrics(&self) -> TorrentsMetrics;
fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, T)>;
Expand All @@ -24,7 +26,7 @@ pub trait Repository<T>: Default + 'static {
}

#[allow(clippy::module_name_repetitions)]
pub trait RepositoryAsync<T>: Default + 'static {
pub trait RepositoryAsync<T>: Debug + Default + Sized + 'static {
fn get(&self, key: &InfoHash) -> impl std::future::Future<Output = Option<T>> + Send;
fn get_metrics(&self) -> impl std::future::Future<Output = TorrentsMetrics> + Send;
fn get_paginated(&self, pagination: Option<&Pagination>) -> impl std::future::Future<Output = Vec<(InfoHash, T)>> + Send;
Expand All @@ -39,12 +41,36 @@ pub trait RepositoryAsync<T>: Default + 'static {
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + Send;
}

#[derive(Default)]
#[derive(Default, Debug)]
pub struct RwLockStd<T> {
torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

#[derive(Default, Debug)]
pub struct RwLockTokio<T> {
torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

#[derive(Default)]
pub struct RwLockStd<T> {
torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
impl<T> RwLockStd<T> {
/// # Panics
///
/// Panics if unable to get a lock.
pub fn write(
&self,
) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>> {
self.torrents.write().expect("it should get lock")
}
}

impl<T> RwLockTokio<T> {
pub fn write(
&self,
) -> impl std::future::Future<
Output = tokio::sync::RwLockWriteGuard<
'_,
std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>,
>,
> {
self.torrents.write()
}
}
6 changes: 3 additions & 3 deletions packages/torrent-repository/src/repository/rw_lock_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ where

for entry in self.get_torrents().values() {
let stats = entry.get_stats();
metrics.seeders += u64::from(stats.complete);
metrics.completed += u64::from(stats.downloaded);
metrics.leechers += u64::from(stats.incomplete);
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ where

for entry in self.get_torrents().values() {
let stats = entry.lock().expect("it should get a lock").get_stats();
metrics.seeders += u64::from(stats.complete);
metrics.completed += u64::from(stats.downloaded);
metrics.leechers += u64::from(stats.incomplete);
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ where

for entry in entries {
let stats = entry.lock().await.get_stats();
metrics.seeders += u64::from(stats.complete);
metrics.completed += u64::from(stats.downloaded);
metrics.leechers += u64::from(stats.incomplete);
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

Expand Down
6 changes: 3 additions & 3 deletions packages/torrent-repository/src/repository/rw_lock_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ where

for entry in self.get_torrents().await.values() {
let stats = entry.get_stats();
metrics.seeders += u64::from(stats.complete);
metrics.completed += u64::from(stats.downloaded);
metrics.leechers += u64::from(stats.incomplete);
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ where

for entry in self.get_torrents().await.values() {
let stats = entry.get_stats();
metrics.seeders += u64::from(stats.complete);
metrics.completed += u64::from(stats.downloaded);
metrics.leechers += u64::from(stats.incomplete);
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ where

for entry in self.get_torrents().await.values() {
let stats = entry.get_stats().await;
metrics.seeders += u64::from(stats.complete);
metrics.completed += u64::from(stats.downloaded);
metrics.leechers += u64::from(stats.incomplete);
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

Expand Down
2 changes: 2 additions & 0 deletions packages/torrent-repository/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub mod torrent;
pub mod torrent_peer_builder;
pub mod torrents;
88 changes: 88 additions & 0 deletions packages/torrent-repository/tests/common/torrent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _};
use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle};

#[derive(Debug)]
pub(crate) enum Torrent {
Single(EntrySingle),
MutexStd(EntryMutexStd),
MutexTokio(EntryMutexTokio),
}

impl Torrent {
pub(crate) async fn get_stats(&self) -> SwarmMetadata {
match self {
Torrent::Single(entry) => entry.get_stats(),
Torrent::MutexStd(entry) => entry.get_stats(),
Torrent::MutexTokio(entry) => entry.clone().get_stats().await,
}
}

pub(crate) async fn is_good(&self, policy: &TrackerPolicy) -> bool {
match self {
Torrent::Single(entry) => entry.is_good(policy),
Torrent::MutexStd(entry) => entry.is_good(policy),
Torrent::MutexTokio(entry) => entry.clone().is_good(policy).await,
}
}

pub(crate) async fn peers_is_empty(&self) -> bool {
match self {
Torrent::Single(entry) => entry.peers_is_empty(),
Torrent::MutexStd(entry) => entry.peers_is_empty(),
Torrent::MutexTokio(entry) => entry.clone().peers_is_empty().await,
}
}

pub(crate) async fn get_peers_len(&self) -> usize {
match self {
Torrent::Single(entry) => entry.get_peers_len(),
Torrent::MutexStd(entry) => entry.get_peers_len(),
Torrent::MutexTokio(entry) => entry.clone().get_peers_len().await,
}
}

pub(crate) async fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
match self {
Torrent::Single(entry) => entry.get_peers(limit),
Torrent::MutexStd(entry) => entry.get_peers(limit),
Torrent::MutexTokio(entry) => entry.clone().get_peers(limit).await,
}
}

pub(crate) async fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
match self {
Torrent::Single(entry) => entry.get_peers_for_peer(client, limit),
Torrent::MutexStd(entry) => entry.get_peers_for_peer(client, limit),
Torrent::MutexTokio(entry) => entry.clone().get_peers_for_peer(client, limit).await,
}
}

pub(crate) async fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool {
match self {
Torrent::Single(entry) => entry.insert_or_update_peer(peer),
Torrent::MutexStd(entry) => entry.insert_or_update_peer(peer),
Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer(peer).await,
}
}

pub(crate) async fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
match self {
Torrent::Single(entry) => entry.insert_or_update_peer_and_get_stats(peer),
Torrent::MutexStd(entry) => entry.insert_or_update_peer_and_get_stats(peer),
Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer_and_get_stats(peer).await,
}
}

pub(crate) async fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) {
match self {
Torrent::Single(entry) => entry.remove_inactive_peers(current_cutoff),
Torrent::MutexStd(entry) => entry.remove_inactive_peers(current_cutoff),
Torrent::MutexTokio(entry) => entry.clone().remove_inactive_peers(current_cutoff).await,
}
}
}
Loading

0 comments on commit 29b6cb8

Please sign in to comment.