From bc95b84e2361636b78e60bb83c01633dd6bf4d8c Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 15 Apr 2024 11:15:10 +0100 Subject: [PATCH 1/2] feat: new peer list impl with SkipMap in torrent repo --- packages/torrent-repository/src/entry/mod.rs | 13 +- .../torrent-repository/src/entry/skip_map.rs | 119 ++++++++++++++++++ .../src/entry/skip_map_mutex_std.rs | 51 ++++++++ packages/torrent-repository/src/lib.rs | 9 ++ .../src/repository/skip_map_mutex_std.rs | 93 +++++++++++++- src/core/torrent/mod.rs | 7 +- 6 files changed, 288 insertions(+), 4 deletions(-) create mode 100644 packages/torrent-repository/src/entry/skip_map.rs create mode 100644 packages/torrent-repository/src/entry/skip_map_mutex_std.rs diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index d72ff254b..57154dd4c 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use std::net::SocketAddr; use std::sync::Arc; -//use serde::{Deserialize, Serialize}; +use crossbeam_skiplist::SkipMap; use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; @@ -10,6 +10,8 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; pub mod mutex_std; pub mod mutex_tokio; pub mod single; +pub mod skip_map; +pub mod skip_map_mutex_std; pub trait Entry { /// It returns the swarm metadata (statistics) as a struct: @@ -87,3 +89,12 @@ pub struct Torrent { /// The number of peers that have ever completed downloading the torrent associated to this entry pub(crate) downloaded: u32, } + +#[derive(Debug, Default)] +pub struct SkipMapTorrent { + /// The swarm: a network of peers that are all trying to download the torrent associated to this entry + // #[serde(skip)] + pub(crate) peers: SkipMap>, + /// The number of peers that have ever completed downloading the torrent associated to this entry + pub(crate) downloaded: u32, +} diff --git a/packages/torrent-repository/src/entry/skip_map.rs b/packages/torrent-repository/src/entry/skip_map.rs new file mode 100644 index 000000000..0a173369b --- /dev/null +++ b/packages/torrent-repository/src/entry/skip_map.rs @@ -0,0 +1,119 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::announce_event::AnnounceEvent; +use torrust_tracker_primitives::peer::{self, ReadInfo}; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::DurationSinceUnixEpoch; + +use super::Entry; +use crate::EntrySkipMap; + +impl Entry for EntrySkipMap { + #[allow(clippy::cast_possible_truncation)] + fn get_swarm_metadata(&self) -> SwarmMetadata { + let complete: u32 = self.peers.iter().filter(|entry| entry.value().is_seeder()).count() as u32; + let incomplete: u32 = self.peers.len() as u32 - complete; + + SwarmMetadata { + downloaded: self.downloaded, + complete, + incomplete, + } + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + if policy.persistent_torrent_completed_stat && self.downloaded > 0 { + return true; + } + + if policy.remove_peerless_torrents && self.peers.is_empty() { + return false; + } + + true + } + + fn peers_is_empty(&self) -> bool { + self.peers.is_empty() + } + + fn get_peers_len(&self) -> usize { + self.peers.len() + } + fn get_peers(&self, limit: Option) -> Vec> { + match limit { + Some(limit) => self.peers.iter().take(limit).map(|entry| entry.value().clone()).collect(), + None => self.peers.iter().map(|entry| entry.value().clone()).collect(), + } + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + match limit { + Some(limit) => self + .peers + .iter() + // Take peers which are not the client peer + .filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client) + // Limit the number of peers on the result + .take(limit) + .map(|entry| entry.value().clone()) + .collect(), + None => self + .peers + .iter() + // Take peers which are not the client peer + .filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client) + .map(|entry| entry.value().clone()) + .collect(), + } + } + + fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { + let mut downloaded_stats_updated: bool = false; + + match peer::ReadInfo::get_event(peer) { + AnnounceEvent::Stopped => { + drop(self.peers.remove(&peer::ReadInfo::get_id(peer))); + } + AnnounceEvent::Completed => { + let previous = self.peers.get(&peer.get_id()); + + let increase_downloads = match previous { + Some(entry) => { + // Don't count if peer was already completed. + entry.value().event != AnnounceEvent::Completed + } + None => { + // Don't count if peer was not previously known + false + } + }; + + self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer)); + + if increase_downloads { + self.downloaded += 1; + downloaded_stats_updated = true; + } + } + _ => { + drop(self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer))); + } + } + + downloaded_stats_updated + } + + fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { + //self.peers + // .retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff); + + for entry in &self.peers { + if entry.value().get_updated() >= current_cutoff { + entry.remove(); + } + } + } +} diff --git a/packages/torrent-repository/src/entry/skip_map_mutex_std.rs b/packages/torrent-repository/src/entry/skip_map_mutex_std.rs new file mode 100644 index 000000000..79a4fdce3 --- /dev/null +++ b/packages/torrent-repository/src/entry/skip_map_mutex_std.rs @@ -0,0 +1,51 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; + +use super::{Entry, EntrySync}; +use crate::{EntrySkipMap, EntrySkipMapMutexStd}; + +impl EntrySync for EntrySkipMapMutexStd { + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().expect("it should get a lock").get_swarm_metadata() + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + self.lock().expect("it should get a lock").is_good(policy) + } + + fn peers_is_empty(&self) -> bool { + self.lock().expect("it should get a lock").peers_is_empty() + } + + fn get_peers_len(&self) -> usize { + self.lock().expect("it should get a lock").get_peers_len() + } + + fn get_peers(&self, limit: Option) -> Vec> { + self.lock().expect("it should get lock").get_peers(limit) + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.lock().expect("it should get lock").get_peers_for_client(client, limit) + } + + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.lock().expect("it should lock the entry").upsert_peer(peer) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + self.lock() + .expect("it should lock the entry") + .remove_inactive_peers(current_cutoff); + } +} + +impl From for EntrySkipMapMutexStd { + fn from(entry: EntrySkipMap) -> Self { + Arc::new(std::sync::Mutex::new(entry)) + } +} diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index 7a6d209b9..d01818af3 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -9,10 +9,17 @@ use torrust_tracker_clock::clock; pub mod entry; pub mod repository; +// Entry + pub type EntrySingle = entry::Torrent; pub type EntryMutexStd = Arc>; pub type EntryMutexTokio = Arc>; +pub type EntrySkipMap = entry::SkipMapTorrent; +pub type EntrySkipMapMutexStd = Arc>; + +// Repos + pub type TorrentsRwLockStd = RwLockStd; pub type TorrentsRwLockStdMutexStd = RwLockStd; pub type TorrentsRwLockStdMutexTokio = RwLockStd; @@ -23,6 +30,8 @@ pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; pub type TorrentsDashMapMutexStd = XacrimonDashMap; +pub type TorrentsSkipMapMutexStdSkipMap = CrossbeamSkipList; + /// This code needs to be copied into each crate. /// Working version, for production. #[cfg(not(test))] diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index ef3e7e478..fa4845e54 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -11,7 +11,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle}; +use crate::{EntryMutexStd, EntrySingle, EntrySkipMap, EntrySkipMapMutexStd}; #[derive(Default, Debug)] pub struct CrossbeamSkipList { @@ -108,3 +108,94 @@ where } } } + +impl Repository for CrossbeamSkipList +where + EntrySkipMapMutexStd: EntrySync, + EntrySkipMap: Entry, +{ + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); + entry.value().upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) + } + + fn get(&self, key: &InfoHash) -> Option { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.value().clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySkipMapMutexStd)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntrySkipMapMutexStd::new( + EntrySkipMap { + peers: SkipMap::default(), + downloaded: *completed, + } + .into(), + ); + + // Since SkipMap is lock-free the torrent could have been inserted + // after checking if it exists. + self.torrents.get_or_insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option { + self.torrents.remove(key).map(|entry| entry.value().clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + for entry in &self.torrents { + if entry.value().is_good(policy) { + continue; + } + + entry.remove(); + } + } +} diff --git a/src/core/torrent/mod.rs b/src/core/torrent/mod.rs index 286a7e047..ed6133cda 100644 --- a/src/core/torrent/mod.rs +++ b/src/core/torrent/mod.rs @@ -25,6 +25,9 @@ //! - The number of peers that have NOT completed downloading the torrent and are still active, that means they are actively participating in the network. //! Peer that don not have a full copy of the torrent data are called "leechers". //! -use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd; -pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used +//use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStd; +//pub type Torrents = TorrentsSkipMapMutexStd; // Currently Used + +use torrust_tracker_torrent_repository::TorrentsSkipMapMutexStdSkipMap; +pub type Torrents = TorrentsSkipMapMutexStdSkipMap; // Currently Used From 4af6c0ba6a601755065bcbea87984302bfda47aa Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 15 Apr 2024 13:04:34 +0100 Subject: [PATCH 2/2] refactor: use generics for perr list implementation with SkipMap --- packages/torrent-repository/src/entry/mod.rs | 16 +-- .../torrent-repository/src/entry/mutex_std.rs | 44 ++++++- .../src/entry/mutex_tokio.rs | 42 ++++++- .../torrent-repository/src/entry/single.rs | 111 +++++++++++++++- .../torrent-repository/src/entry/skip_map.rs | 119 ------------------ .../src/entry/skip_map_mutex_std.rs | 51 -------- packages/torrent-repository/src/lib.rs | 40 +++--- .../src/repository/dash_map_mutex_std.rs | 14 +-- .../src/repository/rw_lock_std.rs | 24 ++-- .../src/repository/rw_lock_std_mutex_std.rs | 26 ++-- .../src/repository/rw_lock_std_mutex_tokio.rs | 26 ++-- .../src/repository/rw_lock_tokio.rs | 22 ++-- .../src/repository/rw_lock_tokio_mutex_std.rs | 24 ++-- .../repository/rw_lock_tokio_mutex_tokio.rs | 24 ++-- .../src/repository/skip_map_mutex_std.rs | 30 ++--- .../torrent-repository/tests/common/repo.rs | 21 ++-- .../tests/common/torrent.rs | 8 +- .../tests/repository/mod.rs | 8 +- 18 files changed, 338 insertions(+), 312 deletions(-) delete mode 100644 packages/torrent-repository/src/entry/skip_map.rs delete mode 100644 packages/torrent-repository/src/entry/skip_map_mutex_std.rs diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index 57154dd4c..cbc975942 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -2,7 +2,6 @@ use std::fmt::Debug; use std::net::SocketAddr; use std::sync::Arc; -use crossbeam_skiplist::SkipMap; use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; @@ -10,8 +9,6 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; pub mod mutex_std; pub mod mutex_tokio; pub mod single; -pub mod skip_map; -pub mod skip_map_mutex_std; pub trait Entry { /// It returns the swarm metadata (statistics) as a struct: @@ -82,19 +79,10 @@ pub trait EntryAsync { /// that's the list of all the peers trying to download the same torrent. /// The tracker keeps one entry like this for every torrent. #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Torrent { +pub struct Torrent { /// The swarm: a network of peers that are all trying to download the torrent associated to this entry // #[serde(skip)] - pub(crate) peers: std::collections::BTreeMap>, - /// The number of peers that have ever completed downloading the torrent associated to this entry - pub(crate) downloaded: u32, -} - -#[derive(Debug, Default)] -pub struct SkipMapTorrent { - /// The swarm: a network of peers that are all trying to download the torrent associated to this entry - // #[serde(skip)] - pub(crate) peers: SkipMap>, + pub(crate) peers: T, /// The number of peers that have ever completed downloading the torrent associated to this entry pub(crate) downloaded: u32, } diff --git a/packages/torrent-repository/src/entry/mutex_std.rs b/packages/torrent-repository/src/entry/mutex_std.rs index 990d8ab76..16818d16b 100644 --- a/packages/torrent-repository/src/entry/mutex_std.rs +++ b/packages/torrent-repository/src/entry/mutex_std.rs @@ -6,9 +6,9 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use super::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, SkipMapPeerList}; -impl EntrySync for EntryMutexStd { +impl EntrySync for EntryMutexStd { fn get_swarm_metadata(&self) -> SwarmMetadata { self.lock().expect("it should get a lock").get_swarm_metadata() } @@ -44,8 +44,44 @@ impl EntrySync for EntryMutexStd { } } -impl From for EntryMutexStd { - fn from(entry: EntrySingle) -> Self { +impl EntrySync for EntryMutexStd { + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().expect("it should get a lock").get_swarm_metadata() + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + self.lock().expect("it should get a lock").is_good(policy) + } + + fn peers_is_empty(&self) -> bool { + self.lock().expect("it should get a lock").peers_is_empty() + } + + fn get_peers_len(&self) -> usize { + self.lock().expect("it should get a lock").get_peers_len() + } + + fn get_peers(&self, limit: Option) -> Vec> { + self.lock().expect("it should get lock").get_peers(limit) + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.lock().expect("it should get lock").get_peers_for_client(client, limit) + } + + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.lock().expect("it should lock the entry").upsert_peer(peer) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + self.lock() + .expect("it should lock the entry") + .remove_inactive_peers(current_cutoff); + } +} + +impl From> for EntryMutexStd { + fn from(entry: EntrySingle) -> Self { Arc::new(std::sync::Mutex::new(entry)) } } diff --git a/packages/torrent-repository/src/entry/mutex_tokio.rs b/packages/torrent-repository/src/entry/mutex_tokio.rs index c5363e51a..84f292dca 100644 --- a/packages/torrent-repository/src/entry/mutex_tokio.rs +++ b/packages/torrent-repository/src/entry/mutex_tokio.rs @@ -6,9 +6,9 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use super::{Entry, EntryAsync}; -use crate::{EntryMutexTokio, EntrySingle}; +use crate::{BTreeMapPeerList, EntryMutexTokio, EntrySingle, SkipMapPeerList}; -impl EntryAsync for EntryMutexTokio { +impl EntryAsync for EntryMutexTokio { async fn get_swarm_metadata(&self) -> SwarmMetadata { self.lock().await.get_swarm_metadata() } @@ -42,8 +42,42 @@ impl EntryAsync for EntryMutexTokio { } } -impl From for EntryMutexTokio { - fn from(entry: EntrySingle) -> Self { +impl EntryAsync for EntryMutexTokio { + async fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().await.get_swarm_metadata() + } + + async fn check_good(self, policy: &TrackerPolicy) -> bool { + self.lock().await.is_good(policy) + } + + async fn peers_is_empty(&self) -> bool { + self.lock().await.peers_is_empty() + } + + async fn get_peers_len(&self) -> usize { + self.lock().await.get_peers_len() + } + + async fn get_peers(&self, limit: Option) -> Vec> { + self.lock().await.get_peers(limit) + } + + async fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.lock().await.get_peers_for_client(client, limit) + } + + async fn upsert_peer(self, peer: &peer::Peer) -> bool { + self.lock().await.upsert_peer(peer) + } + + async fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) { + self.lock().await.remove_inactive_peers(current_cutoff); + } +} + +impl From> for EntryMutexTokio { + fn from(entry: EntrySingle) -> Self { Arc::new(tokio::sync::Mutex::new(entry)) } } diff --git a/packages/torrent-repository/src/entry/single.rs b/packages/torrent-repository/src/entry/single.rs index a38b54023..1c5b44229 100644 --- a/packages/torrent-repository/src/entry/single.rs +++ b/packages/torrent-repository/src/entry/single.rs @@ -3,14 +3,14 @@ use std::sync::Arc; use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::announce_event::AnnounceEvent; -use torrust_tracker_primitives::peer::{self}; +use torrust_tracker_primitives::peer::{self, ReadInfo}; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::DurationSinceUnixEpoch; use super::Entry; -use crate::EntrySingle; +use crate::{BTreeMapPeerList, EntrySingle, SkipMapPeerList}; -impl Entry for EntrySingle { +impl Entry for EntrySingle { #[allow(clippy::cast_possible_truncation)] fn get_swarm_metadata(&self) -> SwarmMetadata { let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32; @@ -98,3 +98,108 @@ impl Entry for EntrySingle { .retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff); } } + +impl Entry for EntrySingle { + #[allow(clippy::cast_possible_truncation)] + fn get_swarm_metadata(&self) -> SwarmMetadata { + let complete: u32 = self.peers.iter().filter(|entry| entry.value().is_seeder()).count() as u32; + let incomplete: u32 = self.peers.len() as u32 - complete; + + SwarmMetadata { + downloaded: self.downloaded, + complete, + incomplete, + } + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + if policy.persistent_torrent_completed_stat && self.downloaded > 0 { + return true; + } + + if policy.remove_peerless_torrents && self.peers.is_empty() { + return false; + } + + true + } + + fn peers_is_empty(&self) -> bool { + self.peers.is_empty() + } + + fn get_peers_len(&self) -> usize { + self.peers.len() + } + fn get_peers(&self, limit: Option) -> Vec> { + match limit { + Some(limit) => self.peers.iter().take(limit).map(|entry| entry.value().clone()).collect(), + None => self.peers.iter().map(|entry| entry.value().clone()).collect(), + } + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + match limit { + Some(limit) => self + .peers + .iter() + // Take peers which are not the client peer + .filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client) + // Limit the number of peers on the result + .take(limit) + .map(|entry| entry.value().clone()) + .collect(), + None => self + .peers + .iter() + // Take peers which are not the client peer + .filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client) + .map(|entry| entry.value().clone()) + .collect(), + } + } + + fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { + let mut downloaded_stats_updated: bool = false; + + match peer::ReadInfo::get_event(peer) { + AnnounceEvent::Stopped => { + drop(self.peers.remove(&peer::ReadInfo::get_id(peer))); + } + AnnounceEvent::Completed => { + let previous = self.peers.get(&peer.get_id()); + + let increase_downloads = match previous { + Some(entry) => { + // Don't count if peer was already completed. + entry.value().event != AnnounceEvent::Completed + } + None => { + // Don't count if peer was not previously known + false + } + }; + + self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer)); + + if increase_downloads { + self.downloaded += 1; + downloaded_stats_updated = true; + } + } + _ => { + drop(self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer))); + } + } + + downloaded_stats_updated + } + + fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.peers { + if entry.value().get_updated() >= current_cutoff { + entry.remove(); + } + } + } +} diff --git a/packages/torrent-repository/src/entry/skip_map.rs b/packages/torrent-repository/src/entry/skip_map.rs deleted file mode 100644 index 0a173369b..000000000 --- a/packages/torrent-repository/src/entry/skip_map.rs +++ /dev/null @@ -1,119 +0,0 @@ -use std::net::SocketAddr; -use std::sync::Arc; - -use torrust_tracker_configuration::TrackerPolicy; -use torrust_tracker_primitives::announce_event::AnnounceEvent; -use torrust_tracker_primitives::peer::{self, ReadInfo}; -use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; -use torrust_tracker_primitives::DurationSinceUnixEpoch; - -use super::Entry; -use crate::EntrySkipMap; - -impl Entry for EntrySkipMap { - #[allow(clippy::cast_possible_truncation)] - fn get_swarm_metadata(&self) -> SwarmMetadata { - let complete: u32 = self.peers.iter().filter(|entry| entry.value().is_seeder()).count() as u32; - let incomplete: u32 = self.peers.len() as u32 - complete; - - SwarmMetadata { - downloaded: self.downloaded, - complete, - incomplete, - } - } - - fn is_good(&self, policy: &TrackerPolicy) -> bool { - if policy.persistent_torrent_completed_stat && self.downloaded > 0 { - return true; - } - - if policy.remove_peerless_torrents && self.peers.is_empty() { - return false; - } - - true - } - - fn peers_is_empty(&self) -> bool { - self.peers.is_empty() - } - - fn get_peers_len(&self) -> usize { - self.peers.len() - } - fn get_peers(&self, limit: Option) -> Vec> { - match limit { - Some(limit) => self.peers.iter().take(limit).map(|entry| entry.value().clone()).collect(), - None => self.peers.iter().map(|entry| entry.value().clone()).collect(), - } - } - - fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { - match limit { - Some(limit) => self - .peers - .iter() - // Take peers which are not the client peer - .filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client) - // Limit the number of peers on the result - .take(limit) - .map(|entry| entry.value().clone()) - .collect(), - None => self - .peers - .iter() - // Take peers which are not the client peer - .filter(|entry| peer::ReadInfo::get_address(entry.value().as_ref()) != *client) - .map(|entry| entry.value().clone()) - .collect(), - } - } - - fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { - let mut downloaded_stats_updated: bool = false; - - match peer::ReadInfo::get_event(peer) { - AnnounceEvent::Stopped => { - drop(self.peers.remove(&peer::ReadInfo::get_id(peer))); - } - AnnounceEvent::Completed => { - let previous = self.peers.get(&peer.get_id()); - - let increase_downloads = match previous { - Some(entry) => { - // Don't count if peer was already completed. - entry.value().event != AnnounceEvent::Completed - } - None => { - // Don't count if peer was not previously known - false - } - }; - - self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer)); - - if increase_downloads { - self.downloaded += 1; - downloaded_stats_updated = true; - } - } - _ => { - drop(self.peers.insert(peer::ReadInfo::get_id(peer), Arc::new(*peer))); - } - } - - downloaded_stats_updated - } - - fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { - //self.peers - // .retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff); - - for entry in &self.peers { - if entry.value().get_updated() >= current_cutoff { - entry.remove(); - } - } - } -} diff --git a/packages/torrent-repository/src/entry/skip_map_mutex_std.rs b/packages/torrent-repository/src/entry/skip_map_mutex_std.rs deleted file mode 100644 index 79a4fdce3..000000000 --- a/packages/torrent-repository/src/entry/skip_map_mutex_std.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::net::SocketAddr; -use std::sync::Arc; - -use torrust_tracker_configuration::TrackerPolicy; -use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; - -use super::{Entry, EntrySync}; -use crate::{EntrySkipMap, EntrySkipMapMutexStd}; - -impl EntrySync for EntrySkipMapMutexStd { - fn get_swarm_metadata(&self) -> SwarmMetadata { - self.lock().expect("it should get a lock").get_swarm_metadata() - } - - fn is_good(&self, policy: &TrackerPolicy) -> bool { - self.lock().expect("it should get a lock").is_good(policy) - } - - fn peers_is_empty(&self) -> bool { - self.lock().expect("it should get a lock").peers_is_empty() - } - - fn get_peers_len(&self) -> usize { - self.lock().expect("it should get a lock").get_peers_len() - } - - fn get_peers(&self, limit: Option) -> Vec> { - self.lock().expect("it should get lock").get_peers(limit) - } - - fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { - self.lock().expect("it should get lock").get_peers_for_client(client, limit) - } - - fn upsert_peer(&self, peer: &peer::Peer) -> bool { - self.lock().expect("it should lock the entry").upsert_peer(peer) - } - - fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { - self.lock() - .expect("it should lock the entry") - .remove_inactive_peers(current_cutoff); - } -} - -impl From for EntrySkipMapMutexStd { - fn from(entry: EntrySkipMap) -> Self { - Arc::new(std::sync::Mutex::new(entry)) - } -} diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index d01818af3..ba70b4463 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -1,36 +1,46 @@ +use std::collections::BTreeMap; use std::sync::Arc; +use crossbeam_skiplist::SkipMap; use repository::dash_map_mutex_std::XacrimonDashMap; use repository::rw_lock_std::RwLockStd; use repository::rw_lock_tokio::RwLockTokio; use repository::skip_map_mutex_std::CrossbeamSkipList; use torrust_tracker_clock::clock; +use torrust_tracker_primitives::peer; pub mod entry; pub mod repository; -// Entry +// Peer List -pub type EntrySingle = entry::Torrent; -pub type EntryMutexStd = Arc>; -pub type EntryMutexTokio = Arc>; +pub type BTreeMapPeerList = BTreeMap>; +pub type SkipMapPeerList = SkipMap>; -pub type EntrySkipMap = entry::SkipMapTorrent; -pub type EntrySkipMapMutexStd = Arc>; +// Torrent Entry + +pub type EntrySingle = entry::Torrent; +pub type EntryMutexStd = Arc>>; +pub type EntryMutexTokio = Arc>>; // Repos -pub type TorrentsRwLockStd = RwLockStd; -pub type TorrentsRwLockStdMutexStd = RwLockStd; -pub type TorrentsRwLockStdMutexTokio = RwLockStd; -pub type TorrentsRwLockTokio = RwLockTokio; -pub type TorrentsRwLockTokioMutexStd = RwLockTokio; -pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; +// Torrent repo and peer list: BTreeMap +pub type TorrentsRwLockStd = RwLockStd>; +pub type TorrentsRwLockStdMutexStd = RwLockStd>; +pub type TorrentsRwLockStdMutexTokio = RwLockStd>; +pub type TorrentsRwLockTokio = RwLockTokio>; +pub type TorrentsRwLockTokioMutexStd = RwLockTokio>; +pub type TorrentsRwLockTokioMutexTokio = RwLockTokio>; + +// Torrent repo: SkipMap; Peer list: BTreeMap +pub type TorrentsSkipMapMutexStd = CrossbeamSkipList>; -pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; -pub type TorrentsDashMapMutexStd = XacrimonDashMap; +// Torrent repo: DashMap; Peer list: BTreeMap +pub type TorrentsDashMapMutexStd = XacrimonDashMap>; -pub type TorrentsSkipMapMutexStdSkipMap = CrossbeamSkipList; +// Torrent repo and peer list: SkipMap +pub type TorrentsSkipMapMutexStdSkipMap = CrossbeamSkipList>; /// This code needs to be copied into each crate. /// Working version, for production. diff --git a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs index b398b09dc..517a9b31f 100644 --- a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs @@ -11,17 +11,17 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle}; #[derive(Default, Debug)] pub struct XacrimonDashMap { pub torrents: DashMap, } -impl Repository for XacrimonDashMap +impl Repository> for XacrimonDashMap> where - EntryMutexStd: EntrySync, - EntrySingle: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { if let Some(entry) = self.torrents.get(info_hash) { @@ -38,7 +38,7 @@ where self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let maybe_entry = self.torrents.get(key); maybe_entry.map(|entry| entry.clone()) } @@ -57,7 +57,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { match pagination { Some(pagination) => self .torrents @@ -92,7 +92,7 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { self.torrents.remove(key).map(|(_key, value)| value.clone()) } diff --git a/packages/torrent-repository/src/repository/rw_lock_std.rs b/packages/torrent-repository/src/repository/rw_lock_std.rs index af48428e4..2f6ce2fa0 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std.rs @@ -9,7 +9,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::Entry; -use crate::{EntrySingle, TorrentsRwLockStd}; +use crate::{BTreeMapPeerList, EntrySingle, TorrentsRwLockStd}; #[derive(Default, Debug)] pub struct RwLockStd { @@ -28,24 +28,28 @@ impl RwLockStd { } impl TorrentsRwLockStd { - fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + fn get_torrents<'a>( + &'a self, + ) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().expect("it should get the read lock") } - fn get_torrents_mut<'a>(&'a self) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + fn get_torrents_mut<'a>( + &'a self, + ) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().expect("it should get the write lock") } } -impl Repository for TorrentsRwLockStd +impl Repository> for TorrentsRwLockStd where - EntrySingle: Entry, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let mut db = self.get_torrents_mut(); @@ -59,7 +63,7 @@ where self.get(info_hash).map(|entry| entry.get_swarm_metadata()) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents(); db.get(key).cloned() } @@ -78,7 +82,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { let db = self.get_torrents(); match pagination { @@ -110,7 +114,7 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut(); db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs index 74cdc4475..f4e95fb6b 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs @@ -10,28 +10,32 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle, TorrentsRwLockStdMutexStd}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, TorrentsRwLockStdMutexStd}; impl TorrentsRwLockStdMutexStd { - fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + fn get_torrents<'a>( + &'a self, + ) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().expect("unable to get torrent list") } - fn get_torrents_mut<'a>(&'a self) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + fn get_torrents_mut<'a>( + &'a self, + ) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().expect("unable to get writable torrent list") } } -impl Repository for TorrentsRwLockStdMutexStd +impl Repository> for TorrentsRwLockStdMutexStd where - EntryMutexStd: EntrySync, - EntrySingle: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().get(info_hash).cloned(); @@ -53,7 +57,7 @@ where .map(super::super::entry::EntrySync::get_swarm_metadata) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents(); db.get(key).cloned() } @@ -72,7 +76,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { let db = self.get_torrents(); match pagination { @@ -107,7 +111,7 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut(); db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs index 83ac02c91..c7202f180 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs @@ -14,28 +14,32 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::RepositoryAsync; use crate::entry::{Entry, EntryAsync}; -use crate::{EntryMutexTokio, EntrySingle, TorrentsRwLockStdMutexTokio}; +use crate::{BTreeMapPeerList, EntryMutexTokio, EntrySingle, TorrentsRwLockStdMutexTokio}; impl TorrentsRwLockStdMutexTokio { - fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + fn get_torrents<'a>( + &'a self, + ) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().expect("unable to get torrent list") } - fn get_torrents_mut<'a>(&'a self) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + fn get_torrents_mut<'a>( + &'a self, + ) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().expect("unable to get writable torrent list") } } -impl RepositoryAsync for TorrentsRwLockStdMutexTokio +impl RepositoryAsync> for TorrentsRwLockStdMutexTokio where - EntryMutexTokio: EntryAsync, - EntrySingle: Entry, + EntryMutexTokio: EntryAsync, + EntrySingle: Entry, { async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().get(info_hash).cloned(); @@ -60,12 +64,12 @@ where } } - async fn get(&self, key: &InfoHash) -> Option { + async fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents(); db.get(key).cloned() } - async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexTokio)> { + async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexTokio)> { let db = self.get_torrents(); match pagination { @@ -116,7 +120,7 @@ where } } - async fn remove(&self, key: &InfoHash) -> Option { + async fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut(); db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio.rs index b95f1e31e..56f9b95a8 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio.rs @@ -9,7 +9,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::RepositoryAsync; use crate::entry::Entry; -use crate::{EntrySingle, TorrentsRwLockTokio}; +use crate::{BTreeMapPeerList, EntrySingle, TorrentsRwLockTokio}; #[derive(Default, Debug)] pub struct RwLockTokio { @@ -30,26 +30,28 @@ impl RwLockTokio { } impl TorrentsRwLockTokio { - async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + async fn get_torrents<'a>( + &'a self, + ) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().await } async fn get_torrents_mut<'a>( &'a self, - ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().await } } -impl RepositoryAsync for TorrentsRwLockTokio +impl RepositoryAsync> for TorrentsRwLockTokio where - EntrySingle: Entry, + EntrySingle: Entry, { async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let mut db = self.get_torrents_mut().await; @@ -63,12 +65,12 @@ where self.get(info_hash).await.map(|entry| entry.get_swarm_metadata()) } - async fn get(&self, key: &InfoHash) -> Option { + async fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents().await; db.get(key).cloned() } - async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { + async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { let db = self.get_torrents().await; match pagination { @@ -114,7 +116,7 @@ where } } - async fn remove(&self, key: &InfoHash) -> Option { + async fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut().await; db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs index bde959940..2827e5933 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs @@ -10,30 +10,32 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::RepositoryAsync; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle, TorrentsRwLockTokioMutexStd}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, TorrentsRwLockTokioMutexStd}; impl TorrentsRwLockTokioMutexStd { - async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + async fn get_torrents<'a>( + &'a self, + ) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().await } async fn get_torrents_mut<'a>( &'a self, - ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().await } } -impl RepositoryAsync for TorrentsRwLockTokioMutexStd +impl RepositoryAsync> for TorrentsRwLockTokioMutexStd where - EntryMutexStd: EntrySync, - EntrySingle: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); @@ -53,12 +55,12 @@ where self.get(info_hash).await.map(|entry| entry.get_swarm_metadata()) } - async fn get(&self, key: &InfoHash) -> Option { + async fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents().await; db.get(key).cloned() } - async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { let db = self.get_torrents().await; match pagination { @@ -107,7 +109,7 @@ where } } - async fn remove(&self, key: &InfoHash) -> Option { + async fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut().await; db.remove(key) } diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs index 1d002e317..3809a8492 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs @@ -10,30 +10,32 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::RepositoryAsync; use crate::entry::{Entry, EntryAsync}; -use crate::{EntryMutexTokio, EntrySingle, TorrentsRwLockTokioMutexTokio}; +use crate::{BTreeMapPeerList, EntryMutexTokio, EntrySingle, TorrentsRwLockTokioMutexTokio}; impl TorrentsRwLockTokioMutexTokio { - async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap> + async fn get_torrents<'a>( + &'a self, + ) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.read().await } async fn get_torrents_mut<'a>( &'a self, - ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap> + ) -> tokio::sync::RwLockWriteGuard<'a, std::collections::BTreeMap>> where - std::collections::BTreeMap: 'a, + std::collections::BTreeMap>: 'a, { self.torrents.write().await } } -impl RepositoryAsync for TorrentsRwLockTokioMutexTokio +impl RepositoryAsync> for TorrentsRwLockTokioMutexTokio where - EntryMutexTokio: EntryAsync, - EntrySingle: Entry, + EntryMutexTokio: EntryAsync, + EntrySingle: Entry, { async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); @@ -56,12 +58,12 @@ where } } - async fn get(&self, key: &InfoHash) -> Option { + async fn get(&self, key: &InfoHash) -> Option> { let db = self.get_torrents().await; db.get(key).cloned() } - async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexTokio)> { + async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexTokio)> { let db = self.get_torrents().await; match pagination { @@ -110,7 +112,7 @@ where } } - async fn remove(&self, key: &InfoHash) -> Option { + async fn remove(&self, key: &InfoHash) -> Option> { let mut db = self.get_torrents_mut().await; db.remove(key) } diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index fa4845e54..e3c5b0977 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -11,17 +11,17 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle, EntrySkipMap, EntrySkipMapMutexStd}; +use crate::{BTreeMapPeerList, EntryMutexStd, EntrySingle, SkipMapPeerList}; #[derive(Default, Debug)] pub struct CrossbeamSkipList { pub torrents: SkipMap, } -impl Repository for CrossbeamSkipList +impl Repository> for CrossbeamSkipList> where - EntryMutexStd: EntrySync, - EntrySingle: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); @@ -32,7 +32,7 @@ where self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let maybe_entry = self.torrents.get(key); maybe_entry.map(|entry| entry.value().clone()) } @@ -51,7 +51,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { match pagination { Some(pagination) => self .torrents @@ -88,7 +88,7 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { self.torrents.remove(key).map(|entry| entry.value().clone()) } @@ -109,10 +109,10 @@ where } } -impl Repository for CrossbeamSkipList +impl Repository> for CrossbeamSkipList> where - EntrySkipMapMutexStd: EntrySync, - EntrySkipMap: Entry, + EntryMutexStd: EntrySync, + EntrySingle: Entry, { fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); @@ -123,7 +123,7 @@ where self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) } - fn get(&self, key: &InfoHash) -> Option { + fn get(&self, key: &InfoHash) -> Option> { let maybe_entry = self.torrents.get(key); maybe_entry.map(|entry| entry.value().clone()) } @@ -142,7 +142,7 @@ where metrics } - fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySkipMapMutexStd)> { + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { match pagination { Some(pagination) => self .torrents @@ -165,8 +165,8 @@ where continue; } - let entry = EntrySkipMapMutexStd::new( - EntrySkipMap { + let entry = EntryMutexStd::new( + EntrySingle { peers: SkipMap::default(), downloaded: *completed, } @@ -179,7 +179,7 @@ where } } - fn remove(&self, key: &InfoHash) -> Option { + fn remove(&self, key: &InfoHash) -> Option> { self.torrents.remove(key).map(|entry| entry.value().clone()) } diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 7c245fe04..fc5160cd4 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -6,8 +6,9 @@ use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; use torrust_tracker_torrent_repository::{ - EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, - TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, + BTreeMapPeerList, EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, + TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, + TorrentsSkipMapMutexStd, }; #[derive(Debug)] @@ -49,7 +50,7 @@ impl Repo { } } - pub(crate) async fn get(&self, key: &InfoHash) -> Option { + pub(crate) async fn get(&self, key: &InfoHash) -> Option> { match self { Repo::RwLockStd(repo) => repo.get(key), Repo::RwLockStdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), @@ -75,7 +76,7 @@ impl Repo { } } - pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { + pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> { match self { Repo::RwLockStd(repo) => repo.get_paginated(pagination), Repo::RwLockStdMutexStd(repo) => repo @@ -84,7 +85,7 @@ impl Repo { .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), Repo::RwLockStdMutexTokio(repo) => { - let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; + let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; for (i, t) in repo.get_paginated(pagination).await { v.push((i, t.lock().await.clone())); @@ -99,7 +100,7 @@ impl Repo { .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), Repo::RwLockTokioMutexTokio(repo) => { - let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; + let mut v: Vec<(InfoHash, EntrySingle)> = vec![]; for (i, t) in repo.get_paginated(pagination).await { v.push((i, t.lock().await.clone())); @@ -132,7 +133,7 @@ impl Repo { } } - pub(crate) async fn remove(&self, key: &InfoHash) -> Option { + pub(crate) async fn remove(&self, key: &InfoHash) -> Option> { match self { Repo::RwLockStd(repo) => repo.remove(key), Repo::RwLockStdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), @@ -171,7 +172,11 @@ impl Repo { } } - pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option { + pub(crate) async fn insert( + &self, + info_hash: &InfoHash, + torrent: EntrySingle, + ) -> Option> { match self { Repo::RwLockStd(repo) => { repo.write().insert(*info_hash, torrent); diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs index c0699479e..09a18e869 100644 --- a/packages/torrent-repository/tests/common/torrent.rs +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -5,13 +5,13 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _}; -use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; +use torrust_tracker_torrent_repository::{BTreeMapPeerList, EntryMutexStd, EntryMutexTokio, EntrySingle}; #[derive(Debug, Clone)] pub(crate) enum Torrent { - Single(EntrySingle), - MutexStd(EntryMutexStd), - MutexTokio(EntryMutexTokio), + Single(EntrySingle), + MutexStd(EntryMutexStd), + MutexTokio(EntryMutexTokio), } impl Torrent { diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index fde34467e..c31c9c32a 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -13,7 +13,7 @@ use torrust_tracker_torrent_repository::repository::dash_map_mutex_std::Xacrimon use torrust_tracker_torrent_repository::repository::rw_lock_std::RwLockStd; use torrust_tracker_torrent_repository::repository::rw_lock_tokio::RwLockTokio; use torrust_tracker_torrent_repository::repository::skip_map_mutex_std::CrossbeamSkipList; -use torrust_tracker_torrent_repository::EntrySingle; +use torrust_tracker_torrent_repository::{BTreeMapPeerList, EntrySingle}; use crate::common::repo::Repo; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; @@ -58,7 +58,7 @@ fn dash_map_std() -> Repo { Repo::DashMapMutexStd(XacrimonDashMap::default()) } -type Entries = Vec<(InfoHash, EntrySingle)>; +type Entries = Vec<(InfoHash, EntrySingle)>; #[fixture] fn empty() -> Entries { @@ -125,7 +125,7 @@ fn three() -> Entries { #[fixture] fn many_out_of_order() -> Entries { - let mut entries: HashSet<(InfoHash, EntrySingle)> = HashSet::default(); + let mut entries: HashSet<(InfoHash, EntrySingle)> = HashSet::default(); for i in 0..408 { let mut entry = EntrySingle::default(); @@ -140,7 +140,7 @@ fn many_out_of_order() -> Entries { #[fixture] fn many_hashed_in_order() -> Entries { - let mut entries: BTreeMap = BTreeMap::default(); + let mut entries: BTreeMap> = BTreeMap::default(); for i in 0..408 { let mut entry = EntrySingle::default();