Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;

//use serde::{Deserialize, Serialize};
use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
Expand Down Expand Up @@ -80,10 +79,10 @@ pub trait EntryAsync {
/// that's the list of all the peers trying to download the same torrent.
/// The tracker keeps one entry like this for every torrent.
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Torrent {
pub struct Torrent<T> {
/// The swarm: a network of peers that are all trying to download the torrent associated to this entry
// #[serde(skip)]
pub(crate) peers: std::collections::BTreeMap<peer::Id, Arc<peer::Peer>>,
pub(crate) peers: T,
/// The number of peers that have ever completed downloading the torrent associated to this entry
pub(crate) downloaded: u32,
}
44 changes: 40 additions & 4 deletions packages/torrent-repository/src/entry/mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};
use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, SkipMapPeerList};

impl EntrySync for EntryMutexStd {
impl EntrySync for EntryMutexStd<BTreeMapPeerList> {
fn get_swarm_metadata(&self) -> SwarmMetadata {
self.lock().expect("it should get a lock").get_swarm_metadata()
}
Expand Down Expand Up @@ -44,8 +44,44 @@ impl EntrySync for EntryMutexStd {
}
}

impl From<EntrySingle> for EntryMutexStd {
fn from(entry: EntrySingle) -> Self {
impl EntrySync for EntryMutexStd<SkipMapPeerList> {
fn get_swarm_metadata(&self) -> SwarmMetadata {
self.lock().expect("it should get a lock").get_swarm_metadata()
}

fn is_good(&self, policy: &TrackerPolicy) -> bool {
self.lock().expect("it should get a lock").is_good(policy)
}

fn peers_is_empty(&self) -> bool {
self.lock().expect("it should get a lock").peers_is_empty()
}

fn get_peers_len(&self) -> usize {
self.lock().expect("it should get a lock").get_peers_len()
}

fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().expect("it should get lock").get_peers(limit)
}

fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().expect("it should get lock").get_peers_for_client(client, limit)
}

fn upsert_peer(&self, peer: &peer::Peer) -> bool {
self.lock().expect("it should lock the entry").upsert_peer(peer)
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
self.lock()
.expect("it should lock the entry")
.remove_inactive_peers(current_cutoff);
}
}

impl<T> From<EntrySingle<T>> for EntryMutexStd<T> {
fn from(entry: EntrySingle<T>) -> Self {
Arc::new(std::sync::Mutex::new(entry))
}
}
42 changes: 38 additions & 4 deletions packages/torrent-repository/src/entry/mutex_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};

use super::{Entry, EntryAsync};
use crate::{EntryMutexTokio, EntrySingle};
use crate::{BTreeMapPeerList, EntryMutexTokio, EntrySingle, SkipMapPeerList};

impl EntryAsync for EntryMutexTokio {
impl EntryAsync for EntryMutexTokio<BTreeMapPeerList> {
async fn get_swarm_metadata(&self) -> SwarmMetadata {
self.lock().await.get_swarm_metadata()
}
Expand Down Expand Up @@ -42,8 +42,42 @@ impl EntryAsync for EntryMutexTokio {
}
}

impl From<EntrySingle> for EntryMutexTokio {
fn from(entry: EntrySingle) -> Self {
impl EntryAsync for EntryMutexTokio<SkipMapPeerList> {
async fn get_swarm_metadata(&self) -> SwarmMetadata {
self.lock().await.get_swarm_metadata()
}

async fn check_good(self, policy: &TrackerPolicy) -> bool {
self.lock().await.is_good(policy)
}

async fn peers_is_empty(&self) -> bool {
self.lock().await.peers_is_empty()
}

async fn get_peers_len(&self) -> usize {
self.lock().await.get_peers_len()
}

async fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().await.get_peers(limit)
}

async fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
self.lock().await.get_peers_for_client(client, limit)
}

async fn upsert_peer(self, peer: &peer::Peer) -> bool {
self.lock().await.upsert_peer(peer)
}

async fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) {
self.lock().await.remove_inactive_peers(current_cutoff);
}
}

