From a334f17e5f62bc9357cc520c7d4fd60a0a1b04b3 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 21 Oct 2022 14:33:56 +0100 Subject: [PATCH 01/13] refactor: extract struct StatsEventSender --- src/tracker/statistics.rs | 44 ++++++++++++++++++++++++++++----------- src/udp/handlers.rs | 15 ++++++++----- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index fb4e4c0fe..7d1d17c51 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -62,24 +62,31 @@ pub struct StatsTracker { } impl StatsTracker { - pub fn new_active_instance() -> Self { - Self::new_instance(true) - } + pub fn new_active_instance() -> (Self, StatsEventSender) { + let mut stats_tracker = Self { + channel_sender: None, + stats: Arc::new(RwLock::new(TrackerStatistics::new())), + }; - pub fn new_inactive_instance() -> Self { - Self::new_instance(false) + let stats_event_sender = stats_tracker.run_worker(); + + (stats_tracker, stats_event_sender) } - pub fn new_instance(active: bool) -> Self { - let mut stats_tracker = Self { + pub fn new_inactive_instance() -> Self { + Self { channel_sender: None, stats: Arc::new(RwLock::new(TrackerStatistics::new())), - }; + } + } - if active { - stats_tracker.run_worker(); + pub fn new_instance(active: bool) -> Self { + if !active { + return Self::new_inactive_instance(); } + let (stats_tracker, _stats_event_sender) = Self::new_active_instance(); + stats_tracker } @@ -90,11 +97,11 @@ impl StatsTracker { } } - pub fn run_worker(&mut self) { + pub fn run_worker(&mut self) -> StatsEventSender { let (tx, mut rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); // set send channel on stats_tracker - self.channel_sender = Some(tx); + self.channel_sender = Some(tx.clone()); let stats = self.stats.clone(); @@ -142,6 +149,8 @@ impl StatsTracker { drop(stats_lock); } }); + + StatsEventSender { sender: tx } } } @@ -161,6 +170,17 @@ impl TrackerStatisticsEventSender for StatsTracker { } } +pub struct StatsEventSender { + sender: Sender, +} + +#[async_trait] +impl TrackerStatisticsEventSender for StatsEventSender { + async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>> { + Some(self.sender.send(event).await) + } +} + #[async_trait] pub trait TrackerStatisticsRepository: Sync + Send { async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics>; diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 845b860e9..7992bcaf0 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -271,19 +271,24 @@ mod tests { fn initialized_public_tracker() -> Arc { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Public).into()); - Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_active_instance())).unwrap()) + initialized_tracker(configuration) } fn initialized_private_tracker() -> Arc { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Private).into()); - Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_active_instance())).unwrap()) + initialized_tracker(configuration) } fn initialized_whitelisted_tracker() -> Arc { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Listed).into()); - Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_active_instance())).unwrap()) + initialized_tracker(configuration) } + fn initialized_tracker(configuration: Arc) -> Arc { + let (stats_tracker, _stats_event_sender) = StatsTracker::new_active_instance(); + Arc::new(TorrentTracker::new(configuration, Box::new(stats_tracker)).unwrap()) + } + fn sample_ipv4_remote_addr() -> SocketAddr { sample_ipv4_socket_address() } @@ -969,8 +974,8 @@ mod tests { #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_external_ip("::126.0.0.1").into()); - let tracker = - Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_active_instance())).unwrap()); + let (stats_tracker, _stats_event_sender) = StatsTracker::new_active_instance(); + let tracker = Arc::new(TorrentTracker::new(configuration, Box::new(stats_tracker)).unwrap()); let loopback_ipv4 = Ipv4Addr::new(127, 0, 0, 1); let loopback_ipv6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); From b784442cb676edaeee6caa8941c2f050d0e9e897 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 21 Oct 2022 16:25:17 +0100 Subject: [PATCH 02/13] refactor: inject new struct StatsEventSender into TorrentTracker Parallel change. We are still using the old TrackerStatsService to send events. --- src/main.rs | 10 ++++- src/tracker/statistics.rs | 6 +-- src/tracker/tracker.rs | 10 ++++- src/udp/handlers.rs | 84 ++++++++++++++++++++++++++++++--------- tests/udp.rs | 5 ++- 5 files changed, 88 insertions(+), 27 deletions(-) diff --git a/src/main.rs b/src/main.rs index dcb92acb8..f995ba377 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,10 +24,16 @@ async fn main() { }; // Initialize stats tracker - let stats_tracker = StatsTracker::new_instance(config.tracker_usage_statistics); + let mut stats_tracker = StatsTracker::new_inactive_instance(); + + let mut stats_event_sender = None; + + if config.tracker_usage_statistics { + stats_event_sender = Some(stats_tracker.run_worker()); + } // Initialize Torrust tracker - let tracker = match TorrentTracker::new(config.clone(), Box::new(stats_tracker)) { + let tracker = match TorrentTracker::new(config.clone(), Box::new(stats_tracker), stats_event_sender) { Ok(tracker) => Arc::new(tracker), Err(error) => { panic!("{}", error) diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index 7d1d17c51..8b57d6bfe 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -62,7 +62,7 @@ pub struct StatsTracker { } impl StatsTracker { - pub fn new_active_instance() -> (Self, StatsEventSender) { + pub fn new_active_instance() -> (Self, Box) { let mut stats_tracker = Self { channel_sender: None, stats: Arc::new(RwLock::new(TrackerStatistics::new())), @@ -97,7 +97,7 @@ impl StatsTracker { } } - pub fn run_worker(&mut self) -> StatsEventSender { + pub fn run_worker(&mut self) -> Box { let (tx, mut rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); // set send channel on stats_tracker @@ -150,7 +150,7 @@ impl StatsTracker { } }); - StatsEventSender { sender: tx } + Box::new(StatsEventSender { sender: tx }) } } diff --git a/src/tracker/tracker.rs b/src/tracker/tracker.rs index 5499eebeb..b1d009077 100644 --- a/src/tracker/tracker.rs +++ b/src/tracker/tracker.rs @@ -12,7 +12,7 @@ use crate::databases::database::Database; use crate::mode::TrackerMode; use crate::peer::TorrentPeer; use crate::protocol::common::InfoHash; -use crate::statistics::{TrackerStatistics, TrackerStatisticsEvent, TrackerStatsService}; +use crate::statistics::{TrackerStatistics, TrackerStatisticsEvent, TrackerStatisticsEventSender, TrackerStatsService}; use crate::tracker::key; use crate::tracker::key::AuthKey; use crate::tracker::torrent::{TorrentEntry, TorrentError, TorrentStats}; @@ -25,11 +25,16 @@ pub struct TorrentTracker { whitelist: RwLock>, torrents: RwLock>, stats_tracker: Box, + _stats_event_sender: Option>, database: Box, } impl TorrentTracker { - pub fn new(config: Arc, stats_tracker: Box) -> Result { + pub fn new( + config: Arc, + stats_tracker: Box, + _stats_event_sender: Option>, + ) -> Result { let database = database::connect_database(&config.db_driver, &config.db_path)?; Ok(TorrentTracker { @@ -39,6 +44,7 @@ impl TorrentTracker { whitelist: RwLock::new(std::collections::HashSet::new()), torrents: RwLock::new(std::collections::BTreeMap::new()), stats_tracker, + _stats_event_sender, database, }) } diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 7992bcaf0..fc3e0968f 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -285,9 +285,9 @@ mod tests { } fn initialized_tracker(configuration: Arc) -> Arc { - let (stats_tracker, _stats_event_sender) = StatsTracker::new_active_instance(); - Arc::new(TorrentTracker::new(configuration, Box::new(stats_tracker)).unwrap()) - } + let (stats_tracker, stats_event_sender) = StatsTracker::new_active_instance(); + Arc::new(TorrentTracker::new(configuration, Box::new(stats_tracker), Some(stats_event_sender)).unwrap()) + } fn sample_ipv4_remote_addr() -> SocketAddr { sample_ipv4_socket_address() @@ -371,6 +371,30 @@ mod tests { } } + struct StatsEventSenderMock { + expected_event: Option, + } + + impl StatsEventSenderMock { + fn new() -> Self { + Self { expected_event: None } + } + + fn should_throw_event(&mut self, expected_event: TrackerStatisticsEvent) { + self.expected_event = Some(expected_event); + } + } + + #[async_trait] + impl TrackerStatisticsEventSender for StatsEventSenderMock { + async fn send_event(&self, _event: TrackerStatisticsEvent) -> Option>> { + if self.expected_event.is_some() { + assert_eq!(_event, *self.expected_event.as_ref().unwrap()); + } + None + } + } + #[async_trait] impl TrackerStatisticsRepository for TrackerStatsServiceMock { async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { @@ -413,7 +437,10 @@ mod tests { use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId}; - use super::{default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr, TrackerStatsServiceMock}; + use super::{ + default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr, StatsEventSenderMock, + TrackerStatsServiceMock, + }; use crate::statistics::TrackerStatisticsEvent; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; @@ -467,11 +494,13 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let stats_event_sender = Box::new(StatsEventSenderMock::new()); let client_socket_address = sample_ipv4_socket_address(); tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Connect); - let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let torrent_tracker = + Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap()); handle_connect(client_socket_address, &sample_connect_request(), torrent_tracker) .await .unwrap(); @@ -480,10 +509,12 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let stats_event_sender = Box::new(StatsEventSenderMock::new()); tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Connect); - let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let torrent_tracker = + Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap()); handle_connect(sample_ipv6_remote_addr(), &sample_connect_request(), torrent_tracker) .await .unwrap(); @@ -577,8 +608,8 @@ mod tests { use crate::udp::handle_announce; use crate::udp::handlers::tests::announce_request::AnnounceRequestBuilder; use crate::udp::handlers::tests::{ - default_tracker_config, initialized_public_tracker, sample_ipv4_socket_address, TorrentPeerBuilder, - TrackerStatsServiceMock, + default_tracker_config, initialized_public_tracker, sample_ipv4_socket_address, StatsEventSenderMock, + TorrentPeerBuilder, TrackerStatsServiceMock, }; use crate::PeerId; @@ -718,10 +749,13 @@ mod tests { #[tokio::test] async fn should_send_the_upd4_announce_event() { let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let stats_event_sender = Box::new(StatsEventSenderMock::new()); tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Announce); - let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), + ); handle_announce( sample_ipv4_socket_address(), &AnnounceRequestBuilder::default().into(), @@ -794,8 +828,8 @@ mod tests { use crate::udp::handle_announce; use crate::udp::handlers::tests::announce_request::AnnounceRequestBuilder; use crate::udp::handlers::tests::{ - default_tracker_config, initialized_public_tracker, sample_ipv6_remote_addr, TorrentPeerBuilder, - TrackerStatsServiceMock, + default_tracker_config, initialized_public_tracker, sample_ipv6_remote_addr, StatsEventSenderMock, + TorrentPeerBuilder, TrackerStatsServiceMock, }; use crate::PeerId; @@ -942,10 +976,13 @@ mod tests { #[tokio::test] async fn should_send_the_upd6_announce_event() { let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let stats_event_sender = Box::new(StatsEventSenderMock::new()); tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Announce); - let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), + ); let remote_addr = sample_ipv6_remote_addr(); @@ -974,8 +1011,9 @@ mod tests { #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_external_ip("::126.0.0.1").into()); - let (stats_tracker, _stats_event_sender) = StatsTracker::new_active_instance(); - let tracker = Arc::new(TorrentTracker::new(configuration, Box::new(stats_tracker)).unwrap()); + let (stats_tracker, stats_event_sender) = StatsTracker::new_active_instance(); + let tracker = + Arc::new(TorrentTracker::new(configuration, Box::new(stats_tracker), Some(stats_event_sender)).unwrap()); let loopback_ipv4 = Ipv4Addr::new(127, 0, 0, 1); let loopback_ipv6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); @@ -1243,16 +1281,21 @@ mod tests { use crate::statistics::TrackerStatisticsEvent; use crate::tracker::tracker::TorrentTracker; use crate::udp::handlers::handle_scrape; - use crate::udp::handlers::tests::{default_tracker_config, sample_ipv4_remote_addr, TrackerStatsServiceMock}; + use crate::udp::handlers::tests::{ + default_tracker_config, sample_ipv4_remote_addr, StatsEventSenderMock, TrackerStatsServiceMock, + }; #[tokio::test] async fn should_send_the_upd4_scrape_event() { let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let stats_event_sender = Box::new(StatsEventSenderMock::new()); tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Scrape); let remote_addr = sample_ipv4_remote_addr(); - let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), + ); handle_scrape(remote_addr, &sample_scrape_request(&remote_addr), tracker.clone()) .await @@ -1267,16 +1310,21 @@ mod tests { use crate::statistics::TrackerStatisticsEvent; use crate::tracker::tracker::TorrentTracker; use crate::udp::handlers::handle_scrape; - use crate::udp::handlers::tests::{default_tracker_config, sample_ipv6_remote_addr, TrackerStatsServiceMock}; + use crate::udp::handlers::tests::{ + default_tracker_config, sample_ipv6_remote_addr, StatsEventSenderMock, TrackerStatsServiceMock, + }; #[tokio::test] async fn should_send_the_upd6_scrape_event() { let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let stats_event_sender = Box::new(StatsEventSenderMock::new()); tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Scrape); let remote_addr = sample_ipv6_remote_addr(); - let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + let tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), + ); handle_scrape(remote_addr, &sample_scrape_request(&remote_addr), tracker.clone()) .await diff --git a/tests/udp.rs b/tests/udp.rs index b391b922f..d2b500d5a 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -51,10 +51,11 @@ mod udp_tracker_server { lazy_static::initialize(&static_time::TIME_AT_APP_START); // Initialize stats tracker - let stats_tracker = StatsTracker::new_active_instance(); + let (stats_tracker, stats_event_sender) = StatsTracker::new_active_instance(); // Initialize Torrust tracker - let tracker = match TorrentTracker::new(configuration.clone(), Box::new(stats_tracker)) { + let tracker = match TorrentTracker::new(configuration.clone(), Box::new(stats_tracker), Some(stats_event_sender)) + { Ok(tracker) => Arc::new(tracker), Err(error) => { panic!("{}", error) From 720a5841c943364bfd99fba2337ea024324b6293 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 21 Oct 2022 16:40:05 +0100 Subject: [PATCH 03/13] refactor: use StatsEventSender to send events instead of StatsTracker. --- src/tracker/tracker.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/tracker/tracker.rs b/src/tracker/tracker.rs index b1d009077..80f6e549d 100644 --- a/src/tracker/tracker.rs +++ b/src/tracker/tracker.rs @@ -25,7 +25,7 @@ pub struct TorrentTracker { whitelist: RwLock>, torrents: RwLock>, stats_tracker: Box, - _stats_event_sender: Option>, + stats_event_sender: Option>, database: Box, } @@ -33,7 +33,7 @@ impl TorrentTracker { pub fn new( config: Arc, stats_tracker: Box, - _stats_event_sender: Option>, + stats_event_sender: Option>, ) -> Result { let database = database::connect_database(&config.db_driver, &config.db_path)?; @@ -44,7 +44,7 @@ impl TorrentTracker { whitelist: RwLock::new(std::collections::HashSet::new()), torrents: RwLock::new(std::collections::BTreeMap::new()), stats_tracker, - _stats_event_sender, + stats_event_sender, database, }) } @@ -242,7 +242,10 @@ impl TorrentTracker { } pub async fn send_stats_event(&self, event: TrackerStatisticsEvent) -> Option>> { - self.stats_tracker.send_event(event).await + match &self.stats_event_sender { + None => None, + Some(stats_event_sender) => stats_event_sender.send_event(event).await, + } } // Remove inactive peers and (optionally) peerless torrents From 5b73d801c99f6f2d92125ad46c2cc8a39b4c68c1 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 21 Oct 2022 17:08:24 +0100 Subject: [PATCH 04/13] refactor: removed unused code and extract fn - The StatsTracker does not need anymore the channel sender. - A setup function for statistics was extracted. --- src/lib.rs | 1 + src/main.rs | 12 +++------- src/stats.rs | 36 ++++++++++++++++++++++++++++ src/tracker/statistics.rs | 50 +-------------------------------------- 4 files changed, 41 insertions(+), 58 deletions(-) create mode 100644 src/stats.rs diff --git a/src/lib.rs b/src/lib.rs index 5f003b5fd..cf830f108 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod jobs; pub mod logging; pub mod protocol; pub mod setup; +pub mod stats; pub mod tracker; pub mod udp; diff --git a/src/main.rs b/src/main.rs index f995ba377..c21aa1793 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use log::info; -use torrust_tracker::tracker::statistics::StatsTracker; +use torrust_tracker::stats::setup_statistics; use torrust_tracker::tracker::tracker::TorrentTracker; use torrust_tracker::{ephemeral_instance_keys, logging, setup, static_time, Configuration}; @@ -23,14 +23,8 @@ async fn main() { } }; - // Initialize stats tracker - let mut stats_tracker = StatsTracker::new_inactive_instance(); - - let mut stats_event_sender = None; - - if config.tracker_usage_statistics { - stats_event_sender = Some(stats_tracker.run_worker()); - } + // Initialize statistics:wq + let (stats_tracker, stats_event_sender) = setup_statistics(config.tracker_usage_statistics); // Initialize Torrust tracker let tracker = match TorrentTracker::new(config.clone(), Box::new(stats_tracker), stats_event_sender) { diff --git a/src/stats.rs b/src/stats.rs new file mode 100644 index 000000000..d459d8f5b --- /dev/null +++ b/src/stats.rs @@ -0,0 +1,36 @@ +use crate::statistics::{StatsTracker, TrackerStatisticsEventSender}; + +pub fn setup_statistics(tracker_usage_statistics: bool) -> (StatsTracker, Option>) { + let mut stats_tracker = StatsTracker::new_inactive_instance(); + + let mut stats_event_sender = None; + + if tracker_usage_statistics { + stats_event_sender = Some(stats_tracker.run_worker()); + } + + (stats_tracker, stats_event_sender) +} + +#[cfg(test)] +mod test { + use crate::stats::setup_statistics; + + #[tokio::test] + async fn should_not_send_any_event_when_statistics_are_disabled() { + let tracker_usage_statistics = false; + + let (_stats_tracker, stats_event_sender) = setup_statistics(tracker_usage_statistics); + + assert!(stats_event_sender.is_none()); + } + + #[tokio::test] + async fn should_send_events_when_statistics_are_enabled() { + let tracker_usage_statistics = true; + + let (_stats_tracker, stats_event_sender) = setup_statistics(tracker_usage_statistics); + + assert!(stats_event_sender.is_some()); + } +} diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index 8b57d6bfe..a89b5d4cc 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -57,14 +57,12 @@ impl TrackerStatistics { } pub struct StatsTracker { - channel_sender: Option>, pub stats: Arc>, } impl StatsTracker { pub fn new_active_instance() -> (Self, Box) { let mut stats_tracker = Self { - channel_sender: None, stats: Arc::new(RwLock::new(TrackerStatistics::new())), }; @@ -75,7 +73,6 @@ impl StatsTracker { pub fn new_inactive_instance() -> Self { Self { - channel_sender: None, stats: Arc::new(RwLock::new(TrackerStatistics::new())), } } @@ -92,7 +89,6 @@ impl StatsTracker { pub fn new() -> Self { Self { - channel_sender: None, stats: Arc::new(RwLock::new(TrackerStatistics::new())), } } @@ -100,9 +96,6 @@ impl StatsTracker { pub fn run_worker(&mut self) -> Box { let (tx, mut rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); - // set send channel on stats_tracker - self.channel_sender = Some(tx.clone()); - let stats = self.stats.clone(); tokio::spawn(async move { @@ -159,17 +152,6 @@ pub trait TrackerStatisticsEventSender: Sync + Send { async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>>; } -#[async_trait] -impl TrackerStatisticsEventSender for StatsTracker { - async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>> { - if let Some(tx) = &self.channel_sender { - Some(tx.send(event).await) - } else { - None - } - } -} - pub struct StatsEventSender { sender: Sender, } @@ -193,36 +175,6 @@ impl TrackerStatisticsRepository for StatsTracker { } } -pub trait TrackerStatsService: TrackerStatisticsEventSender + TrackerStatisticsRepository {} +pub trait TrackerStatsService: TrackerStatisticsRepository {} impl TrackerStatsService for StatsTracker {} - -#[cfg(test)] -mod test { - - mod event_sender { - use crate::statistics::{StatsTracker, TrackerStatisticsEvent, TrackerStatisticsEventSender}; - - #[tokio::test] - async fn should_not_send_any_event_when_statistics_are_disabled() { - let tracker_usage_statistics = false; - - let inactive_stats_tracker = StatsTracker::new_instance(tracker_usage_statistics); - - let result = inactive_stats_tracker.send_event(TrackerStatisticsEvent::Tcp4Announce).await; - - assert!(result.is_none()); - } - - #[tokio::test] - async fn should_send_events_when_statistics_are_enabled() { - let tracker_usage_statistics = true; - - let active_stats_tracker = StatsTracker::new_instance(tracker_usage_statistics); - - let result = active_stats_tracker.send_event(TrackerStatisticsEvent::Tcp4Announce).await; - - assert!(result.is_some()); - } - } -} From daec1fed0553e397ce0aa9823f26f1b6ed42a249 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 21 Oct 2022 17:23:46 +0100 Subject: [PATCH 05/13] refactor: extract stats event_listener --- src/tracker/statistics.rs | 97 ++++++++++++++++++++------------------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index a89b5d4cc..66aea0169 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use async_trait::async_trait; +use log::debug; use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, RwLock, RwLockReadGuard}; const CHANNEL_BUFFER_SIZE: usize = 65_535; @@ -94,59 +95,63 @@ impl StatsTracker { } pub fn run_worker(&mut self) -> Box { - let (tx, mut rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); + let (tx, rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); let stats = self.stats.clone(); - tokio::spawn(async move { - while let Some(event) = rx.recv().await { - let mut stats_lock = stats.write().await; - - match event { - TrackerStatisticsEvent::Tcp4Announce => { - stats_lock.tcp4_announces_handled += 1; - stats_lock.tcp4_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp4Scrape => { - stats_lock.tcp4_scrapes_handled += 1; - stats_lock.tcp4_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp6Announce => { - stats_lock.tcp6_announces_handled += 1; - stats_lock.tcp6_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp6Scrape => { - stats_lock.tcp6_scrapes_handled += 1; - stats_lock.tcp6_connections_handled += 1; - } - TrackerStatisticsEvent::Udp4Connect => { - stats_lock.udp4_connections_handled += 1; - } - TrackerStatisticsEvent::Udp4Announce => { - stats_lock.udp4_announces_handled += 1; - } - TrackerStatisticsEvent::Udp4Scrape => { - stats_lock.udp4_scrapes_handled += 1; - } - TrackerStatisticsEvent::Udp6Connect => { - stats_lock.udp6_connections_handled += 1; - } - TrackerStatisticsEvent::Udp6Announce => { - stats_lock.udp6_announces_handled += 1; - } - TrackerStatisticsEvent::Udp6Scrape => { - stats_lock.udp6_scrapes_handled += 1; - } - } - - drop(stats_lock); - } - }); + tokio::spawn(async move { event_listener(rx, stats).await }); Box::new(StatsEventSender { sender: tx }) } } +async fn event_listener(mut rx: Receiver, stats: Arc>) { + while let Some(event) = rx.recv().await { + let mut stats_lock = stats.write().await; + + match event { + TrackerStatisticsEvent::Tcp4Announce => { + stats_lock.tcp4_announces_handled += 1; + stats_lock.tcp4_connections_handled += 1; + } + TrackerStatisticsEvent::Tcp4Scrape => { + stats_lock.tcp4_scrapes_handled += 1; + stats_lock.tcp4_connections_handled += 1; + } + TrackerStatisticsEvent::Tcp6Announce => { + stats_lock.tcp6_announces_handled += 1; + stats_lock.tcp6_connections_handled += 1; + } + TrackerStatisticsEvent::Tcp6Scrape => { + stats_lock.tcp6_scrapes_handled += 1; + stats_lock.tcp6_connections_handled += 1; + } + TrackerStatisticsEvent::Udp4Connect => { + stats_lock.udp4_connections_handled += 1; + } + TrackerStatisticsEvent::Udp4Announce => { + stats_lock.udp4_announces_handled += 1; + } + TrackerStatisticsEvent::Udp4Scrape => { + stats_lock.udp4_scrapes_handled += 1; + } + TrackerStatisticsEvent::Udp6Connect => { + stats_lock.udp6_connections_handled += 1; + } + TrackerStatisticsEvent::Udp6Announce => { + stats_lock.udp6_announces_handled += 1; + } + TrackerStatisticsEvent::Udp6Scrape => { + stats_lock.udp6_scrapes_handled += 1; + } + } + + debug!("stats: {:?}", stats_lock); + + drop(stats_lock); + } +} + #[async_trait] pub trait TrackerStatisticsEventSender: Sync + Send { async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>>; From a2b16ff7b2d3bdd97d994cf19cc33cb6b8b4be62 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 21 Oct 2022 17:35:22 +0100 Subject: [PATCH 06/13] fix: tests using mock for old service I only change test to use the new mock. I realized test were wrong becuase they do not fail when no event is sent. THey only fail when the event sent is not the rigth type. --- src/udp/handlers.rs | 56 ++++++++++++++++----------------------------- 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index fc3e0968f..ba545da1b 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -345,29 +345,13 @@ mod tests { struct TrackerStatsServiceMock { stats: Arc>, - expected_event: Option, } impl TrackerStatsServiceMock { fn new() -> Self { Self { stats: Arc::new(RwLock::new(TrackerStatistics::new())), - expected_event: None, - } - } - - fn should_throw_event(&mut self, expected_event: TrackerStatisticsEvent) { - self.expected_event = Some(expected_event); - } - } - - #[async_trait] - impl TrackerStatisticsEventSender for TrackerStatsServiceMock { - async fn send_event(&self, _event: TrackerStatisticsEvent) -> Option>> { - if self.expected_event.is_some() { - assert_eq!(_event, *self.expected_event.as_ref().unwrap()); } - None } } @@ -387,9 +371,9 @@ mod tests { #[async_trait] impl TrackerStatisticsEventSender for StatsEventSenderMock { - async fn send_event(&self, _event: TrackerStatisticsEvent) -> Option>> { + async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>> { if self.expected_event.is_some() { - assert_eq!(_event, *self.expected_event.as_ref().unwrap()); + assert_eq!(event, *self.expected_event.as_ref().unwrap()); } None } @@ -493,11 +477,11 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let stats_event_sender = Box::new(StatsEventSenderMock::new()); + let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); let client_socket_address = sample_ipv4_socket_address(); - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Connect); + stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp4Connect); let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap()); @@ -508,10 +492,10 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let stats_event_sender = Box::new(StatsEventSenderMock::new()); + let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Connect); + stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp6Connect); let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap()); @@ -748,10 +732,10 @@ mod tests { #[tokio::test] async fn should_send_the_upd4_announce_event() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let stats_event_sender = Box::new(StatsEventSenderMock::new()); + let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Announce); + stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp4Announce); let tracker = Arc::new( TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), @@ -975,10 +959,10 @@ mod tests { #[tokio::test] async fn should_send_the_upd6_announce_event() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let stats_event_sender = Box::new(StatsEventSenderMock::new()); + let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Announce); + stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp6Announce); let tracker = Arc::new( TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), @@ -1287,10 +1271,10 @@ mod tests { #[tokio::test] async fn should_send_the_upd4_scrape_event() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let stats_event_sender = Box::new(StatsEventSenderMock::new()); + let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Scrape); + stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp4Scrape); let remote_addr = sample_ipv4_remote_addr(); let tracker = Arc::new( @@ -1316,10 +1300,10 @@ mod tests { #[tokio::test] async fn should_send_the_upd6_scrape_event() { - let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let stats_event_sender = Box::new(StatsEventSenderMock::new()); + let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Scrape); + stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp6Scrape); let remote_addr = sample_ipv6_remote_addr(); let tracker = Arc::new( From bc3df5a74b2fc66155bd88aec1b4fad1942da379 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 21 Oct 2022 18:29:18 +0100 Subject: [PATCH 07/13] fix typo --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index c21aa1793..bfcce014b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ async fn main() { } }; - // Initialize statistics:wq + // Initialize statistics let (stats_tracker, stats_event_sender) = setup_statistics(config.tracker_usage_statistics); // Initialize Torrust tracker From e5701103788f79c61bb175915a6467618be096a9 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 24 Oct 2022 16:33:46 +0100 Subject: [PATCH 08/13] test: add new dev dependency mockall It will be used to mock a trait in tests. --- Cargo.lock | 106 +++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 3 ++ 2 files changed, 109 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 0a60397f9..ce66efa09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,6 +479,12 @@ dependencies = [ "syn", ] +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.9.0" @@ -510,6 +516,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "either" version = "1.8.0" @@ -570,6 +582,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -600,6 +621,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "frunk" version = "0.4.0" @@ -1008,6 +1035,15 @@ dependencies = [ "syn", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.4" @@ -1208,6 +1244,33 @@ dependencies = [ "windows-sys 0.36.1", ] +[[package]] +name = "mockall" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50e4a1c770583dac7ab5e2f6c139153b783a53a1bbee9729613f193e59828326" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "832663583d5fa284ca8810bf7015e46c9fff9622d3cf34bd1eea5003fec06dd0" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "multipart" version = "0.18.0" @@ -1349,6 +1412,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "num-bigint" version = "0.3.3" @@ -1626,6 +1695,36 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "predicates" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5aab5be6e4732b473071984b3164dbbfb7a3674d30ea5ff44410b6bcd960c3c" +dependencies = [ + "difflib", + "float-cmp", + "itertools", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da1c2388b1513e1b605fcec39a95e0a9e8ef088f71443ef37099fa9ae6673fcb" + +[[package]] +name = "predicates-tree" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d86de6de25020a36c6d3643a86d9a6a9f552107c0559c60ea03551b5e16c032" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -2285,6 +2384,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termtree" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507e9898683b6c43a9aa55b64259b721b52ba226e0f3779137e50ad114a4c90b" + [[package]] name = "textwrap" version = "0.11.0" @@ -2509,6 +2614,7 @@ dependencies = [ "hex", "lazy_static", "log", + "mockall", "openssl", "percent-encoding", "r2d2", diff --git a/Cargo.toml b/Cargo.toml index c7e3790bb..18188565c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,3 +59,6 @@ async-trait = "0.1" aquatic_udp_protocol = "0.2" uuid = { version = "1", features = ["v4"] } + +[dev-dependencies] +mockall = "0.11.3" From 8874032074c12aecb1f250e054caac4d8c9c63f4 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Mon, 24 Oct 2022 16:34:55 +0100 Subject: [PATCH 09/13] fix: weak tests As I explained here: https://github.com/torrust/torrust-tracker/pull/103#issue-1418647900 The mock for the trait TrackerStatisticsEventSender did not work completely weel becuase it only checked for the rigth type of the triggered event but It did not check if the event was sent. I finally used a mockinf library becuase I do not know how to mock trait that uses a mutable reference to 'self'. I need to store wether the event was sent or not and I do not know how to do that without changing the function signature making it mutable. --- src/tracker/statistics.rs | 3 + src/udp/handlers.rs | 135 +++++++++++++++++++------------------- 2 files changed, 72 insertions(+), 66 deletions(-) diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index 66aea0169..73042ff3e 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -2,6 +2,8 @@ use std::sync::Arc; use async_trait::async_trait; use log::debug; +#[cfg(test)] +use mockall::{automock, predicate::*}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, RwLock, RwLockReadGuard}; @@ -153,6 +155,7 @@ async fn event_listener(mut rx: Receiver, stats: Arc Option>>; } diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index ba545da1b..35d2e0247 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -252,16 +252,12 @@ mod tests { use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; use async_trait::async_trait; - use tokio::sync::mpsc::error::SendError; use tokio::sync::{RwLock, RwLockReadGuard}; use crate::mode::TrackerMode; use crate::peer::TorrentPeer; use crate::protocol::clock::{DefaultClock, Time}; - use crate::statistics::{ - StatsTracker, TrackerStatistics, TrackerStatisticsEvent, TrackerStatisticsEventSender, TrackerStatisticsRepository, - TrackerStatsService, - }; + use crate::statistics::{StatsTracker, TrackerStatistics, TrackerStatisticsRepository, TrackerStatsService}; use crate::tracker::tracker::TorrentTracker; use crate::{Configuration, PeerId}; @@ -355,30 +351,6 @@ mod tests { } } - struct StatsEventSenderMock { - expected_event: Option, - } - - impl StatsEventSenderMock { - fn new() -> Self { - Self { expected_event: None } - } - - fn should_throw_event(&mut self, expected_event: TrackerStatisticsEvent) { - self.expected_event = Some(expected_event); - } - } - - #[async_trait] - impl TrackerStatisticsEventSender for StatsEventSenderMock { - async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>> { - if self.expected_event.is_some() { - assert_eq!(event, *self.expected_event.as_ref().unwrap()); - } - None - } - } - #[async_trait] impl TrackerStatisticsRepository for TrackerStatsServiceMock { async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { @@ -417,15 +389,14 @@ mod tests { mod connect_request { + use std::future; use std::sync::Arc; use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId}; + use mockall::predicate::eq; - use super::{ - default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr, StatsEventSenderMock, - TrackerStatsServiceMock, - }; - use crate::statistics::TrackerStatisticsEvent; + use super::{default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr, TrackerStatsServiceMock}; + use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_connect; @@ -478,10 +449,15 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp4Connect)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let client_socket_address = sample_ipv4_socket_address(); - stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp4Connect); let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap()); @@ -493,9 +469,13 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - - stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp6Connect); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp6Connect)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap()); @@ -578,6 +558,7 @@ mod tests { mod using_ipv4 { + use std::future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; @@ -585,15 +566,16 @@ mod tests { AnnounceInterval, AnnounceResponse, InfoHash as AquaticInfoHash, NumberOfPeers, PeerId as AquaticPeerId, Response, ResponsePeer, }; + use mockall::predicate::eq; - use crate::statistics::TrackerStatisticsEvent; + use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_announce; use crate::udp::handlers::tests::announce_request::AnnounceRequestBuilder; use crate::udp::handlers::tests::{ - default_tracker_config, initialized_public_tracker, sample_ipv4_socket_address, StatsEventSenderMock, - TorrentPeerBuilder, TrackerStatsServiceMock, + default_tracker_config, initialized_public_tracker, sample_ipv4_socket_address, TorrentPeerBuilder, + TrackerStatsServiceMock, }; use crate::PeerId; @@ -733,13 +715,18 @@ mod tests { #[tokio::test] async fn should_send_the_upd4_announce_event() { let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - - stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp4Announce); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp4Announce)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let tracker = Arc::new( TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), ); + handle_announce( sample_ipv4_socket_address(), &AnnounceRequestBuilder::default().into(), @@ -798,6 +785,7 @@ mod tests { mod using_ipv6 { + use std::future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; @@ -805,15 +793,16 @@ mod tests { AnnounceInterval, AnnounceResponse, InfoHash as AquaticInfoHash, NumberOfPeers, PeerId as AquaticPeerId, Response, ResponsePeer, }; + use mockall::predicate::eq; - use crate::statistics::TrackerStatisticsEvent; + use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_announce; use crate::udp::handlers::tests::announce_request::AnnounceRequestBuilder; use crate::udp::handlers::tests::{ - default_tracker_config, initialized_public_tracker, sample_ipv6_remote_addr, StatsEventSenderMock, - TorrentPeerBuilder, TrackerStatsServiceMock, + default_tracker_config, initialized_public_tracker, sample_ipv6_remote_addr, TorrentPeerBuilder, + TrackerStatsServiceMock, }; use crate::PeerId; @@ -960,9 +949,13 @@ mod tests { #[tokio::test] async fn should_send_the_upd6_announce_event() { let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - - stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp6Announce); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp6Announce)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let tracker = Arc::new( TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), @@ -1252,29 +1245,34 @@ mod tests { let info_hashes = vec![info_hash]; ScrapeRequest { - connection_id: into_connection_id(&make_connection_cookie(&remote_addr)), + connection_id: into_connection_id(&make_connection_cookie(remote_addr)), transaction_id: TransactionId(0i32), info_hashes, } } mod using_ipv4 { + use std::future; use std::sync::Arc; + use mockall::predicate::eq; + use super::sample_scrape_request; - use crate::statistics::TrackerStatisticsEvent; + use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::handlers::handle_scrape; - use crate::udp::handlers::tests::{ - default_tracker_config, sample_ipv4_remote_addr, StatsEventSenderMock, TrackerStatsServiceMock, - }; + use crate::udp::handlers::tests::{default_tracker_config, sample_ipv4_remote_addr, TrackerStatsServiceMock}; #[tokio::test] async fn should_send_the_upd4_scrape_event() { let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - - stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp4Scrape); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp4Scrape)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let remote_addr = sample_ipv4_remote_addr(); let tracker = Arc::new( @@ -1288,22 +1286,27 @@ mod tests { } mod using_ipv6 { + use std::future; use std::sync::Arc; + use mockall::predicate::eq; + use super::sample_scrape_request; - use crate::statistics::TrackerStatisticsEvent; + use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::handlers::handle_scrape; - use crate::udp::handlers::tests::{ - default_tracker_config, sample_ipv6_remote_addr, StatsEventSenderMock, TrackerStatsServiceMock, - }; + use crate::udp::handlers::tests::{default_tracker_config, sample_ipv6_remote_addr, TrackerStatsServiceMock}; #[tokio::test] async fn should_send_the_upd6_scrape_event() { let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); - let mut stats_event_sender = Box::new(StatsEventSenderMock::new()); - - stats_event_sender.should_throw_event(TrackerStatisticsEvent::Udp6Scrape); + let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); + stats_event_sender_mock + .expect_send_event() + .with(eq(TrackerStatisticsEvent::Udp6Scrape)) + .times(1) + .returning(|_| Box::pin(future::ready(Some(Ok(()))))); + let stats_event_sender = Box::new(stats_event_sender_mock); let remote_addr = sample_ipv6_remote_addr(); let tracker = Arc::new( From d3297cf0b9011933cd5be0018db5f6e4f763c8a4 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Oct 2022 10:31:45 +0100 Subject: [PATCH 10/13] refactor: extract StatsRepository --- src/main.rs | 4 +- src/stats.rs | 16 ++++---- src/tracker/statistics.rs | 81 ++++++++++++++++++++------------------- src/tracker/tracker.rs | 10 ++--- src/udp/handlers.rs | 75 +++++++++++------------------------- tests/udp.rs | 5 +-- 6 files changed, 81 insertions(+), 110 deletions(-) diff --git a/src/main.rs b/src/main.rs index bfcce014b..08061cd7b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,10 +24,10 @@ async fn main() { }; // Initialize statistics - let (stats_tracker, stats_event_sender) = setup_statistics(config.tracker_usage_statistics); + let (stats_event_sender, stats_repository) = setup_statistics(config.tracker_usage_statistics); // Initialize Torrust tracker - let tracker = match TorrentTracker::new(config.clone(), Box::new(stats_tracker), stats_event_sender) { + let tracker = match TorrentTracker::new(config.clone(), stats_event_sender, stats_repository) { Ok(tracker) => Arc::new(tracker), Err(error) => { panic!("{}", error) diff --git a/src/stats.rs b/src/stats.rs index d459d8f5b..1f387a084 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,15 +1,15 @@ -use crate::statistics::{StatsTracker, TrackerStatisticsEventSender}; - -pub fn setup_statistics(tracker_usage_statistics: bool) -> (StatsTracker, Option>) { - let mut stats_tracker = StatsTracker::new_inactive_instance(); +use crate::statistics::{StatsRepository, StatsTracker, TrackerStatisticsEventSender}; +pub fn setup_statistics(tracker_usage_statistics: bool) -> (Option>, StatsRepository) { let mut stats_event_sender = None; + let mut stats_tracker = StatsTracker::new(); + if tracker_usage_statistics { - stats_event_sender = Some(stats_tracker.run_worker()); + stats_event_sender = Some(stats_tracker.run_event_listener()); } - (stats_tracker, stats_event_sender) + (stats_event_sender, stats_tracker.stats_repository) } #[cfg(test)] @@ -20,7 +20,7 @@ mod test { async fn should_not_send_any_event_when_statistics_are_disabled() { let tracker_usage_statistics = false; - let (_stats_tracker, stats_event_sender) = setup_statistics(tracker_usage_statistics); + let (stats_event_sender, _stats_repository) = setup_statistics(tracker_usage_statistics); assert!(stats_event_sender.is_none()); } @@ -29,7 +29,7 @@ mod test { async fn should_send_events_when_statistics_are_enabled() { let tracker_usage_statistics = true; - let (_stats_tracker, stats_event_sender) = setup_statistics(tracker_usage_statistics); + let (stats_event_sender, _stats_repository) = setup_statistics(tracker_usage_statistics); assert!(stats_event_sender.is_some()); } diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index 73042ff3e..8f203c36e 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -40,6 +40,12 @@ pub struct TrackerStatistics { pub udp6_scrapes_handled: u64, } +impl Default for TrackerStatistics { + fn default() -> Self { + Self::new() + } +} + impl TrackerStatistics { pub fn new() -> Self { Self { @@ -60,56 +66,44 @@ impl TrackerStatistics { } pub struct StatsTracker { - pub stats: Arc>, + pub stats_repository: StatsRepository, } -impl StatsTracker { - pub fn new_active_instance() -> (Self, Box) { - let mut stats_tracker = Self { - stats: Arc::new(RwLock::new(TrackerStatistics::new())), - }; - - let stats_event_sender = stats_tracker.run_worker(); - - (stats_tracker, stats_event_sender) - } - - pub fn new_inactive_instance() -> Self { - Self { - stats: Arc::new(RwLock::new(TrackerStatistics::new())), - } +impl Default for StatsTracker { + fn default() -> Self { + Self::new() } +} - pub fn new_instance(active: bool) -> Self { - if !active { - return Self::new_inactive_instance(); - } +impl StatsTracker { + pub fn new_active_instance() -> (Box, StatsRepository) { + let mut stats_tracker = Self::new(); - let (stats_tracker, _stats_event_sender) = Self::new_active_instance(); + let stats_event_sender = stats_tracker.run_event_listener(); - stats_tracker + (stats_event_sender, stats_tracker.stats_repository) } pub fn new() -> Self { Self { - stats: Arc::new(RwLock::new(TrackerStatistics::new())), + stats_repository: StatsRepository::new(), } } - pub fn run_worker(&mut self) -> Box { - let (tx, rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); + pub fn run_event_listener(&mut self) -> Box { + let (sender, receiver) = mpsc::channel::(CHANNEL_BUFFER_SIZE); - let stats = self.stats.clone(); + let stats_repository = self.stats_repository.clone(); - tokio::spawn(async move { event_listener(rx, stats).await }); + tokio::spawn(async move { event_listener(receiver, stats_repository).await }); - Box::new(StatsEventSender { sender: tx }) + Box::new(StatsEventSender { sender }) } } -async fn event_listener(mut rx: Receiver, stats: Arc>) { - while let Some(event) = rx.recv().await { - let mut stats_lock = stats.write().await; +async fn event_listener(mut receiver: Receiver, stats_repository: StatsRepository) { + while let Some(event) = receiver.recv().await { + let mut stats_lock = stats_repository.stats.write().await; match event { TrackerStatisticsEvent::Tcp4Announce => { @@ -171,18 +165,25 @@ impl TrackerStatisticsEventSender for StatsEventSender { } } -#[async_trait] -pub trait TrackerStatisticsRepository: Sync + Send { - async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics>; +#[derive(Clone)] +pub struct StatsRepository { + pub stats: Arc>, } -#[async_trait] -impl TrackerStatisticsRepository for StatsTracker { - async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { - self.stats.read().await +impl Default for StatsRepository { + fn default() -> Self { + Self::new() } } -pub trait TrackerStatsService: TrackerStatisticsRepository {} +impl StatsRepository { + pub fn new() -> Self { + Self { + stats: Arc::new(RwLock::new(TrackerStatistics::new())), + } + } -impl TrackerStatsService for StatsTracker {} + pub async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { + self.stats.read().await + } +} diff --git a/src/tracker/tracker.rs b/src/tracker/tracker.rs index 80f6e549d..7e74a3554 100644 --- a/src/tracker/tracker.rs +++ b/src/tracker/tracker.rs @@ -12,7 +12,7 @@ use crate::databases::database::Database; use crate::mode::TrackerMode; use crate::peer::TorrentPeer; use crate::protocol::common::InfoHash; -use crate::statistics::{TrackerStatistics, TrackerStatisticsEvent, TrackerStatisticsEventSender, TrackerStatsService}; +use crate::statistics::{StatsRepository, TrackerStatistics, TrackerStatisticsEvent, TrackerStatisticsEventSender}; use crate::tracker::key; use crate::tracker::key::AuthKey; use crate::tracker::torrent::{TorrentEntry, TorrentError, TorrentStats}; @@ -24,16 +24,16 @@ pub struct TorrentTracker { keys: RwLock>, whitelist: RwLock>, torrents: RwLock>, - stats_tracker: Box, stats_event_sender: Option>, + stats_repository: StatsRepository, database: Box, } impl TorrentTracker { pub fn new( config: Arc, - stats_tracker: Box, stats_event_sender: Option>, + stats_repository: StatsRepository, ) -> Result { let database = database::connect_database(&config.db_driver, &config.db_path)?; @@ -43,8 +43,8 @@ impl TorrentTracker { keys: RwLock::new(std::collections::HashMap::new()), whitelist: RwLock::new(std::collections::HashSet::new()), torrents: RwLock::new(std::collections::BTreeMap::new()), - stats_tracker, stats_event_sender, + stats_repository, database, }) } @@ -238,7 +238,7 @@ impl TorrentTracker { } pub async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { - self.stats_tracker.get_stats().await + self.stats_repository.get_stats().await } pub async fn send_stats_event(&self, event: TrackerStatisticsEvent) -> Option>> { diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 35d2e0247..b962b1333 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -251,13 +251,11 @@ mod tests { use std::sync::Arc; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; - use async_trait::async_trait; - use tokio::sync::{RwLock, RwLockReadGuard}; use crate::mode::TrackerMode; use crate::peer::TorrentPeer; use crate::protocol::clock::{DefaultClock, Time}; - use crate::statistics::{StatsTracker, TrackerStatistics, TrackerStatisticsRepository, TrackerStatsService}; + use crate::statistics::StatsTracker; use crate::tracker::tracker::TorrentTracker; use crate::{Configuration, PeerId}; @@ -281,8 +279,8 @@ mod tests { } fn initialized_tracker(configuration: Arc) -> Arc { - let (stats_tracker, stats_event_sender) = StatsTracker::new_active_instance(); - Arc::new(TorrentTracker::new(configuration, Box::new(stats_tracker), Some(stats_event_sender)).unwrap()) + let (stats_event_sender, stats_repository) = StatsTracker::new_active_instance(); + Arc::new(TorrentTracker::new(configuration, Some(stats_event_sender), stats_repository).unwrap()) } fn sample_ipv4_remote_addr() -> SocketAddr { @@ -339,27 +337,6 @@ mod tests { } } - struct TrackerStatsServiceMock { - stats: Arc>, - } - - impl TrackerStatsServiceMock { - fn new() -> Self { - Self { - stats: Arc::new(RwLock::new(TrackerStatistics::new())), - } - } - } - - #[async_trait] - impl TrackerStatisticsRepository for TrackerStatsServiceMock { - async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { - self.stats.read().await - } - } - - impl TrackerStatsService for TrackerStatsServiceMock {} - struct TrackerConfigurationBuilder { configuration: Configuration, } @@ -395,8 +372,8 @@ mod tests { use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId}; use mockall::predicate::eq; - use super::{default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr, TrackerStatsServiceMock}; - use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; + use super::{default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr}; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_connect; @@ -448,7 +425,6 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { - let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); stats_event_sender_mock .expect_send_event() @@ -459,8 +435,9 @@ mod tests { let client_socket_address = sample_ipv4_socket_address(); - let torrent_tracker = - Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap()); + let torrent_tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), + ); handle_connect(client_socket_address, &sample_connect_request(), torrent_tracker) .await .unwrap(); @@ -468,7 +445,6 @@ mod tests { #[tokio::test] async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { - let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); stats_event_sender_mock .expect_send_event() @@ -477,8 +453,9 @@ mod tests { .returning(|_| Box::pin(future::ready(Some(Ok(()))))); let stats_event_sender = Box::new(stats_event_sender_mock); - let torrent_tracker = - Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap()); + let torrent_tracker = Arc::new( + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), + ); handle_connect(sample_ipv6_remote_addr(), &sample_connect_request(), torrent_tracker) .await .unwrap(); @@ -568,14 +545,13 @@ mod tests { }; use mockall::predicate::eq; - use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_announce; use crate::udp::handlers::tests::announce_request::AnnounceRequestBuilder; use crate::udp::handlers::tests::{ default_tracker_config, initialized_public_tracker, sample_ipv4_socket_address, TorrentPeerBuilder, - TrackerStatsServiceMock, }; use crate::PeerId; @@ -714,7 +690,6 @@ mod tests { #[tokio::test] async fn should_send_the_upd4_announce_event() { - let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); stats_event_sender_mock .expect_send_event() @@ -724,7 +699,7 @@ mod tests { let stats_event_sender = Box::new(stats_event_sender_mock); let tracker = Arc::new( - TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), ); handle_announce( @@ -795,14 +770,13 @@ mod tests { }; use mockall::predicate::eq; - use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::connection_cookie::{into_connection_id, make_connection_cookie}; use crate::udp::handle_announce; use crate::udp::handlers::tests::announce_request::AnnounceRequestBuilder; use crate::udp::handlers::tests::{ default_tracker_config, initialized_public_tracker, sample_ipv6_remote_addr, TorrentPeerBuilder, - TrackerStatsServiceMock, }; use crate::PeerId; @@ -948,7 +922,6 @@ mod tests { #[tokio::test] async fn should_send_the_upd6_announce_event() { - let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); stats_event_sender_mock .expect_send_event() @@ -958,7 +931,7 @@ mod tests { let stats_event_sender = Box::new(stats_event_sender_mock); let tracker = Arc::new( - TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), ); let remote_addr = sample_ipv6_remote_addr(); @@ -988,9 +961,9 @@ mod tests { #[tokio::test] async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { let configuration = Arc::new(TrackerConfigurationBuilder::default().with_external_ip("::126.0.0.1").into()); - let (stats_tracker, stats_event_sender) = StatsTracker::new_active_instance(); + let (stats_event_sender, stats_repository) = StatsTracker::new_active_instance(); let tracker = - Arc::new(TorrentTracker::new(configuration, Box::new(stats_tracker), Some(stats_event_sender)).unwrap()); + Arc::new(TorrentTracker::new(configuration, Some(stats_event_sender), stats_repository).unwrap()); let loopback_ipv4 = Ipv4Addr::new(127, 0, 0, 1); let loopback_ipv6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); @@ -1258,14 +1231,13 @@ mod tests { use mockall::predicate::eq; use super::sample_scrape_request; - use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::handlers::handle_scrape; - use crate::udp::handlers::tests::{default_tracker_config, sample_ipv4_remote_addr, TrackerStatsServiceMock}; + use crate::udp::handlers::tests::{default_tracker_config, sample_ipv4_remote_addr}; #[tokio::test] async fn should_send_the_upd4_scrape_event() { - let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); stats_event_sender_mock .expect_send_event() @@ -1276,7 +1248,7 @@ mod tests { let remote_addr = sample_ipv4_remote_addr(); let tracker = Arc::new( - TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), ); handle_scrape(remote_addr, &sample_scrape_request(&remote_addr), tracker.clone()) @@ -1292,14 +1264,13 @@ mod tests { use mockall::predicate::eq; use super::sample_scrape_request; - use crate::statistics::{MockTrackerStatisticsEventSender, TrackerStatisticsEvent}; + use crate::statistics::{MockTrackerStatisticsEventSender, StatsRepository, TrackerStatisticsEvent}; use crate::tracker::tracker::TorrentTracker; use crate::udp::handlers::handle_scrape; - use crate::udp::handlers::tests::{default_tracker_config, sample_ipv6_remote_addr, TrackerStatsServiceMock}; + use crate::udp::handlers::tests::{default_tracker_config, sample_ipv6_remote_addr}; #[tokio::test] async fn should_send_the_upd6_scrape_event() { - let tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); let mut stats_event_sender_mock = MockTrackerStatisticsEventSender::new(); stats_event_sender_mock .expect_send_event() @@ -1310,7 +1281,7 @@ mod tests { let remote_addr = sample_ipv6_remote_addr(); let tracker = Arc::new( - TorrentTracker::new(default_tracker_config(), tracker_stats_service, Some(stats_event_sender)).unwrap(), + TorrentTracker::new(default_tracker_config(), Some(stats_event_sender), StatsRepository::new()).unwrap(), ); handle_scrape(remote_addr, &sample_scrape_request(&remote_addr), tracker.clone()) diff --git a/tests/udp.rs b/tests/udp.rs index d2b500d5a..abd16427b 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -51,11 +51,10 @@ mod udp_tracker_server { lazy_static::initialize(&static_time::TIME_AT_APP_START); // Initialize stats tracker - let (stats_tracker, stats_event_sender) = StatsTracker::new_active_instance(); + let (stats_event_sender, stats_repository) = StatsTracker::new_active_instance(); // Initialize Torrust tracker - let tracker = match TorrentTracker::new(configuration.clone(), Box::new(stats_tracker), Some(stats_event_sender)) - { + let tracker = match TorrentTracker::new(configuration.clone(), Some(stats_event_sender), stats_repository) { Ok(tracker) => Arc::new(tracker), Err(error) => { panic!("{}", error) From 0dd95e7961dee201a7fd4517230bf645c0bb3839 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Oct 2022 16:21:59 +0100 Subject: [PATCH 11/13] refactor: extract statistics event_handler --- src/tracker/statistics.rs | 335 ++++++++++++++++++++++++++++++++------ 1 file changed, 288 insertions(+), 47 deletions(-) diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index 8f203c36e..1a681a7a2 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -76,6 +76,12 @@ impl Default for StatsTracker { } impl StatsTracker { + pub fn new() -> Self { + Self { + stats_repository: StatsRepository::new(), + } + } + pub fn new_active_instance() -> (Box, StatsRepository) { let mut stats_tracker = Self::new(); @@ -84,12 +90,6 @@ impl StatsTracker { (stats_event_sender, stats_tracker.stats_repository) } - pub fn new() -> Self { - Self { - stats_repository: StatsRepository::new(), - } - } - pub fn run_event_listener(&mut self) -> Box { let (sender, receiver) = mpsc::channel::(CHANNEL_BUFFER_SIZE); @@ -103,49 +103,56 @@ impl StatsTracker { async fn event_listener(mut receiver: Receiver, stats_repository: StatsRepository) { while let Some(event) = receiver.recv().await { - let mut stats_lock = stats_repository.stats.write().await; - - match event { - TrackerStatisticsEvent::Tcp4Announce => { - stats_lock.tcp4_announces_handled += 1; - stats_lock.tcp4_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp4Scrape => { - stats_lock.tcp4_scrapes_handled += 1; - stats_lock.tcp4_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp6Announce => { - stats_lock.tcp6_announces_handled += 1; - stats_lock.tcp6_connections_handled += 1; - } - TrackerStatisticsEvent::Tcp6Scrape => { - stats_lock.tcp6_scrapes_handled += 1; - stats_lock.tcp6_connections_handled += 1; - } - TrackerStatisticsEvent::Udp4Connect => { - stats_lock.udp4_connections_handled += 1; - } - TrackerStatisticsEvent::Udp4Announce => { - stats_lock.udp4_announces_handled += 1; - } - TrackerStatisticsEvent::Udp4Scrape => { - stats_lock.udp4_scrapes_handled += 1; - } - TrackerStatisticsEvent::Udp6Connect => { - stats_lock.udp6_connections_handled += 1; - } - TrackerStatisticsEvent::Udp6Announce => { - stats_lock.udp6_announces_handled += 1; - } - TrackerStatisticsEvent::Udp6Scrape => { - stats_lock.udp6_scrapes_handled += 1; - } - } - - debug!("stats: {:?}", stats_lock); + event_handler(event, &stats_repository).await; + } +} - drop(stats_lock); +async fn event_handler(event: TrackerStatisticsEvent, stats_repository: &StatsRepository) { + match event { + // TCP4 + TrackerStatisticsEvent::Tcp4Announce => { + stats_repository.increase_tcp4_announces().await; + stats_repository.increase_tcp4_connections().await; + } + TrackerStatisticsEvent::Tcp4Scrape => { + stats_repository.increase_tcp4_scrapes().await; + stats_repository.increase_tcp4_connections().await; + } + + // TCP6 + TrackerStatisticsEvent::Tcp6Announce => { + stats_repository.increase_tcp6_announces().await; + stats_repository.increase_tcp6_connections().await; + } + TrackerStatisticsEvent::Tcp6Scrape => { + stats_repository.increase_tcp6_scrapes().await; + stats_repository.increase_tcp6_connections().await; + } + + // UDP4 + TrackerStatisticsEvent::Udp4Connect => { + stats_repository.increase_udp4_connections().await; + } + TrackerStatisticsEvent::Udp4Announce => { + stats_repository.increase_udp4_announces().await; + } + TrackerStatisticsEvent::Udp4Scrape => { + stats_repository.increase_udp4_scrapes().await; + } + + // UDP6 + TrackerStatisticsEvent::Udp6Connect => { + stats_repository.increase_udp6_connections().await; + } + TrackerStatisticsEvent::Udp6Announce => { + stats_repository.increase_udp6_announces().await; + } + TrackerStatisticsEvent::Udp6Scrape => { + stats_repository.increase_udp6_scrapes().await; + } } + + debug!("stats: {:?}", stats_repository.get_stats().await); } #[async_trait] @@ -186,4 +193,238 @@ impl StatsRepository { pub async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { self.stats.read().await } + + pub async fn increase_tcp4_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp4_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp4_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp4_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp4_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp4_scrapes_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp6_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp6_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp6_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp6_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_tcp6_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.tcp6_scrapes_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp4_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp4_scrapes_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_connections(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_connections_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_announces(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_announces_handled += 1; + drop(stats_lock); + } + + pub async fn increase_udp6_scrapes(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp6_scrapes_handled += 1; + drop(stats_lock); + } +} + +#[cfg(test)] +mod tests { + + mod event_handler { + use crate::statistics::{event_handler, StatsRepository, TrackerStatisticsEvent}; + + #[tokio::test] + async fn should_increase_the_tcp4_announces_counter_when_it_receives_a_tcp4_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp4Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp4Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp4_scrapes_counter_when_it_receives_a_tcp4_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp4Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_scrapes_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp4_connections_counter_when_it_receives_a_tcp4_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp4Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp6_announces_counter_when_it_receives_a_tcp6_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp6Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp6_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp6Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp6_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp6_scrapes_counter_when_it_receives_a_tcp6_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp6Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp6_scrapes_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_tcp6_connections_counter_when_it_receives_a_tcp6_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Tcp6Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.tcp6_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_connections_counter_when_it_receives_a_udp4_connect_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp4Connect, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_announces_counter_when_it_receives_a_udp4_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp4Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp4_scrapes_counter_when_it_receives_a_udp4_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp4Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp4_scrapes_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_connections_counter_when_it_receives_a_udp6_connect_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp6Connect, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_connections_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_announces_counter_when_it_receives_a_udp6_announce_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp6Announce, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_announces_handled, 1); + } + + #[tokio::test] + async fn should_increase_the_udp6_scrapes_counter_when_it_receives_a_udp6_scrape_event() { + let stats_repository = StatsRepository::new(); + + event_handler(TrackerStatisticsEvent::Udp6Scrape, &stats_repository).await; + + let stats = stats_repository.get_stats().await; + + assert_eq!(stats.udp6_scrapes_handled, 1); + } + } } From 9e493055f8c2bb59b923cce1ca2306c681e09c59 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Oct 2022 16:36:28 +0100 Subject: [PATCH 12/13] test: add tests for StatsTracker --- src/tracker/statistics.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index 1a681a7a2..c4d4971af 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -270,6 +270,30 @@ impl StatsRepository { #[cfg(test)] mod tests { + mod stats_tracker { + use crate::statistics::{StatsTracker, TrackerStatistics, TrackerStatisticsEvent}; + + #[tokio::test] + async fn should_contain_the_tracker_statistics() { + let stats_tracker = StatsTracker::new(); + + let stats = stats_tracker.stats_repository.get_stats().await; + + assert_eq!(stats.tcp4_announces_handled, TrackerStatistics::new().tcp4_announces_handled); + } + + #[tokio::test] + async fn should_create_an_event_sender_to_send_statistical_events() { + let mut stats_tracker = StatsTracker::new(); + + let event_sender = stats_tracker.run_event_listener(); + + let result = event_sender.send_event(TrackerStatisticsEvent::Udp4Connect).await; + + assert!(result.is_some()); + } + } + mod event_handler { use crate::statistics::{event_handler, StatsRepository, TrackerStatisticsEvent}; From 6f77dfeee64037cd9011617057d9f1e5c3e2fd63 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Wed, 26 Oct 2022 15:31:37 +0100 Subject: [PATCH 13/13] fix: use only minor version for dependencies --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 18188565c..b2b256a2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,4 +61,4 @@ aquatic_udp_protocol = "0.2" uuid = { version = "1", features = ["v4"] } [dev-dependencies] -mockall = "0.11.3" +mockall = "0.11"