diff --git a/Cargo.lock b/Cargo.lock index f13ed1482..dc6db21c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3939,6 +3939,7 @@ dependencies = [ "log", "mockall", "multimap", + "parking_lot", "percent-encoding", "r2d2", "r2d2_mysql", @@ -4037,6 +4038,7 @@ dependencies = [ "crossbeam-skiplist", "dashmap", "futures", + "parking_lot", "rstest", "tokio", "torrust-tracker-clock", diff --git a/Cargo.toml b/Cargo.toml index ef0c39d4b..94889cbf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ hyper = "1" lazy_static = "1" log = { version = "0", features = ["release_max_level_info"] } multimap = "0" +parking_lot = "0.12.1" percent-encoding = "2" r2d2 = "0" r2d2_mysql = "24" @@ -78,7 +79,7 @@ url = "2" uuid = { version = "1", features = ["v4"] } [package.metadata.cargo-machete] -ignored = ["serde_bytes", "crossbeam-skiplist", "dashmap"] +ignored = ["serde_bytes", "crossbeam-skiplist", "dashmap", "parking_lot"] [dev-dependencies] local-ip-address = "0" diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index 6bc8bfcdd..937ec11e2 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -19,6 +19,7 @@ version.workspace = true crossbeam-skiplist = "0.1" dashmap = "5.5.3" futures = "0.3.29" +parking_lot = "0.12.1" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" } torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" } diff --git a/packages/torrent-repository/benches/repository_benchmark.rs b/packages/torrent-repository/benches/repository_benchmark.rs index 58cd70d9a..4e50f1454 100644 --- a/packages/torrent-repository/benches/repository_benchmark.rs +++ b/packages/torrent-repository/benches/repository_benchmark.rs @@ -5,7 +5,8 @@ mod helpers; use criterion::{criterion_group, criterion_main, Criterion}; use torrust_tracker_torrent_repository::{ TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, - TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, + TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexParkingLot, TorrentsSkipMapMutexStd, + TorrentsSkipMapRwLockParkingLot, }; use crate::helpers::{asyn, sync}; @@ -49,6 +50,14 @@ fn add_one_torrent(c: &mut Criterion) { b.iter_custom(sync::add_one_torrent::); }); + group.bench_function("SkipMapMutexParkingLot", |b| { + b.iter_custom(sync::add_one_torrent::); + }); + + group.bench_function("SkipMapRwLockParkingLot", |b| { + b.iter_custom(sync::add_one_torrent::); + }); + group.bench_function("DashMapMutexStd", |b| { b.iter_custom(sync::add_one_torrent::); }); @@ -102,6 +111,16 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexParkingLot", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + + group.bench_function("SkipMapRwLockParkingLot", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.bench_function("DashMapMutexStd", |b| { b.to_async(&rt) .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); @@ -156,6 +175,16 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexParkingLot", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); + }); + + group.bench_function("SkipMapRwLockParkingLot", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); + }); + group.bench_function("DashMapMutexStd", |b| { b.to_async(&rt) .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); @@ -211,6 +240,18 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexParkingLot", |b| { + b.to_async(&rt).iter_custom(|iters| { + sync::update_multiple_torrents_in_parallel::(&rt, iters, None) + }); + }); + + group.bench_function("SkipMapRwLockParkingLot", |b| { + b.to_async(&rt).iter_custom(|iters| { + sync::update_multiple_torrents_in_parallel::(&rt, iters, None) + }); + }); + group.bench_function("DashMapMutexStd", |b| { b.to_async(&rt) .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index 40fa4efd5..b811d3262 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -8,9 +8,11 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use self::peer_list::PeerList; +pub mod mutex_parking_lot; pub mod mutex_std; pub mod mutex_tokio; pub mod peer_list; +pub mod rw_lock_parking_lot; pub mod single; pub trait Entry { diff --git a/packages/torrent-repository/src/entry/mutex_parking_lot.rs b/packages/torrent-repository/src/entry/mutex_parking_lot.rs new file mode 100644 index 000000000..4f3921ea7 --- /dev/null +++ b/packages/torrent-repository/src/entry/mutex_parking_lot.rs @@ -0,0 +1,49 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; + +use super::{Entry, EntrySync}; +use crate::{EntryMutexParkingLot, EntrySingle}; + +impl EntrySync for EntryMutexParkingLot { + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().get_swarm_metadata() + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + self.lock().is_good(policy) + } + + fn peers_is_empty(&self) -> bool { + self.lock().peers_is_empty() + } + + fn get_peers_len(&self) -> usize { + self.lock().get_peers_len() + } + + fn get_peers(&self, limit: Option) -> Vec> { + self.lock().get_peers(limit) + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.lock().get_peers_for_client(client, limit) + } + + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.lock().upsert_peer(peer) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + self.lock().remove_inactive_peers(current_cutoff); + } +} + +impl From for EntryMutexParkingLot { + fn from(entry: EntrySingle) -> Self { + Arc::new(parking_lot::Mutex::new(entry)) + } +} diff --git a/packages/torrent-repository/src/entry/rw_lock_parking_lot.rs b/packages/torrent-repository/src/entry/rw_lock_parking_lot.rs new file mode 100644 index 000000000..ef0e958d5 --- /dev/null +++ b/packages/torrent-repository/src/entry/rw_lock_parking_lot.rs @@ -0,0 +1,49 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; + +use super::{Entry, EntrySync}; +use crate::{EntryRwLockParkingLot, EntrySingle}; + +impl EntrySync for EntryRwLockParkingLot { + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.read().get_swarm_metadata() + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + self.read().is_good(policy) + } + + fn peers_is_empty(&self) -> bool { + self.read().peers_is_empty() + } + + fn get_peers_len(&self) -> usize { + self.read().get_peers_len() + } + + fn get_peers(&self, limit: Option) -> Vec> { + self.read().get_peers(limit) + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.read().get_peers_for_client(client, limit) + } + + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.write().upsert_peer(peer) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + self.write().remove_inactive_peers(current_cutoff); + } +} + +impl From for EntryRwLockParkingLot { + fn from(entry: EntrySingle) -> Self { + Arc::new(parking_lot::RwLock::new(entry)) + } +} diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index 7a6d209b9..a8955808e 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -9,9 +9,15 @@ use torrust_tracker_clock::clock; pub mod entry; pub mod repository; +// Repo Entries + pub type EntrySingle = entry::Torrent; pub type EntryMutexStd = Arc>; pub type EntryMutexTokio = Arc>; +pub type EntryMutexParkingLot = Arc>; +pub type EntryRwLockParkingLot = Arc>; + +// Repos pub type TorrentsRwLockStd = RwLockStd; pub type TorrentsRwLockStdMutexStd = RwLockStd; @@ -21,6 +27,9 @@ pub type TorrentsRwLockTokioMutexStd = RwLockTokio; pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; +pub type TorrentsSkipMapMutexParkingLot = CrossbeamSkipList; +pub type TorrentsSkipMapRwLockParkingLot = CrossbeamSkipList; + pub type TorrentsDashMapMutexStd = XacrimonDashMap; /// This code needs to be copied into each crate. diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index bc9ecd066..9960b0c30 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -11,7 +11,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::peer_list::PeerList; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle}; +use crate::{EntryMutexParkingLot, EntryMutexStd, EntryRwLockParkingLot, EntrySingle}; #[derive(Default, Debug)] pub struct CrossbeamSkipList { @@ -108,3 +108,185 @@ where } } } + +impl Repository for CrossbeamSkipList +where + EntryRwLockParkingLot: EntrySync, + EntrySingle: Entry, +{ + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); + entry.value().upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) + } + + fn get(&self, key: &InfoHash) -> Option { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.value().clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().read().get_swarm_metadata(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryRwLockParkingLot)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntryRwLockParkingLot::new( + EntrySingle { + swarm: PeerList::default(), + downloaded: *completed, + } + .into(), + ); + + // Since SkipMap is lock-free the torrent could have been inserted + // after checking if it exists. + self.torrents.get_or_insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option { + self.torrents.remove(key).map(|entry| entry.value().clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + for entry in &self.torrents { + if entry.value().is_good(policy) { + continue; + } + + entry.remove(); + } + } +} + +impl Repository for CrossbeamSkipList +where + EntryMutexParkingLot: EntrySync, + EntrySingle: Entry, +{ + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); + entry.value().upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) + } + + fn get(&self, key: &InfoHash) -> Option { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.value().clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().lock().get_swarm_metadata(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexParkingLot)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntryMutexParkingLot::new( + EntrySingle { + swarm: PeerList::default(), + downloaded: *completed, + } + .into(), + ); + + // Since SkipMap is lock-free the torrent could have been inserted + // after checking if it exists. + self.torrents.get_or_insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option { + self.torrents.remove(key).map(|entry| entry.value().clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + for entry in &self.torrents { + if entry.value().is_good(policy) { + continue; + } + + entry.remove(); + } + } +} diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 7c245fe04..f317d0d17 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -7,7 +7,8 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; use torrust_tracker_torrent_repository::{ EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, - TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, + TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexParkingLot, + TorrentsSkipMapMutexStd, TorrentsSkipMapRwLockParkingLot, }; #[derive(Debug)] @@ -19,6 +20,8 @@ pub(crate) enum Repo { RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd), RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio), SkipMapMutexStd(TorrentsSkipMapMutexStd), + SkipMapMutexParkingLot(TorrentsSkipMapMutexParkingLot), + SkipMapRwLockParkingLot(TorrentsSkipMapRwLockParkingLot), DashMapMutexStd(TorrentsDashMapMutexStd), } @@ -32,6 +35,8 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer).await, Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::SkipMapMutexParkingLot(repo) => repo.upsert_peer(info_hash, peer), + Repo::SkipMapRwLockParkingLot(repo) => repo.upsert_peer(info_hash, peer), Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), } } @@ -45,6 +50,8 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.get_swarm_metadata(info_hash).await, Repo::RwLockTokioMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await, Repo::SkipMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::SkipMapMutexParkingLot(repo) => repo.get_swarm_metadata(info_hash), + Repo::SkipMapRwLockParkingLot(repo) => repo.get_swarm_metadata(info_hash), Repo::DashMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), } } @@ -58,6 +65,8 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), + Repo::SkipMapMutexParkingLot(repo) => Some(repo.get(key)?.lock().clone()), + Repo::SkipMapRwLockParkingLot(repo) => Some(repo.get(key)?.read().clone()), Repo::DashMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), } } @@ -71,6 +80,8 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await, Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await, Repo::SkipMapMutexStd(repo) => repo.get_metrics(), + Repo::SkipMapMutexParkingLot(repo) => repo.get_metrics(), + Repo::SkipMapRwLockParkingLot(repo) => repo.get_metrics(), Repo::DashMapMutexStd(repo) => repo.get_metrics(), } } @@ -111,6 +122,16 @@ impl Repo { .iter() .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), + Repo::SkipMapMutexParkingLot(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.lock().clone())) + .collect(), + Repo::SkipMapRwLockParkingLot(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.read().clone())) + .collect(), Repo::DashMapMutexStd(repo) => repo .get_paginated(pagination) .iter() @@ -128,6 +149,8 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents), + Repo::SkipMapMutexParkingLot(repo) => repo.import_persistent(persistent_torrents), + Repo::SkipMapRwLockParkingLot(repo) => repo.import_persistent(persistent_torrents), Repo::DashMapMutexStd(repo) => repo.import_persistent(persistent_torrents), } } @@ -141,6 +164,8 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), + Repo::SkipMapMutexParkingLot(repo) => Some(repo.remove(key)?.lock().clone()), + Repo::SkipMapRwLockParkingLot(repo) => Some(repo.remove(key)?.write().clone()), Repo::DashMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), } } @@ -154,6 +179,8 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::SkipMapMutexParkingLot(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::SkipMapRwLockParkingLot(repo) => repo.remove_inactive_peers(current_cutoff), Repo::DashMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), } } @@ -167,6 +194,8 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy), + Repo::SkipMapMutexParkingLot(repo) => repo.remove_peerless_torrents(policy), + Repo::SkipMapRwLockParkingLot(repo) => repo.remove_peerless_torrents(policy), Repo::DashMapMutexStd(repo) => repo.remove_peerless_torrents(policy), } } @@ -194,6 +223,12 @@ impl Repo { Repo::SkipMapMutexStd(repo) => { repo.torrents.insert(*info_hash, torrent.into()); } + Repo::SkipMapMutexParkingLot(repo) => { + repo.torrents.insert(*info_hash, torrent.into()); + } + Repo::SkipMapRwLockParkingLot(repo) => { + repo.torrents.insert(*info_hash, torrent.into()); + } Repo::DashMapMutexStd(repo) => { repo.torrents.insert(*info_hash, torrent.into()); } diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs index c0699479e..abcf5525e 100644 --- a/packages/torrent-repository/tests/common/torrent.rs +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -5,13 +5,17 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _}; -use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; +use torrust_tracker_torrent_repository::{ + EntryMutexParkingLot, EntryMutexStd, EntryMutexTokio, EntryRwLockParkingLot, EntrySingle, +}; #[derive(Debug, Clone)] pub(crate) enum Torrent { Single(EntrySingle), MutexStd(EntryMutexStd), MutexTokio(EntryMutexTokio), + MutexParkingLot(EntryMutexParkingLot), + RwLockParkingLot(EntryRwLockParkingLot), } impl Torrent { @@ -20,6 +24,8 @@ impl Torrent { Torrent::Single(entry) => entry.get_swarm_metadata(), Torrent::MutexStd(entry) => entry.get_swarm_metadata(), Torrent::MutexTokio(entry) => entry.clone().get_swarm_metadata().await, + Torrent::MutexParkingLot(entry) => entry.clone().get_swarm_metadata(), + Torrent::RwLockParkingLot(entry) => entry.clone().get_swarm_metadata(), } } @@ -28,6 +34,8 @@ impl Torrent { Torrent::Single(entry) => entry.is_good(policy), Torrent::MutexStd(entry) => entry.is_good(policy), Torrent::MutexTokio(entry) => entry.clone().check_good(policy).await, + Torrent::MutexParkingLot(entry) => entry.is_good(policy), + Torrent::RwLockParkingLot(entry) => entry.is_good(policy), } } @@ -36,6 +44,8 @@ impl Torrent { Torrent::Single(entry) => entry.peers_is_empty(), Torrent::MutexStd(entry) => entry.peers_is_empty(), Torrent::MutexTokio(entry) => entry.clone().peers_is_empty().await, + Torrent::MutexParkingLot(entry) => entry.peers_is_empty(), + Torrent::RwLockParkingLot(entry) => entry.peers_is_empty(), } } @@ -44,6 +54,8 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers_len(), Torrent::MutexStd(entry) => entry.get_peers_len(), Torrent::MutexTokio(entry) => entry.clone().get_peers_len().await, + Torrent::MutexParkingLot(entry) => entry.get_peers_len(), + Torrent::RwLockParkingLot(entry) => entry.get_peers_len(), } } @@ -52,6 +64,8 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers(limit), Torrent::MutexStd(entry) => entry.get_peers(limit), Torrent::MutexTokio(entry) => entry.clone().get_peers(limit).await, + Torrent::MutexParkingLot(entry) => entry.get_peers(limit), + Torrent::RwLockParkingLot(entry) => entry.get_peers(limit), } } @@ -60,6 +74,8 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers_for_client(client, limit), Torrent::MutexStd(entry) => entry.get_peers_for_client(client, limit), Torrent::MutexTokio(entry) => entry.clone().get_peers_for_client(client, limit).await, + Torrent::MutexParkingLot(entry) => entry.get_peers_for_client(client, limit), + Torrent::RwLockParkingLot(entry) => entry.get_peers_for_client(client, limit), } } @@ -68,6 +84,8 @@ impl Torrent { Torrent::Single(entry) => entry.upsert_peer(peer), Torrent::MutexStd(entry) => entry.upsert_peer(peer), Torrent::MutexTokio(entry) => entry.clone().upsert_peer(peer).await, + Torrent::MutexParkingLot(entry) => entry.upsert_peer(peer), + Torrent::RwLockParkingLot(entry) => entry.upsert_peer(peer), } } @@ -76,6 +94,8 @@ impl Torrent { Torrent::Single(entry) => entry.remove_inactive_peers(current_cutoff), Torrent::MutexStd(entry) => entry.remove_inactive_peers(current_cutoff), Torrent::MutexTokio(entry) => entry.clone().remove_inactive_peers(current_cutoff).await, + Torrent::MutexParkingLot(entry) => entry.remove_inactive_peers(current_cutoff), + Torrent::RwLockParkingLot(entry) => entry.remove_inactive_peers(current_cutoff), } } } diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/entry/mod.rs index 3c564c6f8..3b9f3e3ad 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/entry/mod.rs @@ -9,7 +9,9 @@ use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT}; use torrust_tracker_primitives::announce_event::AnnounceEvent; use torrust_tracker_primitives::peer::Peer; use torrust_tracker_primitives::{peer, NumberOfBytes}; -use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; +use torrust_tracker_torrent_repository::{ + EntryMutexParkingLot, EntryMutexStd, EntryMutexTokio, EntryRwLockParkingLot, EntrySingle, +}; use crate::common::torrent::Torrent; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; @@ -20,7 +22,7 @@ fn single() -> Torrent { Torrent::Single(EntrySingle::default()) } #[fixture] -fn standard_mutex() -> Torrent { +fn mutex_std() -> Torrent { Torrent::MutexStd(EntryMutexStd::default()) } @@ -29,6 +31,16 @@ fn mutex_tokio() -> Torrent { Torrent::MutexTokio(EntryMutexTokio::default()) } +#[fixture] +fn mutex_parking_lot() -> Torrent { + Torrent::MutexParkingLot(EntryMutexParkingLot::default()) +} + +#[fixture] +fn rw_lock_parking_lot() -> Torrent { + Torrent::RwLockParkingLot(EntryRwLockParkingLot::default()) +} + #[fixture] fn policy_none() -> TrackerPolicy { TrackerPolicy::new(false, 0, false) @@ -99,7 +111,7 @@ async fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { #[case::empty(&Makes::Empty)] #[tokio::test] async fn it_should_be_empty_by_default( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -115,7 +127,7 @@ async fn it_should_be_empty_by_default( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_check_if_entry_is_good( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { @@ -153,7 +165,7 @@ async fn it_should_check_if_entry_is_good( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_get_peers_for_torrent_entry( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -174,10 +186,7 @@ async fn it_should_get_peers_for_torrent_entry( #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, - #[case] makes: &Makes, -) { +async fn it_should_update_a_peer(#[values(single(), mutex_std(), mutex_tokio())] mut torrent: Torrent, #[case] makes: &Makes) { make(&mut torrent, makes).await; // Make and insert a new peer. @@ -215,7 +224,7 @@ async fn it_should_update_a_peer( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_remove_a_peer_upon_stopped_announcement( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { use torrust_tracker_primitives::peer::ReadInfo as _; @@ -256,7 +265,7 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloaded_statistic( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -287,7 +296,7 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_update_a_peer_as_a_seeder( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -319,7 +328,7 @@ async fn it_should_update_a_peer_as_a_seeder( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_update_a_peer_as_incomplete( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -351,7 +360,7 @@ async fn it_should_update_a_peer_as_incomplete( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_get_peers_excluding_the_client_socket( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -383,7 +392,7 @@ async fn it_should_get_peers_excluding_the_client_socket( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_limit_the_number_of_peers_returned( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -408,7 +417,7 @@ async fn it_should_limit_the_number_of_peers_returned( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_remove_inactive_peers_beyond_cutoff( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { const TIMEOUT: Duration = Duration::from_secs(120); diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index fde34467e..dd9893cc9 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -49,10 +49,20 @@ fn tokio_tokio() -> Repo { } #[fixture] -fn skip_list_std() -> Repo { +fn skip_list_mutex_std() -> Repo { Repo::SkipMapMutexStd(CrossbeamSkipList::default()) } +#[fixture] +fn skip_list_mutex_parking_lot() -> Repo { + Repo::SkipMapMutexParkingLot(CrossbeamSkipList::default()) +} + +#[fixture] +fn skip_list_rw_lock_parking_lot() -> Repo { + Repo::SkipMapRwLockParkingLot(CrossbeamSkipList::default()) +} + #[fixture] fn dash_map_std() -> Repo { Repo::DashMapMutexStd(XacrimonDashMap::default()) @@ -246,7 +256,9 @@ async fn it_should_get_a_torrent_entry( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_mutex_parking_lot(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -279,7 +291,9 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_mutex_std(), + skip_list_mutex_parking_lot(), + skip_list_rw_lock_parking_lot() )] repo: Repo, #[case] entries: Entries, @@ -321,7 +335,9 @@ async fn it_should_get_paginated( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_mutex_std(), + skip_list_mutex_parking_lot(), + skip_list_rw_lock_parking_lot() )] repo: Repo, #[case] entries: Entries, @@ -378,7 +394,9 @@ async fn it_should_get_metrics( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_mutex_parking_lot(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -420,7 +438,9 @@ async fn it_should_import_persistent_torrents( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_mutex_parking_lot(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -459,7 +479,9 @@ async fn it_should_remove_an_entry( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_mutex_parking_lot(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -496,7 +518,9 @@ async fn it_should_remove_inactive_peers( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_mutex_parking_lot(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -594,7 +618,9 @@ async fn it_should_remove_peerless_torrents( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_mutex_parking_lot(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo,