From 5750e2c22d52dfa976150e363d3f400b93efd31e Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 16 Apr 2024 14:16:13 +0100 Subject: [PATCH 1/4] chore(deps): add dependency parking_lot It provides implementations of Mutex and RwLock that are smaller, faster and more flexible than those in the Rust standard library. It will be used to check if a new torrent repo implementation using these lock is faster. --- Cargo.lock | 2 ++ Cargo.toml | 1 + packages/torrent-repository/Cargo.toml | 1 + 3 files changed, 4 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index f13ed1482..dc6db21c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3939,6 +3939,7 @@ dependencies = [ "log", "mockall", "multimap", + "parking_lot", "percent-encoding", "r2d2", "r2d2_mysql", @@ -4037,6 +4038,7 @@ dependencies = [ "crossbeam-skiplist", "dashmap", "futures", + "parking_lot", "rstest", "tokio", "torrust-tracker-clock", diff --git a/Cargo.toml b/Cargo.toml index ef0c39d4b..57c18453b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ hyper = "1" lazy_static = "1" log = { version = "0", features = ["release_max_level_info"] } multimap = "0" +parking_lot = "0.12.1" percent-encoding = "2" r2d2 = "0" r2d2_mysql = "24" diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index 6bc8bfcdd..937ec11e2 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -19,6 +19,7 @@ version.workspace = true crossbeam-skiplist = "0.1" dashmap = "5.5.3" futures = "0.3.29" +parking_lot = "0.12.1" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" } torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" } From 9258ac0cf2b42530950e7cd0cd40792b45c8f7b9 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 16 Apr 2024 14:21:10 +0100 Subject: [PATCH 2/4] feat: new torrent repo implementation using parking_lot RwLock --- .../benches/repository_benchmark.rs | 22 ++++- packages/torrent-repository/src/entry/mod.rs | 1 + .../src/entry/mutex_parking_lot.rs | 49 ++++++++++ packages/torrent-repository/src/lib.rs | 7 ++ .../src/repository/skip_map_mutex_std.rs | 93 ++++++++++++++++++- .../torrent-repository/tests/common/repo.rs | 18 ++++ .../tests/common/torrent.rs | 11 ++- .../torrent-repository/tests/entry/mod.rs | 34 +++---- .../tests/repository/mod.rs | 31 +++++-- 9 files changed, 238 insertions(+), 28 deletions(-) create mode 100644 packages/torrent-repository/src/entry/mutex_parking_lot.rs diff --git a/packages/torrent-repository/benches/repository_benchmark.rs b/packages/torrent-repository/benches/repository_benchmark.rs index 58cd70d9a..75e7fd5b8 100644 --- a/packages/torrent-repository/benches/repository_benchmark.rs +++ b/packages/torrent-repository/benches/repository_benchmark.rs @@ -5,7 +5,7 @@ mod helpers; use criterion::{criterion_group, criterion_main, Criterion}; use torrust_tracker_torrent_repository::{ TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, - TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, + TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, TorrentsSkipMapRwLockParkingLot, }; use crate::helpers::{asyn, sync}; @@ -49,6 +49,10 @@ fn add_one_torrent(c: &mut Criterion) { b.iter_custom(sync::add_one_torrent::); }); + group.bench_function("SkipMapRwLockParkingLot", |b| { + b.iter_custom(sync::add_one_torrent::); + }); + group.bench_function("DashMapMutexStd", |b| { b.iter_custom(sync::add_one_torrent::); }); @@ -102,6 +106,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapRwLockParkingLot", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.bench_function("DashMapMutexStd", |b| { b.to_async(&rt) .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); @@ -156,6 +165,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapRwLockParkingLot", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); + }); + group.bench_function("DashMapMutexStd", |b| { b.to_async(&rt) .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); @@ -211,6 +225,12 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapRwLockParkingLot", |b| { + b.to_async(&rt).iter_custom(|iters| { + sync::update_multiple_torrents_in_parallel::(&rt, iters, None) + }); + }); + group.bench_function("DashMapMutexStd", |b| { b.to_async(&rt) .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index 40fa4efd5..dbe1416be 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -8,6 +8,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use self::peer_list::PeerList; +pub mod mutex_parking_lot; pub mod mutex_std; pub mod mutex_tokio; pub mod peer_list; diff --git a/packages/torrent-repository/src/entry/mutex_parking_lot.rs b/packages/torrent-repository/src/entry/mutex_parking_lot.rs new file mode 100644 index 000000000..ef0e958d5 --- /dev/null +++ b/packages/torrent-repository/src/entry/mutex_parking_lot.rs @@ -0,0 +1,49 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; + +use super::{Entry, EntrySync}; +use crate::{EntryRwLockParkingLot, EntrySingle}; + +impl EntrySync for EntryRwLockParkingLot { + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.read().get_swarm_metadata() + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + self.read().is_good(policy) + } + + fn peers_is_empty(&self) -> bool { + self.read().peers_is_empty() + } + + fn get_peers_len(&self) -> usize { + self.read().get_peers_len() + } + + fn get_peers(&self, limit: Option) -> Vec> { + self.read().get_peers(limit) + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.read().get_peers_for_client(client, limit) + } + + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.write().upsert_peer(peer) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + self.write().remove_inactive_peers(current_cutoff); + } +} + +impl From for EntryRwLockParkingLot { + fn from(entry: EntrySingle) -> Self { + Arc::new(parking_lot::RwLock::new(entry)) + } +} diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index 7a6d209b9..5d3a7ed45 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -9,9 +9,14 @@ use torrust_tracker_clock::clock; pub mod entry; pub mod repository; +// Repo Entries + pub type EntrySingle = entry::Torrent; pub type EntryMutexStd = Arc>; pub type EntryMutexTokio = Arc>; +pub type EntryRwLockParkingLot = Arc>; + +// Repos pub type TorrentsRwLockStd = RwLockStd; pub type TorrentsRwLockStdMutexStd = RwLockStd; @@ -21,6 +26,8 @@ pub type TorrentsRwLockTokioMutexStd = RwLockTokio; pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; +pub type TorrentsSkipMapRwLockParkingLot = CrossbeamSkipList; + pub type TorrentsDashMapMutexStd = XacrimonDashMap; /// This code needs to be copied into each crate. diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index bc9ecd066..0a2a566e7 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -11,7 +11,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::peer_list::PeerList; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntrySingle}; +use crate::{EntryMutexStd, EntryRwLockParkingLot, EntrySingle}; #[derive(Default, Debug)] pub struct CrossbeamSkipList { @@ -108,3 +108,94 @@ where } } } + +impl Repository for CrossbeamSkipList +where + EntryRwLockParkingLot: EntrySync, + EntrySingle: Entry, +{ + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); + entry.value().upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) + } + + fn get(&self, key: &InfoHash) -> Option { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.value().clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().read().get_swarm_metadata(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryRwLockParkingLot)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntryRwLockParkingLot::new( + EntrySingle { + swarm: PeerList::default(), + downloaded: *completed, + } + .into(), + ); + + // Since SkipMap is lock-free the torrent could have been inserted + // after checking if it exists. + self.torrents.get_or_insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option { + self.torrents.remove(key).map(|entry| entry.value().clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + for entry in &self.torrents { + if entry.value().is_good(policy) { + continue; + } + + entry.remove(); + } + } +} diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 7c245fe04..c5da6258d 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -8,6 +8,7 @@ use torrust_tracker_torrent_repository::repository::{Repository as _, Repository use torrust_tracker_torrent_repository::{ EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, + TorrentsSkipMapRwLockParkingLot, }; #[derive(Debug)] @@ -19,6 +20,7 @@ pub(crate) enum Repo { RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd), RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio), SkipMapMutexStd(TorrentsSkipMapMutexStd), + SkipMapRwLockParkingLot(TorrentsSkipMapRwLockParkingLot), DashMapMutexStd(TorrentsDashMapMutexStd), } @@ -32,6 +34,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer).await, Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::SkipMapRwLockParkingLot(repo) => repo.upsert_peer(info_hash, peer), Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), } } @@ -45,6 +48,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.get_swarm_metadata(info_hash).await, Repo::RwLockTokioMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await, Repo::SkipMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::SkipMapRwLockParkingLot(repo) => repo.get_swarm_metadata(info_hash), Repo::DashMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), } } @@ -58,6 +62,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), + Repo::SkipMapRwLockParkingLot(repo) => Some(repo.get(key)?.read().clone()), Repo::DashMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), } } @@ -71,6 +76,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await, Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await, Repo::SkipMapMutexStd(repo) => repo.get_metrics(), + Repo::SkipMapRwLockParkingLot(repo) => repo.get_metrics(), Repo::DashMapMutexStd(repo) => repo.get_metrics(), } } @@ -111,6 +117,11 @@ impl Repo { .iter() .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), + Repo::SkipMapRwLockParkingLot(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.read().clone())) + .collect(), Repo::DashMapMutexStd(repo) => repo .get_paginated(pagination) .iter() @@ -128,6 +139,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents), + Repo::SkipMapRwLockParkingLot(repo) => repo.import_persistent(persistent_torrents), Repo::DashMapMutexStd(repo) => repo.import_persistent(persistent_torrents), } } @@ -141,6 +153,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), + Repo::SkipMapRwLockParkingLot(repo) => Some(repo.remove(key)?.write().clone()), Repo::DashMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), } } @@ -154,6 +167,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::SkipMapRwLockParkingLot(repo) => repo.remove_inactive_peers(current_cutoff), Repo::DashMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), } } @@ -167,6 +181,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy), + Repo::SkipMapRwLockParkingLot(repo) => repo.remove_peerless_torrents(policy), Repo::DashMapMutexStd(repo) => repo.remove_peerless_torrents(policy), } } @@ -194,6 +209,9 @@ impl Repo { Repo::SkipMapMutexStd(repo) => { repo.torrents.insert(*info_hash, torrent.into()); } + Repo::SkipMapRwLockParkingLot(repo) => { + repo.torrents.insert(*info_hash, torrent.into()); + } Repo::DashMapMutexStd(repo) => { repo.torrents.insert(*info_hash, torrent.into()); } diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs index c0699479e..f672d14ef 100644 --- a/packages/torrent-repository/tests/common/torrent.rs +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -5,13 +5,14 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _}; -use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; +use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntryRwLockParkingLot, EntrySingle}; #[derive(Debug, Clone)] pub(crate) enum Torrent { Single(EntrySingle), MutexStd(EntryMutexStd), MutexTokio(EntryMutexTokio), + RwLockParkingLot(EntryRwLockParkingLot), } impl Torrent { @@ -20,6 +21,7 @@ impl Torrent { Torrent::Single(entry) => entry.get_swarm_metadata(), Torrent::MutexStd(entry) => entry.get_swarm_metadata(), Torrent::MutexTokio(entry) => entry.clone().get_swarm_metadata().await, + Torrent::RwLockParkingLot(entry) => entry.clone().get_swarm_metadata(), } } @@ -28,6 +30,7 @@ impl Torrent { Torrent::Single(entry) => entry.is_good(policy), Torrent::MutexStd(entry) => entry.is_good(policy), Torrent::MutexTokio(entry) => entry.clone().check_good(policy).await, + Torrent::RwLockParkingLot(entry) => entry.is_good(policy), } } @@ -36,6 +39,7 @@ impl Torrent { Torrent::Single(entry) => entry.peers_is_empty(), Torrent::MutexStd(entry) => entry.peers_is_empty(), Torrent::MutexTokio(entry) => entry.clone().peers_is_empty().await, + Torrent::RwLockParkingLot(entry) => entry.peers_is_empty(), } } @@ -44,6 +48,7 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers_len(), Torrent::MutexStd(entry) => entry.get_peers_len(), Torrent::MutexTokio(entry) => entry.clone().get_peers_len().await, + Torrent::RwLockParkingLot(entry) => entry.get_peers_len(), } } @@ -52,6 +57,7 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers(limit), Torrent::MutexStd(entry) => entry.get_peers(limit), Torrent::MutexTokio(entry) => entry.clone().get_peers(limit).await, + Torrent::RwLockParkingLot(entry) => entry.get_peers(limit), } } @@ -60,6 +66,7 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers_for_client(client, limit), Torrent::MutexStd(entry) => entry.get_peers_for_client(client, limit), Torrent::MutexTokio(entry) => entry.clone().get_peers_for_client(client, limit).await, + Torrent::RwLockParkingLot(entry) => entry.get_peers_for_client(client, limit), } } @@ -68,6 +75,7 @@ impl Torrent { Torrent::Single(entry) => entry.upsert_peer(peer), Torrent::MutexStd(entry) => entry.upsert_peer(peer), Torrent::MutexTokio(entry) => entry.clone().upsert_peer(peer).await, + Torrent::RwLockParkingLot(entry) => entry.upsert_peer(peer), } } @@ -76,6 +84,7 @@ impl Torrent { Torrent::Single(entry) => entry.remove_inactive_peers(current_cutoff), Torrent::MutexStd(entry) => entry.remove_inactive_peers(current_cutoff), Torrent::MutexTokio(entry) => entry.clone().remove_inactive_peers(current_cutoff).await, + Torrent::RwLockParkingLot(entry) => entry.remove_inactive_peers(current_cutoff), } } } diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/entry/mod.rs index 3c564c6f8..aa3126000 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/entry/mod.rs @@ -9,7 +9,7 @@ use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT}; use torrust_tracker_primitives::announce_event::AnnounceEvent; use torrust_tracker_primitives::peer::Peer; use torrust_tracker_primitives::{peer, NumberOfBytes}; -use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle}; +use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntryRwLockParkingLot, EntrySingle}; use crate::common::torrent::Torrent; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; @@ -20,7 +20,7 @@ fn single() -> Torrent { Torrent::Single(EntrySingle::default()) } #[fixture] -fn standard_mutex() -> Torrent { +fn mutex_std() -> Torrent { Torrent::MutexStd(EntryMutexStd::default()) } @@ -29,6 +29,11 @@ fn mutex_tokio() -> Torrent { Torrent::MutexTokio(EntryMutexTokio::default()) } +#[fixture] +fn rw_lock_parking_lot() -> Torrent { + Torrent::RwLockParkingLot(EntryRwLockParkingLot::default()) +} + #[fixture] fn policy_none() -> TrackerPolicy { TrackerPolicy::new(false, 0, false) @@ -99,7 +104,7 @@ async fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { #[case::empty(&Makes::Empty)] #[tokio::test] async fn it_should_be_empty_by_default( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -115,7 +120,7 @@ async fn it_should_be_empty_by_default( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_check_if_entry_is_good( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { @@ -153,7 +158,7 @@ async fn it_should_check_if_entry_is_good( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_get_peers_for_torrent_entry( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -174,10 +179,7 @@ async fn it_should_get_peers_for_torrent_entry( #[case::downloaded(&Makes::Downloaded)] #[case::three(&Makes::Three)] #[tokio::test] -async fn it_should_update_a_peer( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, - #[case] makes: &Makes, -) { +async fn it_should_update_a_peer(#[values(single(), mutex_std(), mutex_tokio())] mut torrent: Torrent, #[case] makes: &Makes) { make(&mut torrent, makes).await; // Make and insert a new peer. @@ -215,7 +217,7 @@ async fn it_should_update_a_peer( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_remove_a_peer_upon_stopped_announcement( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { use torrust_tracker_primitives::peer::ReadInfo as _; @@ -256,7 +258,7 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloaded_statistic( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -287,7 +289,7 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_update_a_peer_as_a_seeder( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -319,7 +321,7 @@ async fn it_should_update_a_peer_as_a_seeder( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_update_a_peer_as_incomplete( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -351,7 +353,7 @@ async fn it_should_update_a_peer_as_incomplete( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_get_peers_excluding_the_client_socket( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -383,7 +385,7 @@ async fn it_should_get_peers_excluding_the_client_socket( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_limit_the_number_of_peers_returned( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -408,7 +410,7 @@ async fn it_should_limit_the_number_of_peers_returned( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_remove_inactive_peers_beyond_cutoff( - #[values(single(), standard_mutex(), mutex_tokio())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { const TIMEOUT: Duration = Duration::from_secs(120); diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index fde34467e..ac53e6510 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -49,10 +49,15 @@ fn tokio_tokio() -> Repo { } #[fixture] -fn skip_list_std() -> Repo { +fn skip_list_mutex_std() -> Repo { Repo::SkipMapMutexStd(CrossbeamSkipList::default()) } +#[fixture] +fn skip_list_rw_lock_parking_lot() -> Repo { + Repo::SkipMapRwLockParkingLot(CrossbeamSkipList::default()) +} + #[fixture] fn dash_map_std() -> Repo { Repo::DashMapMutexStd(XacrimonDashMap::default()) @@ -246,7 +251,8 @@ async fn it_should_get_a_torrent_entry( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -279,7 +285,8 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_mutex_std(), + skip_list_rw_lock_parking_lot() )] repo: Repo, #[case] entries: Entries, @@ -321,7 +328,8 @@ async fn it_should_get_paginated( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_mutex_std(), + skip_list_rw_lock_parking_lot() )] repo: Repo, #[case] entries: Entries, @@ -378,7 +386,8 @@ async fn it_should_get_metrics( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -420,7 +429,8 @@ async fn it_should_import_persistent_torrents( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -459,7 +469,8 @@ async fn it_should_remove_an_entry( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -496,7 +507,8 @@ async fn it_should_remove_inactive_peers( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, @@ -594,7 +606,8 @@ async fn it_should_remove_peerless_torrents( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std(), + skip_list_mutex_std(), + skip_list_rw_lock_parking_lot(), dash_map_std() )] repo: Repo, From 0fa396cc34ae457ff2855bd339317b3b1dc15672 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 16 Apr 2024 16:18:44 +0100 Subject: [PATCH 3/4] chore(deps): add parking_lot to cargo machete It's used for benchmarking in the torrent-repository workspace package. --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 57c18453b..94889cbf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ url = "2" uuid = { version = "1", features = ["v4"] } [package.metadata.cargo-machete] -ignored = ["serde_bytes", "crossbeam-skiplist", "dashmap"] +ignored = ["serde_bytes", "crossbeam-skiplist", "dashmap", "parking_lot"] [dev-dependencies] local-ip-address = "0" From 0058e72550d827da08071b63961ce78596d758e1 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 16 Apr 2024 16:28:38 +0100 Subject: [PATCH 4/4] feat: new torrent repo implementation using parking_lot Mutex --- .../benches/repository_benchmark.rs | 23 ++++- packages/torrent-repository/src/entry/mod.rs | 1 + .../src/entry/mutex_parking_lot.rs | 24 ++--- .../src/entry/rw_lock_parking_lot.rs | 49 ++++++++++ packages/torrent-repository/src/lib.rs | 2 + .../src/repository/skip_map_mutex_std.rs | 93 ++++++++++++++++++- .../torrent-repository/tests/common/repo.rs | 21 ++++- .../tests/common/torrent.rs | 13 ++- .../torrent-repository/tests/entry/mod.rs | 29 +++--- .../tests/repository/mod.rs | 13 +++ 10 files changed, 240 insertions(+), 28 deletions(-) create mode 100644 packages/torrent-repository/src/entry/rw_lock_parking_lot.rs diff --git a/packages/torrent-repository/benches/repository_benchmark.rs b/packages/torrent-repository/benches/repository_benchmark.rs index 75e7fd5b8..4e50f1454 100644 --- a/packages/torrent-repository/benches/repository_benchmark.rs +++ b/packages/torrent-repository/benches/repository_benchmark.rs @@ -5,7 +5,8 @@ mod helpers; use criterion::{criterion_group, criterion_main, Criterion}; use torrust_tracker_torrent_repository::{ TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, - TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, TorrentsSkipMapRwLockParkingLot, + TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexParkingLot, TorrentsSkipMapMutexStd, + TorrentsSkipMapRwLockParkingLot, }; use crate::helpers::{asyn, sync}; @@ -49,6 +50,10 @@ fn add_one_torrent(c: &mut Criterion) { b.iter_custom(sync::add_one_torrent::); }); + group.bench_function("SkipMapMutexParkingLot", |b| { + b.iter_custom(sync::add_one_torrent::); + }); + group.bench_function("SkipMapRwLockParkingLot", |b| { b.iter_custom(sync::add_one_torrent::); }); @@ -106,6 +111,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexParkingLot", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.bench_function("SkipMapRwLockParkingLot", |b| { b.to_async(&rt) .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); @@ -165,6 +175,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexParkingLot", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); + }); + group.bench_function("SkipMapRwLockParkingLot", |b| { b.to_async(&rt) .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); @@ -225,6 +240,12 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("SkipMapMutexParkingLot", |b| { + b.to_async(&rt).iter_custom(|iters| { + sync::update_multiple_torrents_in_parallel::(&rt, iters, None) + }); + }); + group.bench_function("SkipMapRwLockParkingLot", |b| { b.to_async(&rt).iter_custom(|iters| { sync::update_multiple_torrents_in_parallel::(&rt, iters, None) diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index dbe1416be..b811d3262 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -12,6 +12,7 @@ pub mod mutex_parking_lot; pub mod mutex_std; pub mod mutex_tokio; pub mod peer_list; +pub mod rw_lock_parking_lot; pub mod single; pub trait Entry { diff --git a/packages/torrent-repository/src/entry/mutex_parking_lot.rs b/packages/torrent-repository/src/entry/mutex_parking_lot.rs index ef0e958d5..4f3921ea7 100644 --- a/packages/torrent-repository/src/entry/mutex_parking_lot.rs +++ b/packages/torrent-repository/src/entry/mutex_parking_lot.rs @@ -6,44 +6,44 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use super::{Entry, EntrySync}; -use crate::{EntryRwLockParkingLot, EntrySingle}; +use crate::{EntryMutexParkingLot, EntrySingle}; -impl EntrySync for EntryRwLockParkingLot { +impl EntrySync for EntryMutexParkingLot { fn get_swarm_metadata(&self) -> SwarmMetadata { - self.read().get_swarm_metadata() + self.lock().get_swarm_metadata() } fn is_good(&self, policy: &TrackerPolicy) -> bool { - self.read().is_good(policy) + self.lock().is_good(policy) } fn peers_is_empty(&self) -> bool { - self.read().peers_is_empty() + self.lock().peers_is_empty() } fn get_peers_len(&self) -> usize { - self.read().get_peers_len() + self.lock().get_peers_len() } fn get_peers(&self, limit: Option) -> Vec> { - self.read().get_peers(limit) + self.lock().get_peers(limit) } fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { - self.read().get_peers_for_client(client, limit) + self.lock().get_peers_for_client(client, limit) } fn upsert_peer(&self, peer: &peer::Peer) -> bool { - self.write().upsert_peer(peer) + self.lock().upsert_peer(peer) } fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { - self.write().remove_inactive_peers(current_cutoff); + self.lock().remove_inactive_peers(current_cutoff); } } -impl From for EntryRwLockParkingLot { +impl From for EntryMutexParkingLot { fn from(entry: EntrySingle) -> Self { - Arc::new(parking_lot::RwLock::new(entry)) + Arc::new(parking_lot::Mutex::new(entry)) } } diff --git a/packages/torrent-repository/src/entry/rw_lock_parking_lot.rs b/packages/torrent-repository/src/entry/rw_lock_parking_lot.rs new file mode 100644 index 000000000..ef0e958d5 --- /dev/null +++ b/packages/torrent-repository/src/entry/rw_lock_parking_lot.rs @@ -0,0 +1,49 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; + +use super::{Entry, EntrySync}; +use crate::{EntryRwLockParkingLot, EntrySingle}; + +impl EntrySync for EntryRwLockParkingLot { + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.read().get_swarm_metadata() + } + + fn is_good(&self, policy: &TrackerPolicy) -> bool { + self.read().is_good(policy) + } + + fn peers_is_empty(&self) -> bool { + self.read().peers_is_empty() + } + + fn get_peers_len(&self) -> usize { + self.read().get_peers_len() + } + + fn get_peers(&self, limit: Option) -> Vec> { + self.read().get_peers(limit) + } + + fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec> { + self.read().get_peers_for_client(client, limit) + } + + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.write().upsert_peer(peer) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + self.write().remove_inactive_peers(current_cutoff); + } +} + +impl From for EntryRwLockParkingLot { + fn from(entry: EntrySingle) -> Self { + Arc::new(parking_lot::RwLock::new(entry)) + } +} diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index 5d3a7ed45..a8955808e 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -14,6 +14,7 @@ pub mod repository; pub type EntrySingle = entry::Torrent; pub type EntryMutexStd = Arc>; pub type EntryMutexTokio = Arc>; +pub type EntryMutexParkingLot = Arc>; pub type EntryRwLockParkingLot = Arc>; // Repos @@ -26,6 +27,7 @@ pub type TorrentsRwLockTokioMutexStd = RwLockTokio; pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; +pub type TorrentsSkipMapMutexParkingLot = CrossbeamSkipList; pub type TorrentsSkipMapRwLockParkingLot = CrossbeamSkipList; pub type TorrentsDashMapMutexStd = XacrimonDashMap; diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index 0a2a566e7..9960b0c30 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -11,7 +11,7 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use super::Repository; use crate::entry::peer_list::PeerList; use crate::entry::{Entry, EntrySync}; -use crate::{EntryMutexStd, EntryRwLockParkingLot, EntrySingle}; +use crate::{EntryMutexParkingLot, EntryMutexStd, EntryRwLockParkingLot, EntrySingle}; #[derive(Default, Debug)] pub struct CrossbeamSkipList { @@ -199,3 +199,94 @@ where } } } + +impl Repository for CrossbeamSkipList +where + EntryMutexParkingLot: EntrySync, + EntrySingle: Entry, +{ + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); + entry.value().upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) + } + + fn get(&self, key: &InfoHash) -> Option { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.value().clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().lock().get_swarm_metadata(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexParkingLot)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntryMutexParkingLot::new( + EntrySingle { + swarm: PeerList::default(), + downloaded: *completed, + } + .into(), + ); + + // Since SkipMap is lock-free the torrent could have been inserted + // after checking if it exists. + self.torrents.get_or_insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option { + self.torrents.remove(key).map(|entry| entry.value().clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + for entry in &self.torrents { + if entry.value().is_good(policy) { + continue; + } + + entry.remove(); + } + } +} diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index c5da6258d..f317d0d17 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -7,8 +7,8 @@ use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; use torrust_tracker_torrent_repository::{ EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, - TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, - TorrentsSkipMapRwLockParkingLot, + TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexParkingLot, + TorrentsSkipMapMutexStd, TorrentsSkipMapRwLockParkingLot, }; #[derive(Debug)] @@ -20,6 +20,7 @@ pub(crate) enum Repo { RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd), RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio), SkipMapMutexStd(TorrentsSkipMapMutexStd), + SkipMapMutexParkingLot(TorrentsSkipMapMutexParkingLot), SkipMapRwLockParkingLot(TorrentsSkipMapRwLockParkingLot), DashMapMutexStd(TorrentsDashMapMutexStd), } @@ -34,6 +35,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer).await, Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::SkipMapMutexParkingLot(repo) => repo.upsert_peer(info_hash, peer), Repo::SkipMapRwLockParkingLot(repo) => repo.upsert_peer(info_hash, peer), Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), } @@ -48,6 +50,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.get_swarm_metadata(info_hash).await, Repo::RwLockTokioMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await, Repo::SkipMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::SkipMapMutexParkingLot(repo) => repo.get_swarm_metadata(info_hash), Repo::SkipMapRwLockParkingLot(repo) => repo.get_swarm_metadata(info_hash), Repo::DashMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), } @@ -62,6 +65,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), + Repo::SkipMapMutexParkingLot(repo) => Some(repo.get(key)?.lock().clone()), Repo::SkipMapRwLockParkingLot(repo) => Some(repo.get(key)?.read().clone()), Repo::DashMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), } @@ -76,6 +80,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await, Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await, Repo::SkipMapMutexStd(repo) => repo.get_metrics(), + Repo::SkipMapMutexParkingLot(repo) => repo.get_metrics(), Repo::SkipMapRwLockParkingLot(repo) => repo.get_metrics(), Repo::DashMapMutexStd(repo) => repo.get_metrics(), } @@ -117,6 +122,11 @@ impl Repo { .iter() .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), + Repo::SkipMapMutexParkingLot(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.lock().clone())) + .collect(), Repo::SkipMapRwLockParkingLot(repo) => repo .get_paginated(pagination) .iter() @@ -139,6 +149,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents), + Repo::SkipMapMutexParkingLot(repo) => repo.import_persistent(persistent_torrents), Repo::SkipMapRwLockParkingLot(repo) => repo.import_persistent(persistent_torrents), Repo::DashMapMutexStd(repo) => repo.import_persistent(persistent_torrents), } @@ -153,6 +164,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), + Repo::SkipMapMutexParkingLot(repo) => Some(repo.remove(key)?.lock().clone()), Repo::SkipMapRwLockParkingLot(repo) => Some(repo.remove(key)?.write().clone()), Repo::DashMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), } @@ -167,6 +179,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::SkipMapMutexParkingLot(repo) => repo.remove_inactive_peers(current_cutoff), Repo::SkipMapRwLockParkingLot(repo) => repo.remove_inactive_peers(current_cutoff), Repo::DashMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), } @@ -181,6 +194,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy), + Repo::SkipMapMutexParkingLot(repo) => repo.remove_peerless_torrents(policy), Repo::SkipMapRwLockParkingLot(repo) => repo.remove_peerless_torrents(policy), Repo::DashMapMutexStd(repo) => repo.remove_peerless_torrents(policy), } @@ -209,6 +223,9 @@ impl Repo { Repo::SkipMapMutexStd(repo) => { repo.torrents.insert(*info_hash, torrent.into()); } + Repo::SkipMapMutexParkingLot(repo) => { + repo.torrents.insert(*info_hash, torrent.into()); + } Repo::SkipMapRwLockParkingLot(repo) => { repo.torrents.insert(*info_hash, torrent.into()); } diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs index f672d14ef..abcf5525e 100644 --- a/packages/torrent-repository/tests/common/torrent.rs +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -5,13 +5,16 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch}; use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _}; -use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntryRwLockParkingLot, EntrySingle}; +use torrust_tracker_torrent_repository::{ + EntryMutexParkingLot, EntryMutexStd, EntryMutexTokio, EntryRwLockParkingLot, EntrySingle, +}; #[derive(Debug, Clone)] pub(crate) enum Torrent { Single(EntrySingle), MutexStd(EntryMutexStd), MutexTokio(EntryMutexTokio), + MutexParkingLot(EntryMutexParkingLot), RwLockParkingLot(EntryRwLockParkingLot), } @@ -21,6 +24,7 @@ impl Torrent { Torrent::Single(entry) => entry.get_swarm_metadata(), Torrent::MutexStd(entry) => entry.get_swarm_metadata(), Torrent::MutexTokio(entry) => entry.clone().get_swarm_metadata().await, + Torrent::MutexParkingLot(entry) => entry.clone().get_swarm_metadata(), Torrent::RwLockParkingLot(entry) => entry.clone().get_swarm_metadata(), } } @@ -30,6 +34,7 @@ impl Torrent { Torrent::Single(entry) => entry.is_good(policy), Torrent::MutexStd(entry) => entry.is_good(policy), Torrent::MutexTokio(entry) => entry.clone().check_good(policy).await, + Torrent::MutexParkingLot(entry) => entry.is_good(policy), Torrent::RwLockParkingLot(entry) => entry.is_good(policy), } } @@ -39,6 +44,7 @@ impl Torrent { Torrent::Single(entry) => entry.peers_is_empty(), Torrent::MutexStd(entry) => entry.peers_is_empty(), Torrent::MutexTokio(entry) => entry.clone().peers_is_empty().await, + Torrent::MutexParkingLot(entry) => entry.peers_is_empty(), Torrent::RwLockParkingLot(entry) => entry.peers_is_empty(), } } @@ -48,6 +54,7 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers_len(), Torrent::MutexStd(entry) => entry.get_peers_len(), Torrent::MutexTokio(entry) => entry.clone().get_peers_len().await, + Torrent::MutexParkingLot(entry) => entry.get_peers_len(), Torrent::RwLockParkingLot(entry) => entry.get_peers_len(), } } @@ -57,6 +64,7 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers(limit), Torrent::MutexStd(entry) => entry.get_peers(limit), Torrent::MutexTokio(entry) => entry.clone().get_peers(limit).await, + Torrent::MutexParkingLot(entry) => entry.get_peers(limit), Torrent::RwLockParkingLot(entry) => entry.get_peers(limit), } } @@ -66,6 +74,7 @@ impl Torrent { Torrent::Single(entry) => entry.get_peers_for_client(client, limit), Torrent::MutexStd(entry) => entry.get_peers_for_client(client, limit), Torrent::MutexTokio(entry) => entry.clone().get_peers_for_client(client, limit).await, + Torrent::MutexParkingLot(entry) => entry.get_peers_for_client(client, limit), Torrent::RwLockParkingLot(entry) => entry.get_peers_for_client(client, limit), } } @@ -75,6 +84,7 @@ impl Torrent { Torrent::Single(entry) => entry.upsert_peer(peer), Torrent::MutexStd(entry) => entry.upsert_peer(peer), Torrent::MutexTokio(entry) => entry.clone().upsert_peer(peer).await, + Torrent::MutexParkingLot(entry) => entry.upsert_peer(peer), Torrent::RwLockParkingLot(entry) => entry.upsert_peer(peer), } } @@ -84,6 +94,7 @@ impl Torrent { Torrent::Single(entry) => entry.remove_inactive_peers(current_cutoff), Torrent::MutexStd(entry) => entry.remove_inactive_peers(current_cutoff), Torrent::MutexTokio(entry) => entry.clone().remove_inactive_peers(current_cutoff).await, + Torrent::MutexParkingLot(entry) => entry.remove_inactive_peers(current_cutoff), Torrent::RwLockParkingLot(entry) => entry.remove_inactive_peers(current_cutoff), } } diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/entry/mod.rs index aa3126000..3b9f3e3ad 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/entry/mod.rs @@ -9,7 +9,9 @@ use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT}; use torrust_tracker_primitives::announce_event::AnnounceEvent; use torrust_tracker_primitives::peer::Peer; use torrust_tracker_primitives::{peer, NumberOfBytes}; -use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntryRwLockParkingLot, EntrySingle}; +use torrust_tracker_torrent_repository::{ + EntryMutexParkingLot, EntryMutexStd, EntryMutexTokio, EntryRwLockParkingLot, EntrySingle, +}; use crate::common::torrent::Torrent; use crate::common::torrent_peer_builder::{a_completed_peer, a_started_peer}; @@ -29,6 +31,11 @@ fn mutex_tokio() -> Torrent { Torrent::MutexTokio(EntryMutexTokio::default()) } +#[fixture] +fn mutex_parking_lot() -> Torrent { + Torrent::MutexParkingLot(EntryMutexParkingLot::default()) +} + #[fixture] fn rw_lock_parking_lot() -> Torrent { Torrent::RwLockParkingLot(EntryRwLockParkingLot::default()) @@ -104,7 +111,7 @@ async fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { #[case::empty(&Makes::Empty)] #[tokio::test] async fn it_should_be_empty_by_default( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -120,7 +127,7 @@ async fn it_should_be_empty_by_default( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_check_if_entry_is_good( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, #[values(policy_none(), policy_persist(), policy_remove(), policy_remove_persist())] policy: TrackerPolicy, ) { @@ -158,7 +165,7 @@ async fn it_should_check_if_entry_is_good( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_get_peers_for_torrent_entry( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -217,7 +224,7 @@ async fn it_should_update_a_peer(#[values(single(), mutex_std(), mutex_tokio())] #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_remove_a_peer_upon_stopped_announcement( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { use torrust_tracker_primitives::peer::ReadInfo as _; @@ -258,7 +265,7 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloaded_statistic( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -289,7 +296,7 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_update_a_peer_as_a_seeder( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -321,7 +328,7 @@ async fn it_should_update_a_peer_as_a_seeder( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_update_a_peer_as_incomplete( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { let peers = make(&mut torrent, makes).await; @@ -353,7 +360,7 @@ async fn it_should_update_a_peer_as_incomplete( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_get_peers_excluding_the_client_socket( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -385,7 +392,7 @@ async fn it_should_get_peers_excluding_the_client_socket( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_limit_the_number_of_peers_returned( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { make(&mut torrent, makes).await; @@ -410,7 +417,7 @@ async fn it_should_limit_the_number_of_peers_returned( #[case::three(&Makes::Three)] #[tokio::test] async fn it_should_remove_inactive_peers_beyond_cutoff( - #[values(single(), mutex_std(), mutex_tokio(), rw_lock_parking_lot())] mut torrent: Torrent, + #[values(single(), mutex_std(), mutex_tokio(), mutex_parking_lot(), rw_lock_parking_lot())] mut torrent: Torrent, #[case] makes: &Makes, ) { const TIMEOUT: Duration = Duration::from_secs(120); diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index ac53e6510..dd9893cc9 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -53,6 +53,11 @@ fn skip_list_mutex_std() -> Repo { Repo::SkipMapMutexStd(CrossbeamSkipList::default()) } +#[fixture] +fn skip_list_mutex_parking_lot() -> Repo { + Repo::SkipMapMutexParkingLot(CrossbeamSkipList::default()) +} + #[fixture] fn skip_list_rw_lock_parking_lot() -> Repo { Repo::SkipMapRwLockParkingLot(CrossbeamSkipList::default()) @@ -252,6 +257,7 @@ async fn it_should_get_a_torrent_entry( tokio_mutex(), tokio_tokio(), skip_list_mutex_std(), + skip_list_mutex_parking_lot(), skip_list_rw_lock_parking_lot(), dash_map_std() )] @@ -286,6 +292,7 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order( tokio_mutex(), tokio_tokio(), skip_list_mutex_std(), + skip_list_mutex_parking_lot(), skip_list_rw_lock_parking_lot() )] repo: Repo, @@ -329,6 +336,7 @@ async fn it_should_get_paginated( tokio_mutex(), tokio_tokio(), skip_list_mutex_std(), + skip_list_mutex_parking_lot(), skip_list_rw_lock_parking_lot() )] repo: Repo, @@ -387,6 +395,7 @@ async fn it_should_get_metrics( tokio_mutex(), tokio_tokio(), skip_list_mutex_std(), + skip_list_mutex_parking_lot(), skip_list_rw_lock_parking_lot(), dash_map_std() )] @@ -430,6 +439,7 @@ async fn it_should_import_persistent_torrents( tokio_mutex(), tokio_tokio(), skip_list_mutex_std(), + skip_list_mutex_parking_lot(), skip_list_rw_lock_parking_lot(), dash_map_std() )] @@ -470,6 +480,7 @@ async fn it_should_remove_an_entry( tokio_mutex(), tokio_tokio(), skip_list_mutex_std(), + skip_list_mutex_parking_lot(), skip_list_rw_lock_parking_lot(), dash_map_std() )] @@ -508,6 +519,7 @@ async fn it_should_remove_inactive_peers( tokio_mutex(), tokio_tokio(), skip_list_mutex_std(), + skip_list_mutex_parking_lot(), skip_list_rw_lock_parking_lot(), dash_map_std() )] @@ -607,6 +619,7 @@ async fn it_should_remove_peerless_torrents( tokio_mutex(), tokio_tokio(), skip_list_mutex_std(), + skip_list_mutex_parking_lot(), skip_list_rw_lock_parking_lot(), dash_map_std() )]