impl<T> From<EntrySingle<T>> for EntryMutexTokio<T> {
fn from(entry: EntrySingle<T>) -> Self {
Arc::new(tokio::sync::Mutex::new(entry))
}
}
111 changes: 108 additions & 3 deletions packages/torrent-repository/src/entry/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::sync::Arc;

use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::announce_event::AnnounceEvent;
use torrust_tracker_primitives::peer::{self};
use torrust_tracker_primitives::peer::{self, ReadInfo};
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::DurationSinceUnixEpoch;

use super::Entry;
use crate::EntrySingle;
use crate::{BTreeMapPeerList, EntrySingle, SkipMapPeerList};

impl Entry for EntrySingle {
impl Entry for EntrySingle<BTreeMapPeerList> {
#[allow(clippy::cast_possible_truncation)]
fn get_swarm_metadata(&self) -> SwarmMetadata {
let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32;
Expand Down Expand Up @@ -98,3 +98,108 @@ impl Entry for EntrySingle {
.retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff);
}
}

impl Entry for EntrySingle<SkipMapPeerList> {
#[allow(clippy::cast_possible_truncation)]
fn get_swarm_metadata(&self) -> SwarmMetadata {
let complete: u32 = self.peers.iter().filter(|entry| entry.value().is_seeder()).count() as u32;
let incomplete: u32 = self.peers.len() as u32 - complete;

SwarmMetadata {
downloaded: self.downloaded,
complete,
incomplete,
}
}

fn is_good(&self, policy: &TrackerPolicy) -> bool {
if policy.persistent_torrent_completed_stat && self.downloaded > 0 {
return true;
}

if policy.remove_peerless_torrents && self.peers.is_empty() {
return false;
}

true
}

fn peers_is_empty(&self) -> bool {
self.peers.is_empty()
}

fn get_peers_len(&self) -> usize {
self.peers.len()
}
fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
match limit {
Some(limit) => self.peers.iter().take(limit).map(|entry| entry.value().clone()).collect(),
None => self.peers.iter().map(|entry| entry.value().clone()).collect(),
}
}

fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
match limit {
Some(limit) => self
.peers
.iter()
// Take peers which are not the client peer
.filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client)
// Limit the number of peers on the result
.take(limit)
.map(|entry| entry.value().clone())
.collect(),
None => self
.peers
.iter()
// Take peers which are not the client peer
.filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client)
.map(|entry| entry.value().clone())
.collect(),
}
}

fn upsert_peer(&mut self, peer: &peer::Peer) -> bool {
let mut downloaded_stats_updated: bool = false;

match peer::ReadInfo::get_event(peer) {
AnnounceEvent::Stopped => {
drop(self.peers.remove(&peer::ReadInfo::get_id(peer)));
}
AnnounceEvent::Completed => {
let previous = self.peers.get(&peer.get_id());

let increase_downloads = match previous {
Some(entry) => {
// Don't count if peer was already completed.
entry.value().event != AnnounceEvent::Completed
}
None => {
// Don't count if peer was not previously known
false
}
};

self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer));

if increase_downloads {
self.downloaded += 1;
downloaded_stats_updated = true;
}
}
_ => {
drop(self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer)));
}
}

downloaded_stats_updated
}

fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) {
for entry in &self.peers {
if entry.value().get_updated() >= current_cutoff {
entry.remove();
}
}
}
}
41 changes: 30 additions & 11 deletions packages/torrent-repository/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,46 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use crossbeam_skiplist::SkipMap;
use repository::dash_map_mutex_std::XacrimonDashMap;
use repository::rw_lock_std::RwLockStd;
use repository::rw_lock_tokio::RwLockTokio;
use repository::skip_map_mutex_std::CrossbeamSkipList;
use torrust_tracker_clock::clock;
use torrust_tracker_primitives::peer;

pub mod entry;
pub mod repository;

pub type EntrySingle = entry::Torrent;
pub type EntryMutexStd = Arc<std::sync::Mutex<entry::Torrent>>;
pub type EntryMutexTokio = Arc<tokio::sync::Mutex<entry::Torrent>>;
// Peer List

