diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index d72ff254b..cbc975942 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -2,7 +2,6 @@ use std::fmt::Debug; use std::net::SocketAddr; use std::sync::Arc; -//use serde::{Deserialize, Serialize}; use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; @@ -80,10 +79,10 @@ pub trait EntryAsync { /// that's the list of all the peers trying to download the same torrent. /// The tracker keeps one entry like this for every torrent. #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Torrent { +pub struct Torrent { /// The swarm: a network of peers that are all trying to download the torrent associated to this entry // #[serde(skip)] - pub(crate) peers: std::collections::BTreeMap>, + pub(crate) peers: T, /// The number of peers that have ever completed downloading the torrent associated to this entry pub(crate) downloaded: u32, } diff --git a/packages/torrent-repository/src/entry/mutex_std.rs b/packages/torrent-repository/src/entry/mutex_std.rs index 990d8ab76..16818d16b 100644 --- a/packages/torrent-repository/src/entry/mutex_std.rs +++ b/packages/torrent-repository/src/entry/mutex_std.rs @@ -6,9 +6,9 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use super::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, SkipMapPeerList}; -impl EntrySync for EntryMutexStd { +impl EntrySync for EntryMutexStd { fn get_swarm_metadata(&self) -> SwarmMetadata { self.lock().expect("it should get a lock").get_swarm_metadata() } @@ -44,8 +44,44 @@ impl EntrySync for EntryMutexStd { } } -impl From for EntryMutexStd { - fn from(entry: EntrySingle) -> Self { +impl EntrySync for EntryMutexStd { + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().expect("it should get a lock").get_swarm_metadata() + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + self.lock().expect("it should get a lock").is_good(policy) + } + + fn peers_is_empty(&self) -> bool { + self.lock().expect("it should get a lock").peers_is_empty() + } + + fn get_peers_len(&self) -> usize { + self.lock().expect("it should get a lock").get_peers_len() + } + + fn get_peers(&self, limit: Option) -> Vec> { + self.lock().expect("it should get lock").get_peers(limit) + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.lock().expect("it should get lock").get_peers_for_client(client, limit) + } + + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.lock().expect("it should lock the entry").upsert_peer(peer) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + self.lock() + .expect("it should lock the entry") + .remove_inactive_peers(current_cutoff); + } +} + +impl From> for EntryMutexStd { + fn from(entry: EntrySingle) -> Self { Arc::new(std::sync::Mutex::new(entry)) } } diff --git a/packages/torrent-repository/src/entry/mutex_tokio.rs b/packages/torrent-repository/src/entry/mutex_tokio.rs index c5363e51a..84f292dca 100644 --- a/packages/torrent-repository/src/entry/mutex_tokio.rs +++ b/packages/torrent-repository/src/entry/mutex_tokio.rs @@ -6,9 +6,9 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use super::{Entry, EntryAsync}; -use crate::{EntryMutexTokio, EntrySingle}; +use crate::{BTreeMapPeerList, EntryMutexTokio, EntrySingle, SkipMapPeerList}; -impl EntryAsync for EntryMutexTokio { +impl EntryAsync for EntryMutexTokio { async fn get_swarm_metadata(&self) -> SwarmMetadata { self.lock().await.get_swarm_metadata() } @@ -42,8 +42,42 @@ impl EntryAsync for EntryMutexTokio { } } -impl From for EntryMutexTokio { - fn from(entry: EntrySingle) -> Self { +impl EntryAsync for EntryMutexTokio { + async fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().await.get_swarm_metadata() + } + + async fn check_good(self, policy: &TrackerPolicy) -> bool { + self.lock().await.is_good(policy) + } + + async fn peers_is_empty(&self) -> bool { + self.lock().await.peers_is_empty() + } + + async fn get_peers_len(&self) -> usize { + self.lock().await.get_peers_len() + } + + async fn get_peers(&self, limit: Option) -> Vec> { + self.lock().await.get_peers(limit) + } + + async fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.lock().await.get_peers_for_client(client, limit) + } + + async fn upsert_peer(self, peer: &peer::Peer) -> bool { + self.lock().await.upsert_peer(peer) + } + + async fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) { + self.lock().await.remove_inactive_peers(current_cutoff); + } +} + +impl From> for EntryMutexTokio { + fn from(entry: EntrySingle) -> Self { Arc::new(tokio::sync::Mutex::new(entry)) } } diff --git a/packages/torrent-repository/src/entry/single.rs b/packages/torrent-repository/src/entry/single.rs index a38b54023..1c5b44229 100644 --- a/packages/torrent-repository/src/entry/single.rs +++ b/packages/torrent-repository/src/entry/single.rs @@ -3,14 +3,14 @@ use std::sync::Arc; use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::announce_event::AnnounceEvent; -use torrust_tracker_primitives::peer::{self}; +use torrust_tracker_primitives::peer::{self, ReadInfo}; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::DurationSinceUnixEpoch; use super::Entry; -use crate::EntrySingle; +use crate::{BTreeMapPeerList, EntrySingle, SkipMapPeerList}; -impl Entry for EntrySingle { +impl Entry for EntrySingle { #[allow(clippy::cast_possible_truncation)] fn get_swarm_metadata(&self) -> SwarmMetadata { let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32; @@ -98,3 +98,108 @@ impl Entry for EntrySingle { .retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff); } } + +impl Entry for EntrySingle { + #[allow(clippy::cast_possible_truncation)] + fn get_swarm_metadata(&self) -> SwarmMetadata { + let complete: u32 = self.peers.iter().filter(|entry| entry.value().is_seeder()).count() as u32; + let incomplete: u32 = self.peers.len() as u32 - complete; + + SwarmMetadata { + downloaded: self.downloaded, + complete, + incomplete, + } + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + if policy.persistent_torrent_completed_stat && self.downloaded > 0 { + return true; + } + + if policy.remove_peerless_torrents && self.peers.is_empty() { + return false; + } + + true + } + + fn peers_is_empty(&self) -> bool { + self.peers.is_empty() + } + + fn get_peers_len(&self) -> usize { + self.peers.len() + } + fn get_peers(&self, limit: Option) -> Vec> { + match limit { + Some(limit) => self.peers.iter().take(limit).map(|entry| entry.value().clone()).collect(), + None => self.peers.iter().map(|entry| entry.value().clone()).collect(), + } + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + match limit { + Some(limit) => self + .peers + .iter() + // Take peers which are not the client peer + .filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client) + // Limit the number of peers on the result + .take(limit) + .map(|entry| entry.value().clone()) + .collect(), + None => self + .peers + .iter() + // Take peers which are not the client peer + .filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client) + .map(|entry| entry.value().clone()) + .collect(), + } + } + + fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { + let mut downloaded_stats_updated: bool = false; + + match peer::ReadInfo::get_event(peer) { + AnnounceEvent::Stopped => { + drop(self.peers.remove(&peer::ReadInfo::get_id(peer))); + } + AnnounceEvent::Completed => { + let previous = self.peers.get(&peer.get_id()); + + let increase_downloads = match previous { + Some(entry) => { + // Don't count if peer was already completed. + entry.value().event != AnnounceEvent::Completed + } + None => { + // Don't count if peer was not previously known + false + } + }; + + self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer)); + + if increase_downloads { + self.downloaded += 1; + downloaded_stats_updated = true; + } + } + _ => { + drop(self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer))); + } + } + + downloaded_stats_updated + } + + fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.peers { + if entry.value().get_updated() >= current_cutoff { + entry.remove(); + } + } + } +} diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index 7a6d209b9..ba70b4463 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -1,27 +1,46 @@ +use std::collections::BTreeMap; use std::sync::Arc; +use crossbeam_skiplist::SkipMap; use repository::dash_map_mutex_std::XacrimonDashMap; use repository::rw_lock_std::RwLockStd; use repository::rw_lock_tokio::RwLockTokio; use repository::skip_map_mutex_std::CrossbeamSkipList; use torrust_tracker_clock::clock; +use torrust_tracker_primitives::peer; pub mod entry; pub mod repository; -pub type EntrySingle = entry::Torrent; -pub type EntryMutexStd = Arc>; -pub type EntryMutexTokio = Arc>; +// Peer List -pub type TorrentsRwLockStd = RwLockStd; -pub type TorrentsRwLockStdMutexStd = RwLockStd; -pub type TorrentsRwLockStdMutexTokio = RwLockStd; -pub type TorrentsRwLockTokio = RwLockTokio; -pub type TorrentsRwLockTokioMutexStd = RwLockTokio; -pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; +pub type BTreeMapPeerList = BTreeMap>; +pub type SkipMapPeerList = SkipMap>; -pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; -pub type TorrentsDashMapMutexStd = XacrimonDashMap; +// Torrent Entry + +pub type EntrySingle = entry::Torrent; +pub type EntryMutexStd = Arc>>; +pub type EntryMutexTokio = Arc>>; + +// Repos + +// Torrent repo and peer list: BTreeMap +pub type TorrentsRwLockStd = RwLockStd>; +pub type TorrentsRwLockStdMutexStd = RwLockStd>; +pub type TorrentsRwLockStdMutexTokio = RwLockStd>; +pub type TorrentsRwLockTokio = RwLockTokio>; +pub type TorrentsRwLockTokioMutexStd = RwLockTokio>; +pub type TorrentsRwLockTokioMutexTokio = RwLockTokio>; + +// Torrent repo: SkipMap; Peer list: BTreeMap +pub type TorrentsSkipMapMutexStd = CrossbeamSkipList>; + +// Torrent repo: DashMap; Peer list: BTreeMap +pub type TorrentsDashMapMutexStd = XacrimonDashMap>; + +// Torrent repo and peer list: SkipMap +pub type TorrentsSkipMapMutexStdSkipMap = CrossbeamSkipList>; /// This code needs to be copied into each crate. /// Working version, for production. diff --git a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs index b398b09dc..517a9b31f 100644 --- a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs @@ -11,17 +11,17 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle}; #[derive(Default, Debug)] pub struct XacrimonDashMap { pub torrents: DashMap, } -impl Repository for XacrimonDashMap +impl Repository> for XacrimonDashMap> where - EntryMutexStd: EntrySync, - EntrySingle: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { if let Some(entry) = self.torrents.get(info_hash) { @@ -38,7 +38,7 @@ where self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let maybe_entry = self.torrents.get(key); maybe_entry.map(|entry| entry.clone()) } @@ -57,7 +57,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { match pagination { Some(pagination) => self .torrents @@ -92,7 +92,7 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { self.torrents.remove(key).map(|(_key, value)| value.clone()) } diff --git a/packages/torrent-repository/src/repository/rw_lock_std.rs b/packages/torrent-repository/src/repository/rw_lock_std.rs index af48428e4..2f6ce2fa0 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std.rs @@ -9,7 +9,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::Entry; -use crate::{EntrySingle, TorrentsRwLockStd}; +use crate::{BTreeMapPeerList, EntrySingle, TorrentsRwLockStd}; #[derive(Default, Debug)] pub struct RwLockStd { @@ -28,24 +28,28 @@ impl RwLockStd { } impl TorrentsRwLockStd { - fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + fn get_torrents<'a>( + &'a self, + ) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().expect("it should get the read lock") } - fn get_torrents_mut<'a>(&'a self) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + fn get_torrents_mut<'a>( + &'a self, + ) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().expect("it should get the write lock") } } -impl Repository for TorrentsRwLockStd +impl Repository> for TorrentsRwLockStd where - EntrySingle: Entry, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let mut db = self.get_torrents_mut(); @@ -59,7 +63,7 @@ where self.get(info_hash).map(|entry| entry.get_swarm_metadata()) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents(); db.get(key).cloned() } @@ -78,7 +82,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { let db = self.get_torrents(); match pagination { @@ -110,7 +114,7 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut(); db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs index 74cdc4475..f4e95fb6b 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs @@ -10,28 +10,32 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle, TorrentsRwLockStdMutexStd}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, TorrentsRwLockStdMutexStd}; impl TorrentsRwLockStdMutexStd { - fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + fn get_torrents<'a>( + &'a self, + ) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().expect("unable to get torrent list") } - fn get_torrents_mut<'a>(&'a self) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + fn get_torrents_mut<'a>( + &'a self, + ) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().expect("unable to get writable torrent list") } } -impl Repository for TorrentsRwLockStdMutexStd +impl Repository> for TorrentsRwLockStdMutexStd where - EntryMutexStd: EntrySync, - EntrySingle: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().get(info_hash).cloned(); @@ -53,7 +57,7 @@ where .map(super::super::entry::EntrySync::get_swarm_metadata) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents(); db.get(key).cloned() } @@ -72,7 +76,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { let db = self.get_torrents(); match pagination { @@ -107,7 +111,7 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut(); db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs index 83ac02c91..c7202f180 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs @@ -14,28 +14,32 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::RepositoryAsync; use crate::entry::{Entry, EntryAsync}; -use crate::{EntryMutexTokio, EntrySingle, TorrentsRwLockStdMutexTokio}; +use crate::{BTreeMapPeerList, EntryMutexTokio, EntrySingle, TorrentsRwLockStdMutexTokio}; impl TorrentsRwLockStdMutexTokio { - fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + fn get_torrents<'a>( + &'a self, + ) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().expect("unable to get torrent list") } - fn get_torrents_mut<'a>(&'a self) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + fn get_torrents_mut<'a>( + &'a self, + ) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().expect("unable to get writable torrent list") } } -impl RepositoryAsync for TorrentsRwLockStdMutexTokio +impl RepositoryAsync> for TorrentsRwLockStdMutexTokio where - EntryMutexTokio: EntryAsync, - EntrySingle: Entry, + EntryMutexTokio: EntryAsync, + EntrySingle: Entry, { async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().get(info_hash).cloned(); @@ -60,12 +64,12 @@ where } } - async fn get(&self, key: &InfoHash) -> Option { + async fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents(); db.get(key).cloned() } - async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexTokio)> { + async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexTokio)> { let db = self.get_torrents(); match pagination { @@ -116,7 +120,7 @@ where } } - async fn remove(&self, key: &InfoHash) -> Option { + async fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut(); db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio.rs index b95f1e31e..56f9b95a8 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio.rs @@ -9,7 +9,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::RepositoryAsync; use crate::entry::Entry; -use crate::{EntrySingle, TorrentsRwLockTokio}; +use crate::{BTreeMapPeerList, EntrySingle, TorrentsRwLockTokio}; #[derive(Default, Debug)] pub struct RwLockTokio { @@ -30,26 +30,28 @@ impl RwLockTokio { } impl TorrentsRwLockTokio { - async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + async fn get_torrents<'a>( + &'a self, + ) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().await } async fn get_torrents_mut<'a>( &'a self, - ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().await } } -impl RepositoryAsync for TorrentsRwLockTokio +impl RepositoryAsync> for TorrentsRwLockTokio where - EntrySingle: Entry, + EntrySingle: Entry, { async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let mut db = self.get_torrents_mut().await; @@ -63,12 +65,12 @@ where self.get(info_hash).await.map(|entry| entry.get_swarm_metadata()) } - async fn get(&self, key: &InfoHash) -> Option { + async fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents().await; db.get(key).cloned() } - async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { + async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { let db = self.get_torrents().await; match pagination { @@ -114,7 +116,7 @@ where } } - async fn remove(&self, key: &InfoHash) -> Option { + async fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut().await; db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs index bde959940..2827e5933 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs @@ -10,30 +10,32 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::RepositoryAsync; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle, TorrentsRwLockTokioMutexStd}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, TorrentsRwLockTokioMutexStd}; impl TorrentsRwLockTokioMutexStd { - async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + async fn get_torrents<'a>( + &'a self, + ) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().await } async fn get_torrents_mut<'a>( &'a self, - ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().await } } -impl RepositoryAsync for TorrentsRwLockTokioMutexStd +impl RepositoryAsync> for TorrentsRwLockTokioMutexStd where - EntryMutexStd: EntrySync, - EntrySingle: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); @@ -53,12 +55,12 @@ where self.get(info_hash).await.map(|entry| entry.get_swarm_metadata()) } - async fn get(&self, key: &InfoHash) -> Option { + async fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents().await; db.get(key).cloned() } - async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { let db = self.get_torrents().await; match pagination { @@ -107,7 +109,7 @@ where } } - async fn remove(&self, key: &InfoHash) -> Option { + async fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut().await; db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs index 1d002e317..3809a8492 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs @@ -10,30 +10,32 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::RepositoryAsync; use crate::entry::{Entry, EntryAsync}; -use crate::{EntryMutexTokio, EntrySingle, TorrentsRwLockTokioMutexTokio}; +use crate::{BTreeMapPeerList, EntryMutexTokio, EntrySingle, TorrentsRwLockTokioMutexTokio}; impl TorrentsRwLockTokioMutexTokio { - async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + async fn get_torrents<'a>( + &'a self, + ) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().await } async fn get_torrents_mut<'a>( &'a self, - ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().await } } -impl RepositoryAsync for TorrentsRwLockTokioMutexTokio +impl RepositoryAsync> for TorrentsRwLockTokioMutexTokio where - EntryMutexTokio: EntryAsync, - EntrySingle: Entry, + EntryMutexTokio: EntryAsync, + EntrySingle: Entry, { async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); @@ -56,12 +58,12 @@ where } } - async fn get(&self, key: &InfoHash) -> Option { + async fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents().await; db.get(key).cloned() } - async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexTokio)> { + async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexTokio)> { let db = self.get_torrents().await; match pagination { @@ -110,7 +112,7 @@ where } } - async fn remove(&self, key: &InfoHash) -> Option { + async fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut().await; db.remove(key) } diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index ef3e7e478..e3c5b0977 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -11,17 +11,17 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, SkipMapPeerList}; #[derive(Default, Debug)] pub struct CrossbeamSkipList { pub torrents: SkipMap, } -impl Repository for CrossbeamSkipList +impl Repository> for CrossbeamSkipList> where - EntryMutexStd: EntrySync, - EntrySingle: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); @@ -32,7 +32,7 @@ where self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let maybe_entry = self.torrents.get(key); maybe_entry.map(|entry| entry.value().clone()) } @@ -51,7 +51,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { match pagination { Some(pagination) => self .torrents @@ -88,7 +88,98 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { + self.torrents.remove(key).map(|entry| entry.value().clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + for entry in &self.torrents { + if entry.value().is_good(policy) { + continue; + } + + entry.remove(); + } + } +} + +impl Repository> for CrossbeamSkipList> +where + EntryMutexStd: EntrySync, + EntrySingle: Entry, +{ + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); + entry.value().upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) + } + + fn get(&self, key: &InfoHash) -> Option> { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.value().clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntryMutexStd::new( + EntrySingle { + peers: SkipMap::default(), + downloaded: *completed, + } + .into(), + ); + + // Since SkipMap is lock-free the torrent could have been inserted + // after checking if it exists. + self.torrents.get_or_insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option> { self.torrents.remove(key).map(|entry| entry.value().clone()) } diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 7c245fe04..fc5160cd4 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -6,8 +6,9 @@ use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; use torrust_tracker_torrent_repository::{ - EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, - TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, + BTreeMapPeerList, EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, + TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, + TorrentsSkipMapMutexStd, }; #[derive(Debug)] @@ -49,7 +50,7 @@ impl Repo { } } - pub(crate) async fn get(&self, key: &InfoHash) -> Option { + pub(crate) async fn get(&self, key: &InfoHash) -> Option> { match self { Repo::RwLockStd(repo) => repo.get(key), Repo::RwLockStdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), @@ -75,7 +76,7 @@ impl Repo { } } - pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { + pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { match self { Repo::RwLockStd(repo) => repo.get_paginated(pagination), Repo::RwLockStdMutexStd(repo) => repo @@ -84,7 +85,7 @@ impl Repo { .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), Repo::RwLockStdMutexTokio(repo) => { - let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; + let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; for (i, t) in repo.get_paginated(pagination).await { v.push((i, t.lock().await.clone())); @@ -99,7 +100,7 @@ impl Repo { .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), Repo::RwLockTokioMutexTokio(repo) => { - let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; + let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; for (i, t) in repo.get_paginated(pagination).await { v.push((i, t.lock().await.clone())); @@ -132,7 +133,7 @@ impl Repo { } } - pub(crate) async fn remove(&self, key: &InfoHash) -> Option { + pub(crate) async fn remove(&self, key: &InfoHash) -> Option> { match self { Repo::RwLockStd(repo) => repo.remove(key), Repo::RwLockStdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), @@ -171,7 +172,11 @@ impl Repo { } } - pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option { + pub(crate) async fn insert( + &self, + info_hash: &InfoHash, + torrent: EntrySingle, + ) -> Option> { match self { Repo::RwLockStd(repo) => { repo.write().insert(*info_hash, torrent); diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs index c0699479e..09a18e869 100644 --- a/packages/torrent-repository/tests/common/torrent.rs +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -5,13 +5,13 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _}; -use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; +use torrust_tracker_torrent_repository::{BTreeMapPeerList, EntryMutexStd, EntryMutexTokio, EntrySingle}; #[derive(Debug, Clone)] pub(crate) enum Torrent { - Single(EntrySingle), - MutexStd(EntryMutexStd), - MutexTokio(EntryMutexTokio), + Single(EntrySingle), + MutexStd(EntryMutexStd), + MutexTokio(EntryMutexTokio), } impl Torrent { diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index fde34467e..c31c9c32a 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -13,7 +13,7 @@ use torrust_tracker_torrent_repository::repository::dash_map_mutex_std::Xacrimon use torrust_tracker_torrent_repository::repository::rw_lock_std::RwLockStd; use torrust_tracker_torrent_repository::repository::rw_lock_tokio::RwLockTokio; use torrust_tracker_torrent_repository::repository::skip_map_mutex_std::CrossbeamSkipList; -use torrust_tracker_torrent_repository::EntrySingle; +use torrust_tracker_torrent_repository::{BTreeMapPeerList, EntrySingle}; use crate::common::repo::Repo; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; @@ -58,7 +58,7 @@ fn dash_map_std() -> Repo { Repo::DashMapMutexStd(XacrimonDashMap::default()) } -type Entries = Vec<(InfoHash, EntrySingle)>; +type Entries = Vec<(InfoHash, EntrySingle)>; #[fixture] fn empty() -> Entries { @@ -125,7 +125,7 @@ fn three() -> Entries { #[fixture] fn many_out_of_order() -> Entries { - let mut entries: HashSet<(InfoHash, EntrySingle)> = HashSet::default(); + let mut entries: HashSet<(InfoHash, EntrySingle)> = HashSet::default(); for i in 0..408 { let mut entry = EntrySingle::default(); @@ -140,7 +140,7 @@ fn many_out_of_order() -> Entries { #[fixture] fn many_hashed_in_order() -> Entries { - let mut entries: BTreeMap = BTreeMap::default(); + let mut entries: BTreeMap> = BTreeMap::default(); for i in 0..408 { let mut entry = EntrySingle::default(); diff --git a/src/core/torrent/mod.rs b/src/core/torrent/mod.rs index 286a7e047..ed6133cda 100644 --- a/src/core/torrent/mod.rs +++ b/src/core/torrent/mod.rs @@ -25,6 +25,9 @@ //! - The number of peers that have NOT completed downloading the torrent and are still active, that means they are actively participating in the network. //! Peer that don not have a full copy of the torrent data are called "leechers". //! -use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd; -pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used +//use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd; +//pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used + +use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStdSkipMap; +pub type Torrents = TorrentsSkipMapMutexStdSkipMap; // Currently Used