diff --git a/src/app.rs b/src/app.rs index 06fea4d2e..abfe75256 100644 --- a/src/app.rs +++ b/src/app.rs @@ -23,12 +23,14 @@ //! - Tracker REST API: the tracker API can be enabled/disabled. use std::sync::Arc; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use torrust_tracker_configuration::Configuration; use tracing::instrument; use crate::bootstrap::jobs::{health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker}; use crate::servers::registar::Registar; +use crate::servers::udp::server::banning::BanService; use crate::{core, servers}; /// # Panics @@ -37,8 +39,12 @@ use crate::{core, servers}; /// /// - Can't retrieve tracker keys from database. /// - Can't load whitelist from database. -#[instrument(skip(config, tracker))] -pub async fn start(config: &Configuration, tracker: Arc) -> Vec> { +#[instrument(skip(config, tracker, ban_service))] +pub async fn start( + config: &Configuration, + tracker: Arc, + ban_service: Arc>, +) -> Vec> { if config.http_api.is_none() && (config.udp_trackers.is_none() || config.udp_trackers.as_ref().map_or(true, std::vec::Vec::is_empty)) && (config.http_trackers.is_none() || config.http_trackers.as_ref().map_or(true, std::vec::Vec::is_empty)) @@ -75,7 +81,9 @@ pub async fn start(config: &Configuration, tracker: Arc) -> Vec) -> Vec (Configuration, Arc) { +pub fn setup() -> (Configuration, Arc, Arc>) { #[cfg(not(test))] check_seed(); @@ -44,9 +47,11 @@ pub fn setup() -> (Configuration, Arc) { let tracker = initialize_with_configuration(&configuration); + let udp_ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + tracing::info!("Configuration:\n{}", configuration.clone().mask_secrets().to_json()); - (configuration, tracker) + (configuration, tracker, udp_ban_service) } /// checks if the seed is the instance seed in production. diff --git a/src/bootstrap/jobs/tracker_apis.rs b/src/bootstrap/jobs/tracker_apis.rs index 35b13b7ce..858888540 100644 --- a/src/bootstrap/jobs/tracker_apis.rs +++ b/src/bootstrap/jobs/tracker_apis.rs @@ -24,6 +24,7 @@ use std::net::SocketAddr; use std::sync::Arc; use axum_server::tls_rustls::RustlsConfig; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use torrust_tracker_configuration::{AccessTokens, HttpApi}; use tracing::instrument; @@ -33,6 +34,7 @@ use crate::core; use crate::servers::apis::server::{ApiServer, Launcher}; use crate::servers::apis::Version; use crate::servers::registar::ServiceRegistrationForm; +use crate::servers::udp::server::banning::BanService; /// This is the message that the "launcher" spawned task sends to the main /// application process to notify the API server was successfully started. @@ -54,10 +56,11 @@ pub struct ApiServerJobStarted(); /// It would panic if unable to send the `ApiServerJobStarted` notice. /// /// -#[instrument(skip(config, tracker, form))] +#[instrument(skip(config, tracker, ban_service, form))] pub async fn start_job( config: &HttpApi, tracker: Arc, + ban_service: Arc>, form: ServiceRegistrationForm, version: Version, ) -> Option> { @@ -70,21 +73,22 @@ pub async fn start_job( let access_tokens = Arc::new(config.access_tokens.clone()); match version { - Version::V1 => Some(start_v1(bind_to, tls, tracker.clone(), form, access_tokens).await), + Version::V1 => Some(start_v1(bind_to, tls, tracker.clone(), ban_service.clone(), form, access_tokens).await), } } #[allow(clippy::async_yields_async)] -#[instrument(skip(socket, tls, tracker, form, access_tokens))] +#[instrument(skip(socket, tls, tracker, ban_service, form, access_tokens))] async fn start_v1( socket: SocketAddr, tls: Option, tracker: Arc, + ban_service: Arc>, form: ServiceRegistrationForm, access_tokens: Arc, ) -> JoinHandle<()> { let server = ApiServer::new(Launcher::new(socket, tls)) - .start(tracker, form, access_tokens) + .start(tracker, ban_service, form, access_tokens) .await .expect("it should be able to start to the tracker api"); @@ -98,21 +102,25 @@ async fn start_v1( mod tests { use std::sync::Arc; + use tokio::sync::RwLock; use torrust_tracker_test_helpers::configuration::ephemeral_public; use crate::bootstrap::app::initialize_with_configuration; use crate::bootstrap::jobs::tracker_apis::start_job; use crate::servers::apis::Version; use crate::servers::registar::Registar; + use crate::servers::udp::server::banning::BanService; + use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; #[tokio::test] async fn it_should_start_http_tracker() { let cfg = Arc::new(ephemeral_public()); let config = &cfg.http_api.clone().unwrap(); let tracker = initialize_with_configuration(&cfg); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); let version = Version::V1; - start_job(config, tracker, Registar::default().give_form(), version) + start_job(config, tracker, ban_service, Registar::default().give_form(), version) .await .expect("it should be able to join to the tracker api start-job"); } diff --git a/src/bootstrap/jobs/udp_tracker.rs b/src/bootstrap/jobs/udp_tracker.rs index 6aab06d4f..8948811af 100644 --- a/src/bootstrap/jobs/udp_tracker.rs +++ b/src/bootstrap/jobs/udp_tracker.rs @@ -8,12 +8,14 @@ //! > for the configuration options. use std::sync::Arc; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use torrust_tracker_configuration::UdpTracker; use tracing::instrument; use crate::core; use crate::servers::registar::ServiceRegistrationForm; +use crate::servers::udp::server::banning::BanService; use crate::servers::udp::server::spawner::Spawner; use crate::servers::udp::server::Server; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; @@ -29,13 +31,18 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET; /// It will panic if the task did not finish successfully. #[must_use] #[allow(clippy::async_yields_async)] -#[instrument(skip(config, tracker, form))] -pub async fn start_job(config: &UdpTracker, tracker: Arc, form: ServiceRegistrationForm) -> JoinHandle<()> { +#[instrument(skip(config, tracker, ban_service, form))] +pub async fn start_job( + config: &UdpTracker, + tracker: Arc, + ban_service: Arc>, + form: ServiceRegistrationForm, +) -> JoinHandle<()> { let bind_to = config.bind_address; let cookie_lifetime = config.cookie_lifetime; let server = Server::new(Spawner::new(bind_to)) - .start(tracker, form, cookie_lifetime) + .start(tracker, ban_service, form, cookie_lifetime) .await .expect("it should be able to start the udp tracker"); diff --git a/src/console/profiling.rs b/src/console/profiling.rs index 5fb507197..1d31af3ce 100644 --- a/src/console/profiling.rs +++ b/src/console/profiling.rs @@ -179,9 +179,9 @@ pub async fn run() { return; }; - let (config, tracker) = bootstrap::app::setup(); + let (config, tracker, ban_service) = bootstrap::app::setup(); - let jobs = app::start(&config, tracker).await; + let jobs = app::start(&config, tracker, ban_service).await; // Run the tracker for a fixed duration let run_duration = sleep(Duration::from_secs(duration_secs)); diff --git a/src/core/services/statistics/mod.rs b/src/core/services/statistics/mod.rs index 10e1c60fa..4143aaf1f 100644 --- a/src/core/services/statistics/mod.rs +++ b/src/core/services/statistics/mod.rs @@ -40,10 +40,12 @@ pub mod setup; use std::sync::Arc; +use tokio::sync::RwLock; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; use crate::core::statistics::metrics::Metrics; use crate::core::Tracker; +use crate::servers::udp::server::banning::BanService; /// All the metrics collected by the tracker. #[derive(Debug, PartialEq)] @@ -60,28 +62,37 @@ pub struct TrackerMetrics { } /// It returns all the [`TrackerMetrics`] -pub async fn get_metrics(tracker: Arc) -> TrackerMetrics { +pub async fn get_metrics(tracker: Arc, ban_service: Arc>) -> TrackerMetrics { let torrents_metrics = tracker.get_torrents_metrics(); let stats = tracker.get_stats().await; + let udp_banned_ips_total = ban_service.read().await.get_banned_ips_total(); TrackerMetrics { torrents_metrics, protocol_metrics: Metrics { - // TCP + // TCPv4 tcp4_connections_handled: stats.tcp4_connections_handled, tcp4_announces_handled: stats.tcp4_announces_handled, tcp4_scrapes_handled: stats.tcp4_scrapes_handled, + // TCPv6 tcp6_connections_handled: stats.tcp6_connections_handled, tcp6_announces_handled: stats.tcp6_announces_handled, tcp6_scrapes_handled: stats.tcp6_scrapes_handled, // UDP udp_requests_aborted: stats.udp_requests_aborted, + udp_requests_banned: stats.udp_requests_banned, + udp_banned_ips_total: udp_banned_ips_total as u64, + udp_avg_connect_processing_time_ns: stats.udp_avg_connect_processing_time_ns, + udp_avg_announce_processing_time_ns: stats.udp_avg_announce_processing_time_ns, + udp_avg_scrape_processing_time_ns: stats.udp_avg_scrape_processing_time_ns, + // UDPv4 udp4_requests: stats.udp4_requests, udp4_connections_handled: stats.udp4_connections_handled, udp4_announces_handled: stats.udp4_announces_handled, udp4_scrapes_handled: stats.udp4_scrapes_handled, udp4_responses: stats.udp4_responses, udp4_errors_handled: stats.udp4_errors_handled, + // UDPv6 udp6_requests: stats.udp6_requests, udp6_connections_handled: stats.udp6_connections_handled, udp6_announces_handled: stats.udp6_announces_handled, @@ -96,6 +107,7 @@ pub async fn get_metrics(tracker: Arc) -> TrackerMetrics { mod tests { use std::sync::Arc; + use tokio::sync::RwLock; use torrust_tracker_configuration::Configuration; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; use torrust_tracker_test_helpers::configuration; @@ -103,6 +115,8 @@ mod tests { use crate::core; use crate::core::services::statistics::{get_metrics, TrackerMetrics}; use crate::core::services::tracker_factory; + use crate::servers::udp::server::banning::BanService; + use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; pub fn tracker_configuration() -> Configuration { configuration::ephemeral() @@ -111,8 +125,9 @@ mod tests { #[tokio::test] async fn the_statistics_service_should_return_the_tracker_metrics() { let tracker = Arc::new(tracker_factory(&tracker_configuration())); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); - let tracker_metrics = get_metrics(tracker.clone()).await; + let tracker_metrics = get_metrics(tracker.clone(), ban_service.clone()).await; assert_eq!( tracker_metrics, diff --git a/src/core/statistics/event/handler.rs b/src/core/statistics/event/handler.rs index 5acc5e12c..3c435145a 100644 --- a/src/core/statistics/event/handler.rs +++ b/src/core/statistics/event/handler.rs @@ -1,4 +1,4 @@ -use crate::core::statistics::event::Event; +use crate::core::statistics::event::{Event, UdpResponseKind}; use crate::core::statistics::repository::Repository; pub async fn handle_event(event: Event, stats_repository: &Repository) { @@ -24,9 +24,12 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { } // UDP - Event::Udp4RequestAborted => { + Event::UdpRequestAborted => { stats_repository.increase_udp_requests_aborted().await; } + Event::UdpRequestBanned => { + stats_repository.increase_udp_requests_banned().await; + } // UDP4 Event::Udp4Request => { @@ -41,8 +44,30 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { Event::Udp4Scrape => { stats_repository.increase_udp4_scrapes().await; } - Event::Udp4Response => { + Event::Udp4Response { + kind, + req_processing_time, + } => { stats_repository.increase_udp4_responses().await; + + match kind { + UdpResponseKind::Connect => { + stats_repository + .recalculate_udp_avg_connect_processing_time_ns(req_processing_time) + .await; + } + UdpResponseKind::Announce => { + stats_repository + .recalculate_udp_avg_announce_processing_time_ns(req_processing_time) + .await; + } + UdpResponseKind::Scrape => { + stats_repository + .recalculate_udp_avg_scrape_processing_time_ns(req_processing_time) + .await; + } + UdpResponseKind::Error => {} + } } Event::Udp4Error => { stats_repository.increase_udp4_errors().await; @@ -61,7 +86,10 @@ pub async fn handle_event(event: Event, stats_repository: &Repository) { Event::Udp6Scrape => { stats_repository.increase_udp6_scrapes().await; } - Event::Udp6Response => { + Event::Udp6Response { + kind: _, + req_processing_time: _, + } => { stats_repository.increase_udp6_responses().await; } Event::Udp6Error => { diff --git a/src/core/statistics/event/mod.rs b/src/core/statistics/event/mod.rs index b14995cc1..905aa0372 100644 --- a/src/core/statistics/event/mod.rs +++ b/src/core/statistics/event/mod.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + pub mod handler; pub mod listener; pub mod sender; @@ -18,17 +20,32 @@ pub enum Event { Tcp4Scrape, Tcp6Announce, Tcp6Scrape, - Udp4RequestAborted, + UdpRequestAborted, + UdpRequestBanned, Udp4Request, Udp4Connect, Udp4Announce, Udp4Scrape, - Udp4Response, + Udp4Response { + kind: UdpResponseKind, + req_processing_time: Duration, + }, Udp4Error, Udp6Request, Udp6Connect, Udp6Announce, Udp6Scrape, - Udp6Response, + Udp6Response { + kind: UdpResponseKind, + req_processing_time: Duration, + }, Udp6Error, } + +#[derive(Debug, PartialEq, Eq)] +pub enum UdpResponseKind { + Connect, + Announce, + Scrape, + Error, +} diff --git a/src/core/statistics/metrics.rs b/src/core/statistics/metrics.rs index 970302816..40262efd6 100644 --- a/src/core/statistics/metrics.rs +++ b/src/core/statistics/metrics.rs @@ -28,9 +28,26 @@ pub struct Metrics { /// Total number of TCP (HTTP tracker) `scrape` requests from IPv6 peers. pub tcp6_scrapes_handled: u64, + // UDP /// Total number of UDP (UDP tracker) requests aborted. pub udp_requests_aborted: u64, + /// Total number of UDP (UDP tracker) requests banned. + pub udp_requests_banned: u64, + + /// Total number of banned IPs. + pub udp_banned_ips_total: u64, + + /// Average rounded time spent processing UDP connect requests. + pub udp_avg_connect_processing_time_ns: u64, + + /// Average rounded time spent processing UDP announce requests. + pub udp_avg_announce_processing_time_ns: u64, + + /// Average rounded time spent processing UDP scrape requests. + pub udp_avg_scrape_processing_time_ns: u64, + + // UDPv4 /// Total number of UDP (UDP tracker) requests from IPv4 peers. pub udp4_requests: u64, @@ -49,6 +66,7 @@ pub struct Metrics { /// Total number of UDP (UDP tracker) `error` requests from IPv4 peers. pub udp4_errors_handled: u64, + // UDPv6 /// Total number of UDP (UDP tracker) requests from IPv6 peers. pub udp6_requests: u64, diff --git a/src/core/statistics/repository.rs b/src/core/statistics/repository.rs index bdbc046de..ec5100073 100644 --- a/src/core/statistics/repository.rs +++ b/src/core/statistics/repository.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -70,6 +71,12 @@ impl Repository { drop(stats_lock); } + pub async fn increase_udp_requests_banned(&self) { + let mut stats_lock = self.stats.write().await; + stats_lock.udp_requests_banned += 1; + drop(stats_lock); + } + pub async fn increase_udp4_requests(&self) { let mut stats_lock = self.stats.write().await; stats_lock.udp4_requests += 1; @@ -106,6 +113,64 @@ impl Repository { drop(stats_lock); } + #[allow(clippy::cast_precision_loss)] + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_sign_loss)] + pub async fn recalculate_udp_avg_connect_processing_time_ns(&self, req_processing_time: Duration) { + let mut stats_lock = self.stats.write().await; + + let req_processing_time = req_processing_time.as_nanos() as f64; + let udp_connections_handled = (stats_lock.udp4_connections_handled + stats_lock.udp6_connections_handled) as f64; + + let previous_avg = stats_lock.udp_avg_connect_processing_time_ns; + + // Moving average: https://en.wikipedia.org/wiki/Moving_average + let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_connections_handled; + + stats_lock.udp_avg_connect_processing_time_ns = new_avg.ceil() as u64; + + drop(stats_lock); + } + + #[allow(clippy::cast_precision_loss)] + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_sign_loss)] + pub async fn recalculate_udp_avg_announce_processing_time_ns(&self, req_processing_time: Duration) { + let mut stats_lock = self.stats.write().await; + + let req_processing_time = req_processing_time.as_nanos() as f64; + + let udp_announces_handled = (stats_lock.udp4_announces_handled + stats_lock.udp6_announces_handled) as f64; + + let previous_avg = stats_lock.udp_avg_announce_processing_time_ns; + + // Moving average: https://en.wikipedia.org/wiki/Moving_average + let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_announces_handled; + + stats_lock.udp_avg_announce_processing_time_ns = new_avg.ceil() as u64; + + drop(stats_lock); + } + + #[allow(clippy::cast_precision_loss)] + #[allow(clippy::cast_possible_truncation)] + #[allow(clippy::cast_sign_loss)] + pub async fn recalculate_udp_avg_scrape_processing_time_ns(&self, req_processing_time: Duration) { + let mut stats_lock = self.stats.write().await; + + let req_processing_time = req_processing_time.as_nanos() as f64; + let udp_scrapes_handled = (stats_lock.udp4_scrapes_handled + stats_lock.udp6_scrapes_handled) as f64; + + let previous_avg = stats_lock.udp_avg_scrape_processing_time_ns; + + // Moving average: https://en.wikipedia.org/wiki/Moving_average + let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_scrapes_handled; + + stats_lock.udp_avg_scrape_processing_time_ns = new_avg.ceil() as u64; + + drop(stats_lock); + } + pub async fn increase_udp6_requests(&self) { let mut stats_lock = self.stats.write().await; stats_lock.udp6_requests += 1; diff --git a/src/main.rs b/src/main.rs index 0e2bcfbc9..c93982191 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,9 +2,9 @@ use torrust_tracker_lib::{app, bootstrap}; #[tokio::main] async fn main() { - let (config, tracker) = bootstrap::app::setup(); + let (config, tracker, udp_ban_service) = bootstrap::app::setup(); - let jobs = app::start(&config, tracker).await; + let jobs = app::start(&config, tracker, udp_ban_service).await; // handle the signals tokio::select! { diff --git a/src/servers/apis/routes.rs b/src/servers/apis/routes.rs index 0b0862fb9..98442ea97 100644 --- a/src/servers/apis/routes.rs +++ b/src/servers/apis/routes.rs @@ -15,6 +15,7 @@ use axum::response::Response; use axum::routing::get; use axum::{middleware, BoxError, Router}; use hyper::{Request, StatusCode}; +use tokio::sync::RwLock; use torrust_tracker_configuration::{AccessTokens, DEFAULT_TIMEOUT}; use tower::timeout::TimeoutLayer; use tower::ServiceBuilder; @@ -32,16 +33,22 @@ use super::v1::middlewares::auth::State; use crate::core::Tracker; use crate::servers::apis::API_LOG_TARGET; use crate::servers::logging::Latency; +use crate::servers::udp::server::banning::BanService; /// Add all API routes to the router. #[allow(clippy::needless_pass_by_value)] -#[instrument(skip(tracker, access_tokens))] -pub fn router(tracker: Arc, access_tokens: Arc, server_socket_addr: SocketAddr) -> Router { +#[instrument(skip(tracker, ban_service, access_tokens))] +pub fn router( + tracker: Arc, + ban_service: Arc>, + access_tokens: Arc, + server_socket_addr: SocketAddr, +) -> Router { let router = Router::new(); let api_url_prefix = "/api"; - let router = v1::routes::add(api_url_prefix, router, tracker.clone()); + let router = v1::routes::add(api_url_prefix, router, tracker.clone(), ban_service.clone()); let state = State { access_tokens }; diff --git a/src/servers/apis/server.rs b/src/servers/apis/server.rs index eadadecf2..9d1c77c03 100644 --- a/src/servers/apis/server.rs +++ b/src/servers/apis/server.rs @@ -33,6 +33,7 @@ use derive_more::Constructor; use futures::future::BoxFuture; use thiserror::Error; use tokio::sync::oneshot::{Receiver, Sender}; +use tokio::sync::RwLock; use torrust_tracker_configuration::AccessTokens; use tracing::{instrument, Level}; @@ -44,6 +45,7 @@ use crate::servers::custom_axum_server::{self, TimeoutAcceptor}; use crate::servers::logging::STARTED_ON; use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::{graceful_shutdown, Halted}; +use crate::servers::udp::server::banning::BanService; /// Errors that can occur when starting or stopping the API server. #[derive(Debug, Error)] @@ -122,10 +124,11 @@ impl ApiServer { /// # Panics /// /// It would panic if the bound socket address cannot be sent back to this starter. - #[instrument(skip(self, tracker, form, access_tokens), err, ret(Display, level = Level::INFO))] + #[instrument(skip(self, tracker, ban_service, form, access_tokens), err, ret(Display, level = Level::INFO))] pub async fn start( self, tracker: Arc, + ban_service: Arc>, form: ServiceRegistrationForm, access_tokens: Arc, ) -> Result, Error> { @@ -137,7 +140,7 @@ impl ApiServer { let task = tokio::spawn(async move { tracing::debug!(target: API_LOG_TARGET, "Starting with launcher in spawned task ..."); - let _task = launcher.start(tracker, access_tokens, tx_start, rx_halt).await; + let _task = launcher.start(tracker, ban_service, access_tokens, tx_start, rx_halt).await; tracing::debug!(target: API_LOG_TARGET, "Started with launcher in spawned task"); @@ -235,10 +238,11 @@ impl Launcher { /// /// Will panic if unable to bind to the socket, or unable to get the address of the bound socket. /// Will also panic if unable to send message regarding the bound socket address. - #[instrument(skip(self, tracker, access_tokens, tx_start, rx_halt))] + #[instrument(skip(self, tracker, ban_service, access_tokens, tx_start, rx_halt))] pub fn start( &self, tracker: Arc, + ban_service: Arc>, access_tokens: Arc, tx_start: Sender, rx_halt: Receiver, @@ -246,7 +250,7 @@ impl Launcher { let socket = std::net::TcpListener::bind(self.bind_to).expect("Could not bind tcp_listener to address."); let address = socket.local_addr().expect("Could not get local_addr from tcp_listener."); - let router = router(tracker, access_tokens, address); + let router = router(tracker, ban_service, access_tokens, address); let handle = Handle::new(); @@ -294,12 +298,15 @@ impl Launcher { mod tests { use std::sync::Arc; + use tokio::sync::RwLock; use torrust_tracker_test_helpers::configuration::ephemeral_public; use crate::bootstrap::app::initialize_with_configuration; use crate::bootstrap::jobs::make_rust_tls; use crate::servers::apis::server::{ApiServer, Launcher}; use crate::servers::registar::Registar; + use crate::servers::udp::server::banning::BanService; + use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; #[tokio::test] async fn it_should_be_able_to_start_and_stop() { @@ -307,6 +314,7 @@ mod tests { let config = &cfg.http_api.clone().unwrap(); let tracker = initialize_with_configuration(&cfg); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); let bind_to = config.bind_address; @@ -321,7 +329,7 @@ mod tests { let register = &Registar::default(); let started = stopped - .start(tracker, register.give_form(), access_tokens) + .start(tracker, ban_service, register.give_form(), access_tokens) .await .expect("it should start the server"); let stopped = started.stop().await.expect("it should stop the server"); diff --git a/src/servers/apis/v1/context/stats/handlers.rs b/src/servers/apis/v1/context/stats/handlers.rs index 8b11b1ff1..b630c763d 100644 --- a/src/servers/apis/v1/context/stats/handlers.rs +++ b/src/servers/apis/v1/context/stats/handlers.rs @@ -6,10 +6,12 @@ use axum::extract::State; use axum::response::Response; use axum_extra::extract::Query; use serde::Deserialize; +use tokio::sync::RwLock; use super::responses::{metrics_response, stats_response}; use crate::core::services::statistics::get_metrics; use crate::core::Tracker; +use crate::servers::udp::server::banning::BanService; #[derive(Deserialize, Debug, Default)] #[serde(rename_all = "lowercase")] @@ -35,8 +37,11 @@ pub struct QueryParams { /// /// Refer to the [API endpoint documentation](crate::servers::apis::v1::context::stats#get-tracker-statistics) /// for more information about this endpoint. -pub async fn get_stats_handler(State(tracker): State>, params: Query) -> Response { - let metrics = get_metrics(tracker.clone()).await; +pub async fn get_stats_handler( + State(state): State<(Arc, Arc>)>, + params: Query, +) -> Response { + let metrics = get_metrics(state.0.clone(), state.1.clone()).await; match params.0.format { Some(format) => match format { diff --git a/src/servers/apis/v1/context/stats/resources.rs b/src/servers/apis/v1/context/stats/resources.rs index 55cb3a581..c6a526a7d 100644 --- a/src/servers/apis/v1/context/stats/resources.rs +++ b/src/servers/apis/v1/context/stats/resources.rs @@ -34,9 +34,21 @@ pub struct Stats { /// Total number of TCP (HTTP tracker) `scrape` requests from IPv6 peers. pub tcp6_scrapes_handled: u64, + // UDP /// Total number of UDP (UDP tracker) requests aborted. pub udp_requests_aborted: u64, + /// Total number of UDP (UDP tracker) requests banned. + pub udp_requests_banned: u64, + /// Total number of IPs banned for UDP (UDP tracker) requests. + pub udp_banned_ips_total: u64, + /// Average rounded time spent processing UDP connect requests. + pub udp_avg_connect_processing_time_ns: u64, + /// Average rounded time spent processing UDP announce requests. + pub udp_avg_announce_processing_time_ns: u64, + /// Average rounded time spent processing UDP scrape requests. + pub udp_avg_scrape_processing_time_ns: u64, + // UDPv4 /// Total number of UDP (UDP tracker) requests from IPv4 peers. pub udp4_requests: u64, /// Total number of UDP (UDP tracker) connections from IPv4 peers. @@ -50,6 +62,7 @@ pub struct Stats { /// Total number of UDP (UDP tracker) `scrape` requests from IPv4 peers. pub udp4_errors_handled: u64, + // UDPv6 /// Total number of UDP (UDP tracker) requests from IPv6 peers. pub udp6_requests: u64, /// Total number of UDP (UDP tracker) `connection` requests from IPv6 peers. @@ -80,12 +93,19 @@ impl From for Stats { tcp6_scrapes_handled: metrics.protocol_metrics.tcp6_scrapes_handled, // UDP udp_requests_aborted: metrics.protocol_metrics.udp_requests_aborted, + udp_requests_banned: metrics.protocol_metrics.udp_requests_banned, + udp_banned_ips_total: metrics.protocol_metrics.udp_banned_ips_total, + udp_avg_connect_processing_time_ns: metrics.protocol_metrics.udp_avg_connect_processing_time_ns, + udp_avg_announce_processing_time_ns: metrics.protocol_metrics.udp_avg_announce_processing_time_ns, + udp_avg_scrape_processing_time_ns: metrics.protocol_metrics.udp_avg_scrape_processing_time_ns, + // UDPv4 udp4_requests: metrics.protocol_metrics.udp4_requests, udp4_connections_handled: metrics.protocol_metrics.udp4_connections_handled, udp4_announces_handled: metrics.protocol_metrics.udp4_announces_handled, udp4_scrapes_handled: metrics.protocol_metrics.udp4_scrapes_handled, udp4_responses: metrics.protocol_metrics.udp4_responses, udp4_errors_handled: metrics.protocol_metrics.udp4_errors_handled, + // UDPv6 udp6_requests: metrics.protocol_metrics.udp6_requests, udp6_connections_handled: metrics.protocol_metrics.udp6_connections_handled, udp6_announces_handled: metrics.protocol_metrics.udp6_announces_handled, @@ -124,18 +144,25 @@ mod tests { tcp6_scrapes_handled: 10, // UDP udp_requests_aborted: 11, - udp4_requests: 12, - udp4_connections_handled: 13, - udp4_announces_handled: 14, - udp4_scrapes_handled: 15, - udp4_responses: 16, - udp4_errors_handled: 17, - udp6_requests: 18, - udp6_connections_handled: 19, - udp6_announces_handled: 20, - udp6_scrapes_handled: 21, - udp6_responses: 22, - udp6_errors_handled: 23 + udp_requests_banned: 12, + udp_banned_ips_total: 13, + udp_avg_connect_processing_time_ns: 14, + udp_avg_announce_processing_time_ns: 15, + udp_avg_scrape_processing_time_ns: 16, + // UDPv4 + udp4_requests: 17, + udp4_connections_handled: 18, + udp4_announces_handled: 19, + udp4_scrapes_handled: 20, + udp4_responses: 21, + udp4_errors_handled: 22, + // UDPv6 + udp6_requests: 23, + udp6_connections_handled: 24, + udp6_announces_handled: 25, + udp6_scrapes_handled: 26, + udp6_responses: 27, + udp6_errors_handled: 28 } }), Stats { @@ -143,27 +170,35 @@ mod tests { seeders: 1, completed: 2, leechers: 3, - // TCP + // TCPv4 tcp4_connections_handled: 5, tcp4_announces_handled: 6, tcp4_scrapes_handled: 7, + // TCPv6 tcp6_connections_handled: 8, tcp6_announces_handled: 9, tcp6_scrapes_handled: 10, // UDP udp_requests_aborted: 11, - udp4_requests: 12, - udp4_connections_handled: 13, - udp4_announces_handled: 14, - udp4_scrapes_handled: 15, - udp4_responses: 16, - udp4_errors_handled: 17, - udp6_requests: 18, - udp6_connections_handled: 19, - udp6_announces_handled: 20, - udp6_scrapes_handled: 21, - udp6_responses: 22, - udp6_errors_handled: 23 + udp_requests_banned: 12, + udp_banned_ips_total: 13, + udp_avg_connect_processing_time_ns: 14, + udp_avg_announce_processing_time_ns: 15, + udp_avg_scrape_processing_time_ns: 16, + // UDPv4 + udp4_requests: 17, + udp4_connections_handled: 18, + udp4_announces_handled: 19, + udp4_scrapes_handled: 20, + udp4_responses: 21, + udp4_errors_handled: 22, + // UDPv6 + udp6_requests: 23, + udp6_connections_handled: 24, + udp6_announces_handled: 25, + udp6_scrapes_handled: 26, + udp6_responses: 27, + udp6_errors_handled: 28 } ); } diff --git a/src/servers/apis/v1/context/stats/responses.rs b/src/servers/apis/v1/context/stats/responses.rs index 6b214d0c9..a67b5328a 100644 --- a/src/servers/apis/v1/context/stats/responses.rs +++ b/src/servers/apis/v1/context/stats/responses.rs @@ -21,6 +21,10 @@ pub fn metrics_response(tracker_metrics: &TrackerMetrics) -> Response { lines.push(format!("completed {}", tracker_metrics.torrents_metrics.downloaded)); lines.push(format!("leechers {}", tracker_metrics.torrents_metrics.incomplete)); + // TCP + + // TCPv4 + lines.push(format!( "tcp4_connections_handled {}", tracker_metrics.protocol_metrics.tcp4_connections_handled @@ -34,6 +38,8 @@ pub fn metrics_response(tracker_metrics: &TrackerMetrics) -> Response { tracker_metrics.protocol_metrics.tcp4_scrapes_handled )); + // TCPv6 + lines.push(format!( "tcp6_connections_handled {}", tracker_metrics.protocol_metrics.tcp6_connections_handled @@ -47,10 +53,34 @@ pub fn metrics_response(tracker_metrics: &TrackerMetrics) -> Response { tracker_metrics.protocol_metrics.tcp6_scrapes_handled )); + // UDP + lines.push(format!( "udp_requests_aborted {}", tracker_metrics.protocol_metrics.udp_requests_aborted )); + lines.push(format!( + "udp_requests_banned {}", + tracker_metrics.protocol_metrics.udp_requests_banned + )); + lines.push(format!( + "udp_banned_ips_total {}", + tracker_metrics.protocol_metrics.udp_banned_ips_total + )); + lines.push(format!( + "udp_avg_connect_processing_time_ns {}", + tracker_metrics.protocol_metrics.udp_avg_connect_processing_time_ns + )); + lines.push(format!( + "udp_avg_announce_processing_time_ns {}", + tracker_metrics.protocol_metrics.udp_avg_announce_processing_time_ns + )); + lines.push(format!( + "udp_avg_scrape_processing_time_ns {}", + tracker_metrics.protocol_metrics.udp_avg_scrape_processing_time_ns + )); + + // UDPv4 lines.push(format!("udp4_requests {}", tracker_metrics.protocol_metrics.udp4_requests)); lines.push(format!( @@ -71,6 +101,8 @@ pub fn metrics_response(tracker_metrics: &TrackerMetrics) -> Response { tracker_metrics.protocol_metrics.udp4_errors_handled )); + // UDPv6 + lines.push(format!("udp6_requests {}", tracker_metrics.protocol_metrics.udp6_requests)); lines.push(format!( "udp6_connections_handled {}", diff --git a/src/servers/apis/v1/context/stats/routes.rs b/src/servers/apis/v1/context/stats/routes.rs index d8d552697..fde1056c3 100644 --- a/src/servers/apis/v1/context/stats/routes.rs +++ b/src/servers/apis/v1/context/stats/routes.rs @@ -7,11 +7,16 @@ use std::sync::Arc; use axum::routing::get; use axum::Router; +use tokio::sync::RwLock; use super::handlers::get_stats_handler; use crate::core::Tracker; +use crate::servers::udp::server::banning::BanService; /// It adds the routes to the router for the [`stats`](crate::servers::apis::v1::context::stats) API context. -pub fn add(prefix: &str, router: Router, tracker: Arc) -> Router { - router.route(&format!("{prefix}/stats"), get(get_stats_handler).with_state(tracker)) +pub fn add(prefix: &str, router: Router, tracker: Arc, ban_service: Arc>) -> Router { + router.route( + &format!("{prefix}/stats"), + get(get_stats_handler).with_state((tracker, ban_service)), + ) } diff --git a/src/servers/apis/v1/routes.rs b/src/servers/apis/v1/routes.rs index 3786b3532..23ef6c47e 100644 --- a/src/servers/apis/v1/routes.rs +++ b/src/servers/apis/v1/routes.rs @@ -2,16 +2,18 @@ use std::sync::Arc; use axum::Router; +use tokio::sync::RwLock; use super::context::{auth_key, stats, torrent, whitelist}; use crate::core::Tracker; +use crate::servers::udp::server::banning::BanService; /// Add the routes for the v1 API. -pub fn add(prefix: &str, router: Router, tracker: Arc) -> Router { +pub fn add(prefix: &str, router: Router, tracker: Arc, ban_service: Arc>) -> Router { let v1_prefix = format!("{prefix}/v1"); let router = auth_key::routes::add(&v1_prefix, router, tracker.clone()); - let router = stats::routes::add(&v1_prefix, router, tracker.clone()); + let router = stats::routes::add(&v1_prefix, router, tracker.clone(), ban_service); let router = whitelist::routes::add(&v1_prefix, router, tracker.clone()); torrent::routes::add(&v1_prefix, router, tracker) diff --git a/src/servers/udp/server/banning.rs b/src/servers/udp/server/banning.rs index df236820c..d32dfa541 100644 --- a/src/servers/udp/server/banning.rs +++ b/src/servers/udp/server/banning.rs @@ -20,7 +20,6 @@ use std::net::IpAddr; use bloom::{CountingBloomFilter, ASMS}; use tokio::time::Instant; -use url::Url; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; @@ -28,16 +27,14 @@ pub struct BanService { max_connection_id_errors_per_ip: u32, fuzzy_error_counter: CountingBloomFilter, accurate_error_counter: HashMap, - local_addr: Url, last_connection_id_errors_reset: Instant, } impl BanService { #[must_use] - pub fn new(max_connection_id_errors_per_ip: u32, local_addr: Url) -> Self { + pub fn new(max_connection_id_errors_per_ip: u32) -> Self { Self { max_connection_id_errors_per_ip, - local_addr, fuzzy_error_counter: CountingBloomFilter::with_rate(4, 0.01, 100), accurate_error_counter: HashMap::new(), last_connection_id_errors_reset: tokio::time::Instant::now(), @@ -54,6 +51,11 @@ impl BanService { self.accurate_error_counter.get(ip).copied() } + #[must_use] + pub fn get_banned_ips_total(&self) -> usize { + self.accurate_error_counter.len() + } + #[must_use] pub fn get_estimate_count(&self, ip: &IpAddr) -> u32 { self.fuzzy_error_counter.estimate_count(&ip.to_string()) @@ -82,8 +84,7 @@ impl BanService { self.last_connection_id_errors_reset = Instant::now(); - let local_addr = self.local_addr.to_string(); - tracing::info!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (connection id errors filter cleared)"); + tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp::run_udp_server::loop (connection id errors filter cleared)"); } } @@ -95,8 +96,7 @@ mod tests { /// Sample service with one day ban duration. fn ban_service(counter_limit: u32) -> BanService { - let udp_tracker_url = "udp://127.0.0.1".parse().unwrap(); - BanService::new(counter_limit, udp_tracker_url) + BanService::new(counter_limit) } #[test] diff --git a/src/servers/udp/server/launcher.rs b/src/servers/udp/server/launcher.rs index ada50eb31..753dc9915 100644 --- a/src/servers/udp/server/launcher.rs +++ b/src/servers/udp/server/launcher.rs @@ -24,7 +24,7 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET; /// The maximum number of connection id errors per ip. Clients will be banned if /// they exceed this limit. -const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10; +pub const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10; const IP_BANS_RESET_INTERVAL_IN_SECS: u64 = 3600; /// A UDP server instance launcher. @@ -40,9 +40,10 @@ impl Launcher { /// It panics if unable to send address of socket. /// It panics if the udp server is loaded when the tracker is private. /// - #[instrument(skip(tracker, bind_to, tx_start, rx_halt))] + #[instrument(skip(tracker, ban_service, bind_to, tx_start, rx_halt))] pub async fn run_with_graceful_shutdown( tracker: Arc, + ban_service: Arc>, bind_to: SocketAddr, cookie_lifetime: Duration, tx_start: oneshot::Sender, @@ -80,7 +81,7 @@ impl Launcher { let local_addr = local_udp_url.clone(); tokio::task::spawn(async move { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)"); - let () = Self::run_udp_server_main(receiver, tracker.clone(), cookie_lifetime).await; + let () = Self::run_udp_server_main(receiver, tracker.clone(), ban_service.clone(), cookie_lifetime).await; }) }; @@ -117,8 +118,13 @@ impl Launcher { ServiceHealthCheckJob::new(binding, info, job) } - #[instrument(skip(receiver, tracker))] - async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc, cookie_lifetime: Duration) { + #[instrument(skip(receiver, tracker, ban_service))] + async fn run_udp_server_main( + mut receiver: Receiver, + tracker: Arc, + ban_service: Arc>, + cookie_lifetime: Duration, + ) { let active_requests = &mut ActiveRequests::default(); let addr = receiver.bound_socket_address(); @@ -127,11 +133,6 @@ impl Launcher { let cookie_lifetime = cookie_lifetime.as_secs_f64(); - let ban_service = Arc::new(RwLock::new(BanService::new( - MAX_CONNECTION_ID_ERRORS_PER_IP, - local_addr.parse().unwrap(), - ))); - let ban_cleaner = ban_service.clone(); tokio::spawn(async move { @@ -175,6 +176,9 @@ impl Launcher { if ban_service.read().await.is_banned(&req.from.ip()) { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop continue: (banned ip)"); + + tracker.send_stats_event(statistics::event::Event::UdpRequestBanned).await; + continue; } @@ -202,7 +206,7 @@ impl Launcher { if old_request_aborted { // Evicted task from active requests buffer was aborted. - tracker.send_stats_event(statistics::event::Event::Udp4RequestAborted).await; + tracker.send_stats_event(statistics::event::Event::UdpRequestAborted).await; } } else { tokio::task::yield_now().await; diff --git a/src/servers/udp/server/mod.rs b/src/servers/udp/server/mod.rs index 9f974ca8c..6eb98a7b1 100644 --- a/src/servers/udp/server/mod.rs +++ b/src/servers/udp/server/mod.rs @@ -58,17 +58,23 @@ mod tests { use std::sync::Arc; use std::time::Duration; + use tokio::sync::RwLock; use torrust_tracker_test_helpers::configuration::ephemeral_public; use super::spawner::Spawner; use super::Server; use crate::bootstrap::app::initialize_with_configuration; use crate::servers::registar::Registar; + use crate::servers::udp::server::banning::BanService; + use crate::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; #[tokio::test] async fn it_should_be_able_to_start_and_stop() { let cfg = Arc::new(ephemeral_public()); + let tracker = initialize_with_configuration(&cfg); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let udp_trackers = cfg.udp_trackers.clone().expect("missing UDP trackers configuration"); let config = &udp_trackers[0]; let bind_to = config.bind_address; @@ -77,7 +83,7 @@ mod tests { let stopped = Server::new(Spawner::new(bind_to)); let started = stopped - .start(tracker, register.give_form(), config.cookie_lifetime) + .start(tracker, ban_service, register.give_form(), config.cookie_lifetime) .await .expect("it should start the server"); @@ -91,7 +97,10 @@ mod tests { #[tokio::test] async fn it_should_be_able_to_start_and_stop_with_wait() { let cfg = Arc::new(ephemeral_public()); + let tracker = initialize_with_configuration(&cfg); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let config = &cfg.udp_trackers.as_ref().unwrap().first().unwrap(); let bind_to = config.bind_address; let register = &Registar::default(); @@ -99,7 +108,7 @@ mod tests { let stopped = Server::new(Spawner::new(bind_to)); let started = stopped - .start(tracker, register.give_form(), config.cookie_lifetime) + .start(tracker, ban_service, register.give_form(), config.cookie_lifetime) .await .expect("it should start the server"); diff --git a/src/servers/udp/server/processor.rs b/src/servers/udp/server/processor.rs index 9a9798698..e0f7c4624 100644 --- a/src/servers/udp/server/processor.rs +++ b/src/servers/udp/server/processor.rs @@ -1,13 +1,16 @@ use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; +use std::time::Duration; use aquatic_udp_protocol::Response; use tokio::sync::RwLock; +use tokio::time::Instant; use tracing::{instrument, Level}; use super::banning::BanService; use super::bound_socket::BoundSocket; +use crate::core::statistics::event::UdpResponseKind; use crate::core::{statistics, Tracker}; use crate::servers::udp::handlers::CookieTimeValues; use crate::servers::udp::{handlers, RawRequest}; @@ -30,6 +33,9 @@ impl Processor { #[instrument(skip(self, request, ban_service))] pub async fn process_request(self, request: RawRequest, ban_service: Arc>) { let from = request.from; + + let start_time = Instant::now(); + let response = handlers::handle_packet( request, &self.tracker, @@ -39,11 +45,13 @@ impl Processor { ) .await; - self.send_response(from, response).await; + let elapsed_time = start_time.elapsed(); + + self.send_response(from, response, elapsed_time).await; } #[instrument(skip(self))] - async fn send_response(self, target: SocketAddr, response: Response) { + async fn send_response(self, target: SocketAddr, response: Response, req_processing_time: Duration) { tracing::debug!("send response"); let response_type = match &response { @@ -54,6 +62,13 @@ impl Processor { Response::Error(e) => format!("Error: {e:?}"), }; + let response_kind = match &response { + Response::Connect(_) => UdpResponseKind::Connect, + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => UdpResponseKind::Announce, + Response::Scrape(_) => UdpResponseKind::Scrape, + Response::Error(_e) => UdpResponseKind::Error, + }; + let mut writer = Cursor::new(Vec::with_capacity(200)); match response.write_bytes(&mut writer) { @@ -71,10 +86,20 @@ impl Processor { match target.ip() { IpAddr::V4(_) => { - self.tracker.send_stats_event(statistics::event::Event::Udp4Response).await; + self.tracker + .send_stats_event(statistics::event::Event::Udp4Response { + kind: response_kind, + req_processing_time, + }) + .await; } IpAddr::V6(_) => { - self.tracker.send_stats_event(statistics::event::Event::Udp6Response).await; + self.tracker + .send_stats_event(statistics::event::Event::Udp6Response { + kind: response_kind, + req_processing_time, + }) + .await; } } } diff --git a/src/servers/udp/server/spawner.rs b/src/servers/udp/server/spawner.rs index acebdcf75..ce2fe8eae 100644 --- a/src/servers/udp/server/spawner.rs +++ b/src/servers/udp/server/spawner.rs @@ -5,9 +5,10 @@ use std::time::Duration; use derive_more::derive::Display; use derive_more::Constructor; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, RwLock}; use tokio::task::JoinHandle; +use super::banning::BanService; use super::launcher::Launcher; use crate::bootstrap::jobs::Started; use crate::core::Tracker; @@ -28,6 +29,7 @@ impl Spawner { pub fn spawn_launcher( &self, tracker: Arc, + ban_service: Arc>, cookie_lifetime: Duration, tx_start: oneshot::Sender, rx_halt: oneshot::Receiver, @@ -35,7 +37,7 @@ impl Spawner { let spawner = Self::new(self.bind_to); tokio::spawn(async move { - Launcher::run_with_graceful_shutdown(tracker, spawner.bind_to, cookie_lifetime, tx_start, rx_halt).await; + Launcher::run_with_graceful_shutdown(tracker, ban_service, spawner.bind_to, cookie_lifetime, tx_start, rx_halt).await; spawner }) } diff --git a/src/servers/udp/server/states.rs b/src/servers/udp/server/states.rs index 8b87c6efb..02742049d 100644 --- a/src/servers/udp/server/states.rs +++ b/src/servers/udp/server/states.rs @@ -5,9 +5,11 @@ use std::time::Duration; use derive_more::derive::Display; use derive_more::Constructor; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use tracing::{instrument, Level}; +use super::banning::BanService; use super::spawner::Spawner; use super::{Server, UdpError}; use crate::bootstrap::jobs::Started; @@ -62,10 +64,11 @@ impl Server { /// /// It panics if unable to receive the bound socket address from service. /// - #[instrument(skip(self, tracker, form), err, ret(Display, level = Level::INFO))] + #[instrument(skip(self, tracker, ban_service, form), err, ret(Display, level = Level::INFO))] pub async fn start( self, tracker: Arc, + ban_service: Arc>, form: ServiceRegistrationForm, cookie_lifetime: Duration, ) -> Result, std::io::Error> { @@ -75,7 +78,10 @@ impl Server { assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); // May need to wrap in a task to about a tokio bug. - let task = self.state.spawner.spawn_launcher(tracker, cookie_lifetime, tx_start, rx_halt); + let task = self + .state + .spawner + .spawn_launcher(tracker, ban_service, cookie_lifetime, tx_start, rx_halt); let local_addr = rx_start.await.expect("it should be able to start the service").address; diff --git a/tests/servers/api/environment.rs b/tests/servers/api/environment.rs index f754e329f..00fb9d05b 100644 --- a/tests/servers/api/environment.rs +++ b/tests/servers/api/environment.rs @@ -3,12 +3,15 @@ use std::sync::Arc; use bittorrent_primitives::info_hash::InfoHash; use futures::executor::block_on; +use tokio::sync::RwLock; use torrust_tracker_configuration::{Configuration, HttpApi}; use torrust_tracker_lib::bootstrap::app::initialize_with_configuration; use torrust_tracker_lib::bootstrap::jobs::make_rust_tls; use torrust_tracker_lib::core::Tracker; use torrust_tracker_lib::servers::apis::server::{ApiServer, Launcher, Running, Stopped}; use torrust_tracker_lib::servers::registar::Registar; +use torrust_tracker_lib::servers::udp::server::banning::BanService; +use torrust_tracker_lib::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; use torrust_tracker_primitives::peer; use super::connection_info::ConnectionInfo; @@ -19,6 +22,7 @@ where { pub config: Arc, pub tracker: Arc, + pub ban_service: Arc>, pub registar: Registar, pub server: ApiServer, } @@ -37,6 +41,8 @@ impl Environment { pub fn new(configuration: &Arc) -> Self { let tracker = initialize_with_configuration(configuration); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); + let config = Arc::new(configuration.http_api.clone().expect("missing API configuration")); let bind_to = config.bind_address; @@ -48,6 +54,7 @@ impl Environment { Self { config, tracker, + ban_service, registar: Registar::default(), server, } @@ -59,10 +66,11 @@ impl Environment { Environment { config: self.config, tracker: self.tracker.clone(), + ban_service: self.ban_service.clone(), registar: self.registar.clone(), server: self .server - .start(self.tracker, self.registar.give_form(), access_tokens) + .start(self.tracker, self.ban_service, self.registar.give_form(), access_tokens) .await .unwrap(), } @@ -78,6 +86,7 @@ impl Environment { Environment { config: self.config, tracker: self.tracker, + ban_service: self.ban_service, registar: Registar::default(), server: self.server.stop().await.unwrap(), } diff --git a/tests/servers/api/v1/contract/context/stats.rs b/tests/servers/api/v1/contract/context/stats.rs index a81ad6f8c..bc6e495a3 100644 --- a/tests/servers/api/v1/contract/context/stats.rs +++ b/tests/servers/api/v1/contract/context/stats.rs @@ -45,12 +45,19 @@ async fn should_allow_getting_tracker_statistics() { tcp6_scrapes_handled: 0, // UDP udp_requests_aborted: 0, + udp_requests_banned: 0, + udp_banned_ips_total: 0, + udp_avg_connect_processing_time_ns: 0, + udp_avg_announce_processing_time_ns: 0, + udp_avg_scrape_processing_time_ns: 0, + // UDPv4 udp4_requests: 0, udp4_connections_handled: 0, udp4_announces_handled: 0, udp4_scrapes_handled: 0, udp4_responses: 0, udp4_errors_handled: 0, + // UDPv6 udp6_requests: 0, udp6_connections_handled: 0, udp6_announces_handled: 0, diff --git a/tests/servers/udp/contract.rs b/tests/servers/udp/contract.rs index de46b7c10..f0ed98b21 100644 --- a/tests/servers/udp/contract.rs +++ b/tests/servers/udp/contract.rs @@ -229,12 +229,16 @@ mod receiving_an_announce_request { logging::setup(); let env = Started::new(&configuration::ephemeral().into()).await; + let tracker = env.tracker.clone(); + let ban_service = env.ban_service.clone(); let client = match UdpTrackerClient::new(env.bind_address(), DEFAULT_TIMEOUT).await { Ok(udp_tracker_client) => udp_tracker_client, Err(err) => panic!("{err}"), }; + let udp_banned_ips_total_before = ban_service.read().await.get_banned_ips_total(); + // The eleven first requests should be fine let invalid_connection_id = ConnectionId::new(0); // Zero is one of the not normal values. @@ -267,6 +271,8 @@ mod receiving_an_announce_request { info_hash, ); + let udp_requests_banned_before = tracker.get_stats().await.udp_requests_banned; + // This should return a timeout error match client.send(announce_request.into()).await { Ok(_) => (), @@ -275,6 +281,15 @@ mod receiving_an_announce_request { assert!(client.receive().await.is_err()); + let udp_requests_banned_after = tracker.get_stats().await.udp_requests_banned; + let udp_banned_ips_total_after = ban_service.read().await.get_banned_ips_total(); + + // UDP counter for banned requests should be increased by 1 + assert_eq!(udp_requests_banned_after, udp_requests_banned_before + 1); + + // UDP counter for banned IPs should be increased by 1 + assert_eq!(udp_banned_ips_total_after, udp_banned_ips_total_before + 1); + env.stop().await; } } diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index 01639accc..f744809c5 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -2,10 +2,13 @@ use std::net::SocketAddr; use std::sync::Arc; use bittorrent_primitives::info_hash::InfoHash; +use tokio::sync::RwLock; use torrust_tracker_configuration::{Configuration, UdpTracker, DEFAULT_TIMEOUT}; use torrust_tracker_lib::bootstrap::app::initialize_with_configuration; use torrust_tracker_lib::core::Tracker; use torrust_tracker_lib::servers::registar::Registar; +use torrust_tracker_lib::servers::udp::server::banning::BanService; +use torrust_tracker_lib::servers::udp::server::launcher::MAX_CONNECTION_ID_ERRORS_PER_IP; use torrust_tracker_lib::servers::udp::server::spawner::Spawner; use torrust_tracker_lib::servers::udp::server::states::{Running, Stopped}; use torrust_tracker_lib::servers::udp::server::Server; @@ -17,6 +20,7 @@ where { pub config: Arc, pub tracker: Arc, + pub ban_service: Arc>, pub registar: Registar, pub server: Server, } @@ -36,6 +40,7 @@ impl Environment { #[allow(dead_code)] pub fn new(configuration: &Arc) -> Self { let tracker = initialize_with_configuration(configuration); + let ban_service = Arc::new(RwLock::new(BanService::new(MAX_CONNECTION_ID_ERRORS_PER_IP))); let udp_tracker = configuration.udp_trackers.clone().expect("missing UDP tracker configuration"); @@ -48,6 +53,7 @@ impl Environment { Self { config, tracker, + ban_service, registar: Registar::default(), server, } @@ -59,10 +65,11 @@ impl Environment { Environment { config: self.config, tracker: self.tracker.clone(), + ban_service: self.ban_service.clone(), registar: self.registar.clone(), server: self .server - .start(self.tracker, self.registar.give_form(), cookie_lifetime) + .start(self.tracker, self.ban_service, self.registar.give_form(), cookie_lifetime) .await .unwrap(), } @@ -85,6 +92,7 @@ impl Environment { Environment { config: self.config, tracker: self.tracker, + ban_service: self.ban_service, registar: Registar::default(), server: stopped.expect("it stop the udp tracker service"), }