pub type TorrentsRwLockStd = RwLockStd<EntrySingle>;
pub type TorrentsRwLockStdMutexStd = RwLockStd<EntryMutexStd>;
pub type TorrentsRwLockStdMutexTokio = RwLockStd<EntryMutexTokio>;
pub type TorrentsRwLockTokio = RwLockTokio<EntrySingle>;
pub type TorrentsRwLockTokioMutexStd = RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = RwLockTokio<EntryMutexTokio>;
pub type BTreeMapPeerList = BTreeMap<peer::Id, Arc<peer::Peer>>;
pub type SkipMapPeerList = SkipMap<peer::Id, Arc<peer::Peer>>;

pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;
pub type TorrentsDashMapMutexStd = XacrimonDashMap<EntryMutexStd>;
// Torrent Entry

pub type EntrySingle<T> = entry::Torrent<T>;
pub type EntryMutexStd<T> = Arc<std::sync::Mutex<EntrySingle<T>>>;
pub type EntryMutexTokio<T> = Arc<tokio::sync::Mutex<EntrySingle<T>>>;

// Repos

// Torrent repo and peer list: BTreeMap
pub type TorrentsRwLockStd = RwLockStd<EntrySingle<BTreeMapPeerList>>;
pub type TorrentsRwLockStdMutexStd = RwLockStd<EntryMutexStd<BTreeMapPeerList>>;
pub type TorrentsRwLockStdMutexTokio = RwLockStd<EntryMutexTokio<BTreeMapPeerList>>;
pub type TorrentsRwLockTokio = RwLockTokio<EntrySingle<BTreeMapPeerList>>;
pub type TorrentsRwLockTokioMutexStd = RwLockTokio<EntryMutexStd<BTreeMapPeerList>>;
pub type TorrentsRwLockTokioMutexTokio = RwLockTokio<EntryMutexTokio<BTreeMapPeerList>>;

// Torrent repo: SkipMap; Peer list: BTreeMap
pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd<BTreeMapPeerList>>;

// Torrent repo: DashMap; Peer list: BTreeMap
pub type TorrentsDashMapMutexStd = XacrimonDashMap<EntryMutexStd<BTreeMapPeerList>>;

// Torrent repo and peer list: SkipMap
pub type TorrentsSkipMapMutexStdSkipMap = CrossbeamSkipList<EntryMutexStd<SkipMapPeerList>>;

/// This code needs to be copied into each crate.
/// Working version, for production.
Expand Down
14 changes: 7 additions & 7 deletions packages/torrent-repository/src/repository/dash_map_mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent

use super::Repository;
use crate::entry::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};
use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle};

#[derive(Default, Debug)]
pub struct XacrimonDashMap<T> {
pub torrents: DashMap<InfoHash, T>,
}

impl Repository<EntryMutexStd> for XacrimonDashMap<EntryMutexStd>
impl Repository<EntryMutexStd<BTreeMapPeerList>> for XacrimonDashMap<EntryMutexStd<BTreeMapPeerList>>
where
EntryMutexStd: EntrySync,
EntrySingle: Entry,
EntryMutexStd<BTreeMapPeerList>: EntrySync,
EntrySingle<BTreeMapPeerList>: Entry,
{
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
if let Some(entry) = self.torrents.get(info_hash) {
Expand All @@ -38,7 +38,7 @@ where
self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata())
}

fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
fn get(&self, key: &InfoHash) -> Option<EntryMutexStd<BTreeMapPeerList>> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.clone())
}
Expand All @@ -57,7 +57,7 @@ where
metrics
}

fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> {
fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd<BTreeMapPeerList>)> {
match pagination {
Some(pagination) => self
.torrents
Expand Down Expand Up @@ -92,7 +92,7 @@ where
}
}

fn remove(&self, key: &InfoHash) -> Option<EntryMutexStd> {
fn remove(&self, key: &InfoHash) -> Option<EntryMutexStd<BTreeMapPeerList>> {
self.torrents.remove(key).map(|(_key, value)| value.clone())
}

Expand Down
Loading