diff --git a/Cargo.lock b/Cargo.lock index 0a60397f9..ce66efa09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,6 +479,12 @@ dependencies = [ "syn", ] +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.9.0" @@ -510,6 +516,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "either" version = "1.8.0" @@ -570,6 +582,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -600,6 +621,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "frunk" version = "0.4.0" @@ -1008,6 +1035,15 @@ dependencies = [ "syn", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.4" @@ -1208,6 +1244,33 @@ dependencies = [ "windows-sys 0.36.1", ] +[[package]] +name = "mockall" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50e4a1c770583dac7ab5e2f6c139153b783a53a1bbee9729613f193e59828326" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "832663583d5fa284ca8810bf7015e46c9fff9622d3cf34bd1eea5003fec06dd0" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "multipart" version = "0.18.0" @@ -1349,6 +1412,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "num-bigint" version = "0.3.3" @@ -1626,6 +1695,36 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "predicates" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5aab5be6e4732b473071984b3164dbbfb7a3674d30ea5ff44410b6bcd960c3c" +dependencies = [ + "difflib", + "float-cmp", + "itertools", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da1c2388b1513e1b605fcec39a95e0a9e8ef088f71443ef37099fa9ae6673fcb" + +[[package]] +name = "predicates-tree" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d86de6de25020a36c6d3643a86d9a6a9f552107c0559c60ea03551b5e16c032" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -2285,6 +2384,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termtree" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507e9898683b6c43a9aa55b64259b721b52ba226e0f3779137e50ad114a4c90b" + [[package]] name = "textwrap" version = "0.11.0" @@ -2509,6 +2614,7 @@ dependencies = [ "hex", "lazy_static", "log", + "mockall", "openssl", "percent-encoding", "r2d2", diff --git a/Cargo.toml b/Cargo.toml index c7e3790bb..b2b256a2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,3 +59,6 @@ async-trait = "0.1" aquatic_udp_protocol = "0.2" uuid = { version = "1", features = ["v4"] } + +[dev-dependencies] +mockall = "0.11" diff --git a/src/lib.rs b/src/lib.rs index 5f003b5fd..cf830f108 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod jobs; pub mod logging; pub mod protocol; pub mod setup; +pub mod stats; pub mod tracker; pub mod udp; diff --git a/src/main.rs b/src/main.rs index dcb92acb8..08061cd7b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use log::info; -use torrust_tracker::tracker::statistics::StatsTracker; +use torrust_tracker::stats::setup_statistics; use torrust_tracker::tracker::tracker::TorrentTracker; use torrust_tracker::{ephemeral_instance_keys, logging, setup, static_time, Configuration}; @@ -23,11 +23,11 @@ async fn main() { } }; - // Initialize stats tracker - let stats_tracker = StatsTracker::new_instance(config.tracker_usage_statistics); + // Initialize statistics + let (stats_event_sender, stats_repository) = setup_statistics(config.tracker_usage_statistics); // Initialize Torrust tracker - let tracker = match TorrentTracker::new(config.clone(), Box::new(stats_tracker)) { + let tracker = match TorrentTracker::new(config.clone(), stats_event_sender, stats_repository) { Ok(tracker) => Arc::new(tracker), Err(error) => { panic!("{}", error) diff --git a/src/stats.rs b/src/stats.rs new file mode 100644 index 000000000..1f387a084 --- /dev/null +++ b/src/stats.rs @@ -0,0 +1,36 @@ +use crate::statistics::{StatsRepository, StatsTracker, TrackerStatisticsEventSender}; + +pub fn setup_statistics(tracker_usage_statistics: bool) -> (Option>, StatsRepository) { + let mut stats_event_sender = None; + + let mut stats_tracker = StatsTracker::new(); + + if tracker_usage_statistics { + stats_event_sender = Some(stats_tracker.run_event_listener()); + } + + (stats_event_sender, stats_tracker.stats_repository) +} + +#[cfg(test)] +mod test { + use crate::stats::setup_statistics; + + #[tokio::test] + async fn should_not_send_any_event_when_statistics_are_disabled() { + let tracker_usage_statistics = false; + + let (stats_event_sender, _stats_repository) = setup_statistics(tracker_usage_statistics); + + assert!(stats_event_sender.is_none()); + } + + #[tokio::test] + async fn should_send_events_when_statistics_are_enabled() { + let tracker_usage_statistics = true; + + let (stats_event_sender, _stats_repository) = setup_statistics(tracker_usage_statistics); + + assert!(stats_event_sender.is_some()); + } +} diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index fb4e4c0fe..c4d4971af 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -1,8 +1,11 @@ use std::sync::Arc; use async_trait::async_trait; +use log::debug; +#[cfg(test)] +use mockall::{automock, predicate::*}; use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, RwLock, RwLockReadGuard}; const CHANNEL_BUFFER_SIZE: usize = 65_535; @@ -37,6 +40,12 @@ pub struct TrackerStatistics { pub udp6_scrapes_handled: u64, } +impl Default for TrackerStatistics { + fn default() -> Self { + Self::new() + } +} + impl TrackerStatistics { pub fn new() -> Self { Self { @@ -57,152 +66,389 @@ impl TrackerStatistics { } pub struct StatsTracker { - channel_sender: Option>, - pub stats: Arc>, + pub stats_repository: StatsRepository, +} + +impl Default for StatsTracker { + fn default() -> Self { + Self::new() + } } impl StatsTracker { - pub fn new_active_instance() -> Self { - Self::new_instance(true) + pub fn new() -> Self { + Self { + stats_repository: StatsRepository::new(), + } } - pub fn new_inactive_instance() -> Self { - Self::new_instance(false) + pub fn new_active_instance() -> (Box, StatsRepository) { + let mut stats_tracker = Self::new(); + + let stats_event_sender = stats_tracker.run_event_listener(); + + (stats_event_sender, stats_tracker.stats_repository) } - pub fn new_instance(active: bool) -> Self { - let mut stats_tracker = Self { - channel_sender: None, - stats: Arc::new(RwLock::new(TrackerStatistics::new())), - }; + pub fn run_event_listener(&mut self) -> Box { + let (sender, receiver) = mpsc::channel::(CHANNEL_BUFFER_SIZE); - if active { - stats_tracker.run_worker(); - } + let stats_repository = self.stats_repository.clone(); + + tokio::spawn(async move { event_listener(receiver, stats_repository).await }); - stats_tracker + Box::new(StatsEventSender { sender }) } +} - pub fn new() -> Self { - Self { - channel_sender: None, - stats: Arc::new(RwLock::new(TrackerStatistics::new())), - } +async fn event_listener(mut receiver: Receiver, stats_repository: StatsRepository) { + while let Some(event) = receiver.recv().await { + event_handler(event, &stats_repository).await; } +} + +async fn event_handler(event: TrackerStatisticsEvent, stats_repository: &StatsRepository) { + match event { + // TCP4 + TrackerStatisticsEvent::Tcp4Announce => { + stats_repository.increase_tcp4_announces().await; + stats_repository.increase_tcp4_connections().await; + } + TrackerStatisticsEvent::Tcp4Scrape => { + stats_repository.increase_tcp4_scrapes().await; + stats_repository.increase_tcp4_connections().await; + } + + // TCP6 + TrackerStatisticsEvent::Tcp6Announce => { + stats_repository.increase_tcp6_announces().await; + stats_repository.increase_tcp6_connections().await; + } + TrackerStatisticsEvent::Tcp6Scrape => { + stats_repository.increase_tcp6_scrapes().await; + stats_repository.increase_tcp6_connections().await; + } + + // UDP4 + TrackerStatisticsEvent::Udp4Connect => { + stats_repository.increase_udp4_connections().await; + } + TrackerStatisticsEvent::Udp4Announce => { + stats_repository.increase_udp4_announces().await; + } + TrackerStatisticsEvent::Udp4Scrape => { + stats_repository.increase_udp4_scrapes().await; + } - pub fn run_worker(&mut self) { - let (tx, mut rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); - - // set send channel on stats_tracker - self.channel_sender = Some(tx); - - let stats = self.stats.clone(); - - tokio::spawn(async move { - while let Some(event) = rx.recv().await { - let mut stats_lock = stats.write().await; - - match event { - TrackerStatisticsEvent::Tcp4Announce => { - stats_lock.tcp4_announces_handled += 1; - stats_lock.tcp4_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp4Scrape => { - stats_lock.tcp4_scrapes_handled += 1; - stats_lock.tcp4_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp6Announce => { - stats_lock.tcp6_announces_handled += 1; - stats_lock.tcp6_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp6Scrape => { - stats_lock.tcp6_scrapes_handled += 1; - stats_lock.tcp6_connections_handled += 1; - } - TrackerStatisticsEvent::Udp4Connect => { - stats_lock.udp4_connections_handled += 1; - } - TrackerStatisticsEvent::Udp4Announce => { - stats_lock.udp4_announces_handled += 1; - } - TrackerStatisticsEvent::Udp4Scrape => { - stats_lock.udp4_scrapes_handled += 1; - } - TrackerStatisticsEvent::Udp6Connect => { - stats_lock.udp6_connections_handled += 1; - } - TrackerStatisticsEvent::Udp6Announce => { - stats_lock.udp6_announces_handled += 1; - } - TrackerStatisticsEvent::Udp6Scrape => { - stats_lock.udp6_scrapes_handled += 1; - } - } - - drop(stats_lock); - } - }); + // UDP6 + TrackerStatisticsEvent::Udp6Connect => { + stats_repository.increase_udp6_connections().await; + } + TrackerStatisticsEvent::Udp6Announce => { + stats_repository.increase_udp6_announces().await; + } + TrackerStatisticsEvent::Udp6Scrape => { + stats_repository.increase_udp6_scrapes().await; + } } + + debug!("stats: {:?}", stats_repository.get_stats().await); } #[async_trait] +#[cfg_attr(test, automock)] pub trait TrackerStatisticsEventSender: Sync + Send { async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>>; } +pub struct StatsEventSender { + sender: Sender, +} + #[async_trait] -impl TrackerStatisticsEventSender for StatsTracker { +impl TrackerStatisticsEventSender for StatsEventSender { async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>> { - if let Some(tx) = &self.channel_sender { - Some(tx.send(event).await) - } else { - None - } + Some(self.sender.send(event).await) } } -#[async_trait] -pub trait TrackerStatisticsRepository: Sync + Send { - async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics>; +#[derive(Clone)] +pub struct StatsRepository { + pub stats: Arc>, } -#[async_trait] -impl TrackerStatisticsRepository for StatsTracker { - async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { - self.stats.read().await +impl Default for StatsRepository { + fn default() -> Self { + Self::new() } } -pub trait TrackerStatsService: TrackerStatisticsEventSender + TrackerStatisticsRepository {} +impl StatsRepository { + pub fn new() -> Self { + Self { + stats: Arc::new(RwLock::new(TrackerStatistics::new())), + } + } + + pub async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { + self.stats.read().await + } + + pub async fn increase_tcp4_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp4_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp4_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp4_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp4_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp4_scrapes_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp6_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp6_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp6_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp6_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp6_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp6_scrapes_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_scrapes_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_announces_handled += 1; + drop(stats_lock); + } -impl TrackerStatsService for StatsTracker {} + pub async fn increase_udp6_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_scrapes_handled += 1; + drop(stats_lock); + } +} #[cfg(test)] -mod test { +mod tests { - mod event_sender { - use crate::statistics::{StatsTracker, TrackerStatisticsEvent, TrackerStatisticsEventSender}; + mod stats_tracker { + use crate::statistics::{StatsTracker, TrackerStatistics, TrackerStatisticsEvent}; #[tokio::test] - async fn should_not_send_any_event_when_statistics_are_disabled() { - let tracker_usage_statistics = false; - - let inactive_stats_tracker = StatsTracker::new_instance(tracker_usage_statistics); + async fn should_contain_the_tracker_statistics() { + let stats_tracker = StatsTracker::new(); - let result = inactive_stats_tracker.send_event(TrackerStatisticsEvent::Tcp4Announce).await; + let stats = stats_tracker.stats_repository.get_stats().await; - assert!(result.is_none()); + assert_eq!(stats.tcp4_announces_handled, TrackerStatistics::new().tcp4_announces_handled); } #[tokio::test] - async fn should_send_events_when_statistics_are_enabled() { - let tracker_usage_statistics = true; + async fn should_create_an_event_sender_to_send_statistical_events() { + let mut stats_tracker = StatsTracker::new(); - let active_stats_tracker = StatsTracker::new_instance(tracker_usage_statistics); + let event_sender = stats_tracker.run_event_listener(); - let result = active_stats_tracker.send_event(TrackerStatisticsEvent::Tcp4Announce).await; + let result = event_sender.send_event(TrackerStatisticsEvent::Udp4Connect).await; assert!(result.is_some()); } } + + mod event_handler { + use crate::statistics::{event_handler, StatsRepository, TrackerStatisticsEvent}; + + #[tokio::test] + async fn should_increase_the_tcp4_announces_counter_when_it_receives_a_tcp4_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp4Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp4Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp4_scrapes_counter_when_it_receives_a_tcp4_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp4Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_scrapes_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp4Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp6_announces_counter_when_it_receives_a_tcp6_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp6Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp6_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp6Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp6_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp6_scrapes_counter_when_it_receives_a_tcp6_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp6Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp6_scrapes_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp6Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp6_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_connections_counter_when_it_receives_a_udp4_connect_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp4Connect, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_announces_counter_when_it_receives_a_udp4_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp4Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_scrapes_counter_when_it_receives_a_udp4_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp4Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_scrapes_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_connections_counter_when_it_receives_a_udp6_connect_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp6Connect, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_announces_counter_when_it_receives_a_udp6_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp6Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_scrapes_counter_when_it_receives_a_udp6_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp6Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_scrapes_handled, 1); + } + } } diff --git a/src/tracker/tracker.rs b/src/tracker/tracker.rs index 5499eebeb..7e74a3554 100644 --- a/src/tracker/tracker.rs +++ b/src/tracker/tracker.rs @@ -12,7 +12,7 @@ use crate::databases::database::Database; use crate::mode::TrackerMode; use crate::peer::TorrentPeer; use crate::protocol::common::InfoHash; -use crate::statistics::{TrackerStatistics, TrackerStatisticsEvent, TrackerStatsService}; +use crate::statistics::{StatsRepository, TrackerStatistics, TrackerStatisticsEvent, TrackerStatisticsEventSender}; use crate::tracker::key; use crate::tracker::key::AuthKey; use crate::tracker::torrent::{TorrentEntry, TorrentError, TorrentStats}; @@ -24,12 +24,17 @@ pub struct TorrentTracker { keys: RwLock>, whitelist: RwLock>, torrents: RwLock>, - stats_tracker: Box, + stats_event_sender: Option>, + stats_repository: StatsRepository, database: Box, } impl TorrentTracker { - pub fn new(config: Arc, stats_tracker: Box) -> Result { + pub fn new( + config: Arc, + stats_event_sender: Option>, + stats_repository: StatsRepository, + ) -> Result { let database = database::connect_database(&config.db_driver, &config.db_path)?; Ok(TorrentTracker { @@ -38,7 +43,8 @@ impl TorrentTracker { keys: RwLock::new(std::collections::HashMap::new()), whitelist: RwLock::new(std::collections::HashSet::new()), torrents: RwLock::new(std::collections::BTreeMap::new()), - stats_tracker, + stats_event_sender, + stats_repository, database, }) } @@ -232,11 +238,14 @@ impl TorrentTracker { } pub async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { - self.stats_tracker.get_stats().await + self.stats_repository.get_stats().await } pub async fn send_stats_event(&self, event: TrackerStatisticsEvent) -> Option>> { - self.stats_tracker.send_event(event).await + match &self.stats_event_sender { + None => None, + Some(stats_event_sender) => stats_event_sender.send_event(event).await, + } } // Remove inactive peers and (optionally) peerless torrents diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 845b860e9..b962b1333 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -251,17 +251,11 @@ mod tests { use std::sync::Arc; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; - use async_trait::async_trait; - use tokio::sync::mpsc::error::SendError; - use tokio::sync::{RwLock, RwLockReadGuard}; use crate::mode::TrackerMode; use crate::peer::TorrentPeer; use crate::protocol::clock::{DefaultClock, Time}; - use crate::statistics::{ - StatsTracker, TrackerStatistics, TrackerStatisticsEvent, TrackerStatisticsEventSender, TrackerStatisticsRepository, - TrackerStatsService, - }; + use crate::statistics::StatsTracker; use crate::tracker::tracker::TorrentTracker; use crate::{Configuration, PeerId}; @@ -271,17 +265,22 @@ mod tests { fn initialized_public_tracker() -> Arc { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Public).into()); - Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_active_instance())).unwrap()) + initialized_tracker(configuration) } fn initialized_private_tracker() -> Arc { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Private).into()); - Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_active_instance())).unwrap()) + initialized_tracker(configuration) } fn initialized_whitelisted_tracker() -> Arc { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Listed).into()); - Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_active_instance())).unwrap()) + initialized_tracker(configuration) + } + + fn initialized_tracker(configuration: Arc) -> Arc { + let (stats_event_sender, stats_repository) = StatsTracker::new_active_instance(); + Arc::new(TorrentTracker::new(configuration, Some(stats_event_sender), stats_repository).unwrap()) } fn sample_ipv4_remote_addr() -> SocketAddr { @@ -338,43 +337,6 @@ mod tests { } } - struct TrackerStatsServiceMock { - stats: Arc>, - expected_event: Option, - } - - impl TrackerStatsServiceMock { - fn new() -> Self { - Self { - stats: Arc::new(RwLock::new(TrackerStatistics::new())), - expected_event: None, - } - } - - fn should_throw_event(&mut self, expected_event: TrackerStatisticsEvent) { - self.expected_event = Some(expected_event); - } - } - - #[async_trait] - impl TrackerStatisticsEventSender for TrackerStatsServiceMock { - async fn send_event(&self, _event: TrackerStatisticsEvent) -> Option>> { - if self.expected_event.is_some() { - assert_eq!(_event, *self.expected_event.as_ref().unwrap()); - } - None - } - } - - #[async_trait] - impl TrackerStatisticsRepository for TrackerStatsServiceMock { - async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { - self.stats.read().await - } - } - - impl TrackerStatsService for TrackerStatsServiceMock {} - struct TrackerConfigurationBuilder { configuration: Configuration, } @@ -404,12 +366,14 @@ mod tests { mod connect_request { + use std::future; use std::sync::Arc; use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId}; + use mockall::predicate::eq; - use super::{default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr, TrackerStatsServiceMock}; - use crate::statistics::TrackerStatisticsEvent; + use super::{default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr}; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_connect; @@ -461,12 +425,19 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp4Connect)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let client_socket_address = sample_ipv4_socket_address(); - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Connect); - let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let torrent_tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), + ); handle_connect(client_socket_address, &sample_connect_request(), torrent_tracker) .await .unwrap(); @@ -474,11 +445,17 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Connect); - - let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp6Connect)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); + + let torrent_tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), + ); handle_connect(sample_ipv6_remote_addr(), &sample_connect_request(), torrent_tracker) .await .unwrap(); @@ -558,6 +535,7 @@ mod tests { mod using_ipv4 { + use std::future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; @@ -565,15 +543,15 @@ mod tests { AnnounceInterval, AnnounceResponse, InfoHash as AquaticInfoHash, NumberOfPeers, PeerId as AquaticPeerId, Response, ResponsePeer, }; + use mockall::predicate::eq; - use crate::statistics::TrackerStatisticsEvent; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_announce; use crate::udp::handlers::tests::announce_request::AnnounceRequestBuilder; use crate::udp::handlers::tests::{ default_tracker_config, initialized_public_tracker, sample_ipv4_socket_address, TorrentPeerBuilder, - TrackerStatsServiceMock, }; use crate::PeerId; @@ -712,11 +690,18 @@ mod tests { #[tokio::test] async fn should_send_the_upd4_announce_event() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Announce); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp4Announce)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); + + let tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), + ); - let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); handle_announce( sample_ipv4_socket_address(), &AnnounceRequestBuilder::default().into(), @@ -775,6 +760,7 @@ mod tests { mod using_ipv6 { + use std::future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; @@ -782,15 +768,15 @@ mod tests { AnnounceInterval, AnnounceResponse, InfoHash as AquaticInfoHash, NumberOfPeers, PeerId as AquaticPeerId, Response, ResponsePeer, }; + use mockall::predicate::eq; - use crate::statistics::TrackerStatisticsEvent; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_announce; use crate::udp::handlers::tests::announce_request::AnnounceRequestBuilder; use crate::udp::handlers::tests::{ default_tracker_config, initialized_public_tracker, sample_ipv6_remote_addr, TorrentPeerBuilder, - TrackerStatsServiceMock, }; use crate::PeerId; @@ -936,11 +922,17 @@ mod tests { #[tokio::test] async fn should_send_the_upd6_announce_event() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Announce); - - let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp6Announce)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); + + let tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), + ); let remote_addr = sample_ipv6_remote_addr(); @@ -969,8 +961,9 @@ mod tests { #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_external_ip("::126.0.0.1").into()); + let (stats_event_sender, stats_repository) = StatsTracker::new_active_instance(); let tracker = - Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_active_instance())).unwrap()); + Arc::new(TorrentTracker::new(configuration, Some(stats_event_sender), stats_repository).unwrap()); let loopback_ipv4 = Ipv4Addr::new(127, 0, 0, 1); let loopback_ipv6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); @@ -1225,29 +1218,38 @@ mod tests { let info_hashes = vec![info_hash]; ScrapeRequest { - connection_id: into_connection_id(&make_connection_cookie(&remote_addr)), + connection_id: into_connection_id(&make_connection_cookie(remote_addr)), transaction_id: TransactionId(0i32), info_hashes, } } mod using_ipv4 { + use std::future; use std::sync::Arc; + use mockall::predicate::eq; + use super::sample_scrape_request; - use crate::statistics::TrackerStatisticsEvent; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::handlers::handle_scrape; - use crate::udp::handlers::tests::{default_tracker_config, sample_ipv4_remote_addr, TrackerStatsServiceMock}; + use crate::udp::handlers::tests::{default_tracker_config, sample_ipv4_remote_addr}; #[tokio::test] async fn should_send_the_upd4_scrape_event() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Scrape); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp4Scrape)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let remote_addr = sample_ipv4_remote_addr(); - let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), + ); handle_scrape(remote_addr, &sample_scrape_request(&remote_addr), tracker.clone()) .await @@ -1256,22 +1258,31 @@ mod tests { } mod using_ipv6 { + use std::future; use std::sync::Arc; + use mockall::predicate::eq; + use super::sample_scrape_request; - use crate::statistics::TrackerStatisticsEvent; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::handlers::handle_scrape; - use crate::udp::handlers::tests::{default_tracker_config, sample_ipv6_remote_addr, TrackerStatsServiceMock}; + use crate::udp::handlers::tests::{default_tracker_config, sample_ipv6_remote_addr}; #[tokio::test] async fn should_send_the_upd6_scrape_event() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Scrape); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp6Scrape)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let remote_addr = sample_ipv6_remote_addr(); - let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), + ); handle_scrape(remote_addr, &sample_scrape_request(&remote_addr), tracker.clone()) .await diff --git a/tests/udp.rs b/tests/udp.rs index b391b922f..abd16427b 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -51,10 +51,10 @@ mod udp_tracker_server { lazy_static::initialize(&static_time::TIME_AT_APP_START); // Initialize stats tracker - let stats_tracker = StatsTracker::new_active_instance(); + let (stats_event_sender, stats_repository) = StatsTracker::new_active_instance(); // Initialize Torrust tracker - let tracker = match TorrentTracker::new(configuration.clone(), Box::new(stats_tracker)) { + let tracker = match TorrentTracker::new(configuration.clone(), Some(stats_event_sender), stats_repository) { Ok(tracker) => Arc::new(tracker), Err(error) => { panic!("{}", error)