From 9b3b75bd5fdf1dfc812d656294dbeac65b2643ca Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 07:59:21 +0100 Subject: [PATCH 01/13] fix: log message --- src/servers/udp/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 64f2fa2ab..af52e2de3 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -331,7 +331,7 @@ impl Udp { ) { let halt_task = tokio::task::spawn(shutdown_signal_with_message( rx_halt, - format!("Halting Http Service Bound to Socket: {bind_to}"), + format!("Halting UDP Service Bound to Socket: {bind_to}"), )); let socket = tokio::time::timeout(Duration::from_millis(5000), Socket::new(bind_to)) From 0e3678d2d6b4f4c0f0f6be7218b01e2b9e6e3fe3 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 08:02:05 +0100 Subject: [PATCH 02/13] refactor: rename Socket to BoundSocket and fix format errors" --- src/lib.rs | 2 +- src/servers/udp/server.rs | 23 ++++++++++------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bb6826dd1..cf2834418 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -494,7 +494,7 @@ pub mod bootstrap; pub mod console; pub mod core; pub mod servers; -pub mod shared; +pub mod shared; #[macro_use] extern crate lazy_static; diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index af52e2de3..3fb494238 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -235,11 +235,11 @@ impl Drop for ActiveRequests { } /// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket. -struct Socket { +struct BoundSocket { socket: Arc, } -impl Socket { +impl BoundSocket { async fn new(addr: SocketAddr) -> Result> { let socket = tokio::net::UdpSocket::bind(addr).await; @@ -257,7 +257,7 @@ impl Socket { } } -impl Deref for Socket { +impl Deref for BoundSocket { type Target = tokio::net::UdpSocket; fn deref(&self) -> &Self::Target { @@ -265,7 +265,7 @@ impl Deref for Socket { } } -impl Debug for Socket { +impl Debug for BoundSocket { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let local_addr = match self.socket.local_addr() { Ok(socket) => format!("Receiving From: {socket}"), @@ -334,7 +334,7 @@ impl Udp { format!("Halting UDP Service Bound to Socket: {bind_to}"), )); - let socket = tokio::time::timeout(Duration::from_millis(5000), Socket::new(bind_to)) + let socket = tokio::time::timeout(Duration::from_millis(5000), BoundSocket::new(bind_to)) .await .expect("it should bind to the socket within five seconds"); @@ -543,17 +543,14 @@ impl Udp { #[cfg(test)] mod tests { - use std::{sync::Arc, time::Duration}; + use std::sync::Arc; + use std::time::Duration; use torrust_tracker_test_helpers::configuration::ephemeral_mode_public; - use crate::{ - bootstrap::app::initialize_with_configuration, - servers::{ - registar::Registar, - udp::server::{Launcher, UdpServer}, - }, - }; + use crate::bootstrap::app::initialize_with_configuration; + use crate::servers::registar::Registar; + use crate::servers::udp::server::{Launcher, UdpServer}; #[tokio::test] async fn it_should_be_able_to_start_and_stop() { From 7ff0cd249fe62adf4c8ba9b3c4815fb68d747b69 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 08:26:25 +0100 Subject: [PATCH 03/13] refactor: rename var --- src/servers/udp/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 3fb494238..c9f7e458f 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -357,7 +357,7 @@ impl Udp { let socket = socket.socket; - let direct = Receiver { + let receiver = Receiver { socket, tracker, data: RefCell::new([0; MAX_PACKET_SIZE]), @@ -368,7 +368,7 @@ impl Udp { let local_addr = local_addr.clone(); tokio::task::spawn(async move { tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown::task", local_addr, "(listening...)"); - let () = Self::run_udp_server_main(direct).await; + let () = Self::run_udp_server_main(receiver).await; }) }; From 16ae4fd14bdb03c8704c2af9ecb20873e6c396d3 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 08:51:48 +0100 Subject: [PATCH 04/13] refactor: rename vars and extract constructor --- src/servers/udp/server.rs | 69 ++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index c9f7e458f..229729038 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -33,7 +33,6 @@ use derive_more::Constructor; use futures::{Stream, StreamExt}; use ringbuf::traits::{Consumer, Observer, Producer}; use ringbuf::StaticRb; -use tokio::net::UdpSocket; use tokio::select; use tokio::sync::oneshot; use tokio::task::{AbortHandle, JoinHandle}; @@ -255,6 +254,10 @@ impl BoundSocket { socket: Arc::new(socket), }) } + + fn local_addr(&self) -> SocketAddr { + self.socket.local_addr().expect("it should get local address") + } } impl Deref for BoundSocket { @@ -277,11 +280,21 @@ impl Debug for BoundSocket { } struct Receiver { - socket: Arc, + bound_socket: Arc, tracker: Arc, data: RefCell<[u8; MAX_PACKET_SIZE]>, } +impl Receiver { + pub fn new(bound_socket: Arc, tracker: Arc) -> Self { + Receiver { + bound_socket, + tracker, + data: RefCell::new([0; MAX_PACKET_SIZE]), + } + } +} + impl Stream for Receiver { type Item = std::io::Result; @@ -289,7 +302,7 @@ impl Stream for Receiver { let mut buf = *self.data.borrow_mut(); let mut buf = tokio::io::ReadBuf::new(&mut buf); - let Poll::Ready(ready) = self.socket.poll_recv_from(cx, &mut buf) else { + let Poll::Ready(ready) = self.bound_socket.poll_recv_from(cx, &mut buf) else { return Poll::Pending; }; @@ -301,7 +314,7 @@ impl Stream for Receiver { Some(Ok(tokio::task::spawn(Udp::process_request( request, self.tracker.clone(), - self.socket.clone(), + self.bound_socket.clone(), )) .abort_handle())) } @@ -338,7 +351,7 @@ impl Udp { .await .expect("it should bind to the socket within five seconds"); - let socket = match socket { + let bound_socket = match socket { Ok(socket) => socket, Err(e) => { tracing::error!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", addr = %bind_to, err = %e, "panic! (error when building socket)" ); @@ -346,26 +359,21 @@ impl Udp { } }; - let address = socket.local_addr().expect("it should get the locally bound address"); - let local_addr = format!("udp://{address}"); + let address = bound_socket.local_addr(); + let local_udp_url = format!("udp://{address}"); // note: this log message is parsed by our container. i.e: // // `[UDP TRACKER][INFO] Starting on: udp://` // - tracing::info!(target: "UDP TRACKER", "Starting on: {local_addr}"); + tracing::info!(target: "UDP TRACKER", "Starting on: {local_udp_url}"); - let socket = socket.socket; + let receiver = Receiver::new(bound_socket.into(), tracker); - let receiver = Receiver { - socket, - tracker, - data: RefCell::new([0; MAX_PACKET_SIZE]), - }; + tracing::trace!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_udp_url, "(spawning main loop)"); - tracing::trace!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_addr, "(spawning main loop)"); let running = { - let local_addr = local_addr.clone(); + let local_addr = local_udp_url.clone(); tokio::task::spawn(async move { tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown::task", local_addr, "(listening...)"); let () = Self::run_udp_server_main(receiver).await; @@ -376,29 +384,29 @@ impl Udp { .send(Started { address }) .expect("the UDP Tracker service should not be dropped"); - tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_addr, "(started)"); + tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_udp_url, "(started)"); let stop = running.abort_handle(); select! { - _ = running => { tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_addr, "(stopped)"); }, - _ = halt_task => { tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown",local_addr, "(halting)"); } + _ = running => { tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_udp_url, "(stopped)"); }, + _ = halt_task => { tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown",local_udp_url, "(halting)"); } } stop.abort(); tokio::task::yield_now().await; // lets allow the other threads to complete. } - async fn run_udp_server_main(mut direct: Receiver) { + async fn run_udp_server_main(mut receiver: Receiver) { let reqs = &mut ActiveRequests::default(); - let addr = direct.socket.local_addr().expect("it should get local address"); + let addr = receiver.bound_socket.local_addr(); let local_addr = format!("udp://{addr}"); loop { if let Some(req) = { tracing::trace!(target: "UDP TRACKER: Udp::run_udp_server", local_addr, "(wait for request)"); - direct.next().await + receiver.next().await } { tracing::trace!(target: "UDP TRACKER: Udp::run_udp_server::loop", local_addr, "(in)"); @@ -474,24 +482,19 @@ impl Udp { } } - async fn process_request(request: UdpRequest, tracker: Arc, socket: Arc) { + async fn process_request(request: UdpRequest, tracker: Arc, socket: Arc) { tracing::trace!(target: "UDP TRACKER: Udp::process_request", request = %request.from, "(receiving)"); Self::process_valid_request(tracker, socket, request).await; } - async fn process_valid_request(tracker: Arc, socket: Arc, udp_request: UdpRequest) { + async fn process_valid_request(tracker: Arc, socket: Arc, udp_request: UdpRequest) { tracing::trace!(target: "UDP TRACKER: Udp::process_valid_request", "Making Response to {udp_request:?}"); let from = udp_request.from; - let response = handlers::handle_packet( - udp_request, - &tracker.clone(), - socket.local_addr().expect("it should get the local address"), - ) - .await; + let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.local_addr()).await; Self::send_response(&socket.clone(), from, response).await; } - async fn send_response(socket: &Arc, to: SocketAddr, response: Response) { + async fn send_response(bound_socket: &Arc, to: SocketAddr, response: Response) { let response_type = match &response { Response::Connect(_) => "Connect".to_string(), Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(), @@ -514,7 +517,7 @@ impl Udp { tracing::debug!(target: "UDP TRACKER: Udp::send_response", ?to, bytes_count = &inner[..position].len(), "(sending...)" ); tracing::trace!(target: "UDP TRACKER: Udp::send_response", ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "(sending...)"); - Self::send_packet(socket, &to, &inner[..position]).await; + Self::send_packet(bound_socket, &to, &inner[..position]).await; tracing::trace!(target: "UDP TRACKER: Udp::send_response", ?to, bytes_count = &inner[..position].len(), "(sent)"); } @@ -524,7 +527,7 @@ impl Udp { } } - async fn send_packet(socket: &Arc, remote_addr: &SocketAddr, payload: &[u8]) { + async fn send_packet(socket: &Arc, remote_addr: &SocketAddr, payload: &[u8]) { tracing::trace!(target: "UDP TRACKER: Udp::send_response", to = %remote_addr, ?payload, "(sending)"); // doesn't matter if it reaches or not From 0388e1d1439bbc1d2ef7b59bf225a2d152358a2b Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 13:20:17 +0100 Subject: [PATCH 05/13] refactor: extract consts for logging targets --- src/bootstrap/jobs/health_check_api.rs | 8 +-- src/bootstrap/jobs/udp_tracker.rs | 7 +-- src/console/ci/e2e/logs_parser.rs | 7 +-- src/servers/apis/mod.rs | 6 ++- src/servers/apis/routes.rs | 5 +- src/servers/apis/server.rs | 9 ++-- src/servers/health_check_api/mod.rs | 2 + src/servers/health_check_api/server.rs | 7 +-- src/servers/http/mod.rs | 2 + src/servers/http/server.rs | 5 +- src/servers/http/v1/routes.rs | 5 +- src/servers/udp/logging.rs | 13 ++--- src/servers/udp/mod.rs | 2 + src/servers/udp/server.rs | 54 ++++++++++--------- src/shared/bit_torrent/tracker/udp/client.rs | 12 +++-- tests/servers/health_check_api/environment.rs | 10 ++-- 16 files changed, 87 insertions(+), 67 deletions(-) diff --git a/src/bootstrap/jobs/health_check_api.rs b/src/bootstrap/jobs/health_check_api.rs index c22a4cf95..e79a6da77 100644 --- a/src/bootstrap/jobs/health_check_api.rs +++ b/src/bootstrap/jobs/health_check_api.rs @@ -20,7 +20,7 @@ use torrust_tracker_configuration::HealthCheckApi; use tracing::info; use super::Started; -use crate::servers::health_check_api::server; +use crate::servers::health_check_api::{server, HEALTH_CHECK_API_LOG_TARGET}; use crate::servers::registar::ServiceRegistry; use crate::servers::signals::Halted; @@ -44,18 +44,18 @@ pub async fn start_job(config: &HealthCheckApi, register: ServiceRegistry) -> Jo // Run the API server let join_handle = tokio::spawn(async move { - info!(target: "HEALTH CHECK API", "Starting on: {protocol}://{}", bind_addr); + info!(target: HEALTH_CHECK_API_LOG_TARGET, "Starting on: {protocol}://{}", bind_addr); let handle = server::start(bind_addr, tx_start, rx_halt, register); if let Ok(()) = handle.await { - info!(target: "HEALTH CHECK API", "Stopped server running on: {protocol}://{}", bind_addr); + info!(target: HEALTH_CHECK_API_LOG_TARGET, "Stopped server running on: {protocol}://{}", bind_addr); } }); // Wait until the server sends the started message match rx_start.await { - Ok(msg) => info!(target: "HEALTH CHECK API", "Started on: {protocol}://{}", msg.address), + Ok(msg) => info!(target: HEALTH_CHECK_API_LOG_TARGET, "Started on: {protocol}://{}", msg.address), Err(e) => panic!("the Health Check API server was dropped: {e}"), } diff --git a/src/bootstrap/jobs/udp_tracker.rs b/src/bootstrap/jobs/udp_tracker.rs index 2c09e6de2..ba39df2fe 100644 --- a/src/bootstrap/jobs/udp_tracker.rs +++ b/src/bootstrap/jobs/udp_tracker.rs @@ -15,6 +15,7 @@ use tracing::debug; use crate::core; use crate::servers::registar::ServiceRegistrationForm; use crate::servers::udp::server::{Launcher, UdpServer}; +use crate::servers::udp::UDP_TRACKER_LOG_TARGET; /// It starts a new UDP server with the provided configuration. /// @@ -35,8 +36,8 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc, form: S .expect("it should be able to start the udp tracker"); tokio::spawn(async move { - debug!(target: "UDP TRACKER", "Wait for launcher (UDP service) to finish ..."); - debug!(target: "UDP TRACKER", "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed()); + debug!(target: UDP_TRACKER_LOG_TARGET, "Wait for launcher (UDP service) to finish ..."); + debug!(target: UDP_TRACKER_LOG_TARGET, "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed()); assert!( !server.state.halt_task.is_closed(), @@ -49,6 +50,6 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc, form: S .await .expect("it should be able to join to the udp tracker task"); - debug!(target: "UDP TRACKER", "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed()); + debug!(target: UDP_TRACKER_LOG_TARGET, "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed()); }) } diff --git a/src/console/ci/e2e/logs_parser.rs b/src/console/ci/e2e/logs_parser.rs index 4886786de..8bf7974c1 100644 --- a/src/console/ci/e2e/logs_parser.rs +++ b/src/console/ci/e2e/logs_parser.rs @@ -2,10 +2,11 @@ use regex::Regex; use serde::{Deserialize, Serialize}; +use crate::servers::health_check_api::HEALTH_CHECK_API_LOG_TARGET; +use crate::servers::http::HTTP_TRACKER_LOG_TARGET; +use crate::servers::udp::UDP_TRACKER_LOG_TARGET; + const INFO_LOG_LEVEL: &str = "INFO"; -const UDP_TRACKER_LOG_TARGET: &str = "UDP TRACKER"; -const HTTP_TRACKER_LOG_TARGET: &str = "HTTP TRACKER"; -const HEALTH_CHECK_API_LOG_TARGET: &str = "HEALTH CHECK API"; #[derive(Serialize, Deserialize, Debug, Default)] pub struct RunningServices { diff --git a/src/servers/apis/mod.rs b/src/servers/apis/mod.rs index 6dae66c2d..b44ccab9f 100644 --- a/src/servers/apis/mod.rs +++ b/src/servers/apis/mod.rs @@ -157,6 +157,10 @@ pub mod routes; pub mod server; pub mod v1; +use serde::{Deserialize, Serialize}; + +pub const API_LOG_TARGET: &str = "API"; + /// The info hash URL path parameter. /// /// Some API endpoints require an info hash as a path parameter. @@ -169,8 +173,6 @@ pub mod v1; #[derive(Deserialize)] pub struct InfoHashParam(pub String); -use serde::{Deserialize, Serialize}; - /// The version of the HTTP Api. #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Eq, Debug)] pub enum Version { diff --git a/src/servers/apis/routes.rs b/src/servers/apis/routes.rs index 087bcfa4a..2001afc2f 100644 --- a/src/servers/apis/routes.rs +++ b/src/servers/apis/routes.rs @@ -27,6 +27,7 @@ use super::v1; use super::v1::context::health_check::handlers::health_check_handler; use super::v1::middlewares::auth::State; use crate::core::Tracker; +use crate::servers::apis::API_LOG_TARGET; const TIMEOUT: Duration = Duration::from_secs(5); @@ -60,7 +61,7 @@ pub fn router(tracker: Arc, access_tokens: Arc) -> Router .unwrap_or_default(); tracing::span!( - target: "API", + target: API_LOG_TARGET, tracing::Level::INFO, "request", method = %method, uri = %uri, request_id = %request_id); }) .on_response(|response: &Response, latency: Duration, _span: &Span| { @@ -73,7 +74,7 @@ pub fn router(tracker: Arc, access_tokens: Arc) -> Router let latency_ms = latency.as_millis(); tracing::span!( - target: "API", + target: API_LOG_TARGET, tracing::Level::INFO, "response", latency = %latency_ms, status = %status_code, request_id = %request_id); }), ) diff --git a/src/servers/apis/server.rs b/src/servers/apis/server.rs index 246660ab1..d47e5d542 100644 --- a/src/servers/apis/server.rs +++ b/src/servers/apis/server.rs @@ -37,6 +37,7 @@ use tracing::{debug, error, info}; use super::routes::router; use crate::bootstrap::jobs::Started; use crate::core::Tracker; +use crate::servers::apis::API_LOG_TARGET; use crate::servers::custom_axum_server::{self, TimeoutAcceptor}; use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::{graceful_shutdown, Halted}; @@ -121,11 +122,11 @@ impl ApiServer { let launcher = self.state.launcher; let task = tokio::spawn(async move { - debug!(target: "API", "Starting with launcher in spawned task ..."); + debug!(target: API_LOG_TARGET, "Starting with launcher in spawned task ..."); let _task = launcher.start(tracker, access_tokens, tx_start, rx_halt).await; - debug!(target: "API", "Started with launcher in spawned task"); + debug!(target: API_LOG_TARGET, "Started with launcher in spawned task"); launcher }); @@ -231,7 +232,7 @@ impl Launcher { let tls = self.tls.clone(); let protocol = if tls.is_some() { "https" } else { "http" }; - info!(target: "API", "Starting on {protocol}://{}", address); + info!(target: API_LOG_TARGET, "Starting on {protocol}://{}", address); let running = Box::pin(async { match tls { @@ -250,7 +251,7 @@ impl Launcher { } }); - info!(target: "API", "Started on {protocol}://{}", address); + info!(target: API_LOG_TARGET, "Started on {protocol}://{}", address); tx_start .send(Started { address }) diff --git a/src/servers/health_check_api/mod.rs b/src/servers/health_check_api/mod.rs index ec608387d..24c5232c8 100644 --- a/src/servers/health_check_api/mod.rs +++ b/src/servers/health_check_api/mod.rs @@ -2,3 +2,5 @@ pub mod handlers; pub mod resources; pub mod responses; pub mod server; + +pub const HEALTH_CHECK_API_LOG_TARGET: &str = "HEALTH CHECK API"; diff --git a/src/servers/health_check_api/server.rs b/src/servers/health_check_api/server.rs index f03753573..89fbafe45 100644 --- a/src/servers/health_check_api/server.rs +++ b/src/servers/health_check_api/server.rs @@ -22,6 +22,7 @@ use tracing::{debug, Level, Span}; use crate::bootstrap::jobs::Started; use crate::servers::health_check_api::handlers::health_check_handler; +use crate::servers::health_check_api::HEALTH_CHECK_API_LOG_TARGET; use crate::servers::registar::ServiceRegistry; use crate::servers::signals::{graceful_shutdown, Halted}; @@ -56,7 +57,7 @@ pub fn start( .unwrap_or_default(); tracing::span!( - target: "HEALTH CHECK API", + target: HEALTH_CHECK_API_LOG_TARGET, tracing::Level::INFO, "request", method = %method, uri = %uri, request_id = %request_id); }) .on_response(|response: &Response, latency: Duration, _span: &Span| { @@ -69,7 +70,7 @@ pub fn start( let latency_ms = latency.as_millis(); tracing::span!( - target: "HEALTH CHECK API", + target: HEALTH_CHECK_API_LOG_TARGET, tracing::Level::INFO, "response", latency = %latency_ms, status = %status_code, request_id = %request_id); }), ) @@ -80,7 +81,7 @@ pub fn start( let handle = Handle::new(); - debug!(target: "HEALTH CHECK API", "Starting service with graceful shutdown in a spawned task ..."); + debug!(target: HEALTH_CHECK_API_LOG_TARGET, "Starting service with graceful shutdown in a spawned task ..."); tokio::task::spawn(graceful_shutdown( handle.clone(), diff --git a/src/servers/http/mod.rs b/src/servers/http/mod.rs index e50e3c351..4ef5ca7ea 100644 --- a/src/servers/http/mod.rs +++ b/src/servers/http/mod.rs @@ -309,6 +309,8 @@ pub mod percent_encoding; pub mod server; pub mod v1; +pub const HTTP_TRACKER_LOG_TARGET: &str = "HTTP TRACKER"; + /// The version of the HTTP tracker. #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Eq, Debug)] pub enum Version { diff --git a/src/servers/http/server.rs b/src/servers/http/server.rs index 5798f7c10..9199573b0 100644 --- a/src/servers/http/server.rs +++ b/src/servers/http/server.rs @@ -13,6 +13,7 @@ use super::v1::routes::router; use crate::bootstrap::jobs::Started; use crate::core::Tracker; use crate::servers::custom_axum_server::{self, TimeoutAcceptor}; +use crate::servers::http::HTTP_TRACKER_LOG_TARGET; use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::{graceful_shutdown, Halted}; @@ -55,7 +56,7 @@ impl Launcher { let tls = self.tls.clone(); let protocol = if tls.is_some() { "https" } else { "http" }; - info!(target: "HTTP TRACKER", "Starting on: {protocol}://{}", address); + info!(target: HTTP_TRACKER_LOG_TARGET, "Starting on: {protocol}://{}", address); let app = router(tracker, address); @@ -76,7 +77,7 @@ impl Launcher { } }); - info!(target: "HTTP TRACKER", "Started on: {protocol}://{}", address); + info!(target: HTTP_TRACKER_LOG_TARGET, "Started on: {protocol}://{}", address); tx_start .send(Started { address }) diff --git a/src/servers/http/v1/routes.rs b/src/servers/http/v1/routes.rs index 14641dc1d..b2f37880c 100644 --- a/src/servers/http/v1/routes.rs +++ b/src/servers/http/v1/routes.rs @@ -20,6 +20,7 @@ use tracing::{Level, Span}; use super::handlers::{announce, health_check, scrape}; use crate::core::Tracker; +use crate::servers::http::HTTP_TRACKER_LOG_TARGET; const TIMEOUT: Duration = Duration::from_secs(5); @@ -56,7 +57,7 @@ pub fn router(tracker: Arc, server_socket_addr: SocketAddr) -> Router { .unwrap_or_default(); tracing::span!( - target:"HTTP TRACKER", + target: HTTP_TRACKER_LOG_TARGET, tracing::Level::INFO, "request", server_socket_addr= %server_socket_addr, method = %method, uri = %uri, request_id = %request_id); }) .on_response(move |response: &Response, latency: Duration, _span: &Span| { @@ -69,7 +70,7 @@ pub fn router(tracker: Arc, server_socket_addr: SocketAddr) -> Router { let latency_ms = latency.as_millis(); tracing::span!( - target: "HTTP TRACKER", + target: HTTP_TRACKER_LOG_TARGET, tracing::Level::INFO, "response", server_socket_addr= %server_socket_addr, latency = %latency_ms, status = %status_code, request_id = %request_id); }), ) diff --git a/src/servers/udp/logging.rs b/src/servers/udp/logging.rs index 9bbb48f6a..3891278d7 100644 --- a/src/servers/udp/logging.rs +++ b/src/servers/udp/logging.rs @@ -7,6 +7,7 @@ use aquatic_udp_protocol::{Request, Response, TransactionId}; use torrust_tracker_primitives::info_hash::InfoHash; use super::handlers::RequestId; +use crate::servers::udp::UDP_TRACKER_LOG_TARGET; pub fn log_request(request: &Request, request_id: &RequestId, server_socket_addr: &SocketAddr) { let action = map_action_name(request); @@ -17,7 +18,7 @@ pub fn log_request(request: &Request, request_id: &RequestId, server_socket_addr let transaction_id_str = transaction_id.0.to_string(); tracing::span!( - target: "UDP TRACKER", + target: UDP_TRACKER_LOG_TARGET, tracing::Level::INFO, "request", server_socket_addr = %server_socket_addr, action = %action, transaction_id = %transaction_id_str, request_id = %request_id); } Request::Announce(announce_request) => { @@ -27,7 +28,7 @@ pub fn log_request(request: &Request, request_id: &RequestId, server_socket_addr let info_hash_str = InfoHash::from_bytes(&announce_request.info_hash.0).to_hex_string(); tracing::span!( - target: "UDP TRACKER", + target: UDP_TRACKER_LOG_TARGET, tracing::Level::INFO, "request", server_socket_addr = %server_socket_addr, action = %action, transaction_id = %transaction_id_str, request_id = %request_id, connection_id = %connection_id_str, info_hash = %info_hash_str); } Request::Scrape(scrape_request) => { @@ -36,7 +37,7 @@ pub fn log_request(request: &Request, request_id: &RequestId, server_socket_addr let connection_id_str = scrape_request.connection_id.0.to_string(); tracing::span!( - target: "UDP TRACKER", + target: UDP_TRACKER_LOG_TARGET, tracing::Level::INFO, "request", server_socket_addr = %server_socket_addr, @@ -64,7 +65,7 @@ pub fn log_response( latency: Duration, ) { tracing::span!( - target: "UDP TRACKER", + target: UDP_TRACKER_LOG_TARGET, tracing::Level::INFO, "response", server_socket_addr = %server_socket_addr, @@ -75,12 +76,12 @@ pub fn log_response( pub fn log_bad_request(request_id: &RequestId) { tracing::span!( - target: "UDP TRACKER", + target: UDP_TRACKER_LOG_TARGET, tracing::Level::INFO, "bad request", request_id = %request_id); } pub fn log_error_response(request_id: &RequestId) { tracing::span!( - target: "UDP TRACKER", + target: UDP_TRACKER_LOG_TARGET, tracing::Level::INFO, "response", request_id = %request_id); } diff --git a/src/servers/udp/mod.rs b/src/servers/udp/mod.rs index 3062a4393..5c5460397 100644 --- a/src/servers/udp/mod.rs +++ b/src/servers/udp/mod.rs @@ -649,6 +649,8 @@ pub mod peer_builder; pub mod request; pub mod server; +pub const UDP_TRACKER_LOG_TARGET: &str = "UDP TRACKER"; + /// Number of bytes. pub type Bytes = u64; /// The port the peer is listening on. diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 229729038..e60e49ace 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -42,7 +42,7 @@ use crate::bootstrap::jobs::Started; use crate::core::Tracker; use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::{shutdown_signal_with_message, Halted}; -use crate::servers::udp::handlers; +use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET}; use crate::shared::bit_torrent::tracker::udp::client::check; use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE; @@ -150,7 +150,7 @@ impl UdpServer { }, }; - tracing::trace!(target: "UDP TRACKER: UdpServer::start", local_addr, "(running)"); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpServer::start (running)"); Ok(running_udp_server) } @@ -248,7 +248,7 @@ impl BoundSocket { }; let local_addr = format!("udp://{addr}"); - tracing::debug!(target: "UDP TRACKER: UdpSocket::new", local_addr, "(bound)"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpSocket::new (bound)"); Ok(Self { socket: Arc::new(socket), @@ -347,6 +347,8 @@ impl Udp { format!("Halting UDP Service Bound to Socket: {bind_to}"), )); + tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting on: {bind_to}"); + let socket = tokio::time::timeout(Duration::from_millis(5000), BoundSocket::new(bind_to)) .await .expect("it should bind to the socket within five seconds"); @@ -354,7 +356,7 @@ impl Udp { let bound_socket = match socket { Ok(socket) => socket, Err(e) => { - tracing::error!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", addr = %bind_to, err = %e, "panic! (error when building socket)" ); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, addr = %bind_to, err = %e, "Udp::run_with_graceful_shutdown panic! (error when building socket)" ); panic!("could not bind to socket!"); } }; @@ -364,18 +366,18 @@ impl Udp { // note: this log message is parsed by our container. i.e: // - // `[UDP TRACKER][INFO] Starting on: udp://` + // `INFO UDP TRACKER: Started on: udp://0.0.0.0:6969` // - tracing::info!(target: "UDP TRACKER", "Starting on: {local_udp_url}"); + tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Started on: {local_udp_url}"); let receiver = Receiver::new(bound_socket.into(), tracker); - tracing::trace!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_udp_url, "(spawning main loop)"); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)"); let running = { let local_addr = local_udp_url.clone(); tokio::task::spawn(async move { - tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown::task", local_addr, "(listening...)"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)"); let () = Self::run_udp_server_main(receiver).await; }) }; @@ -384,13 +386,13 @@ impl Udp { .send(Started { address }) .expect("the UDP Tracker service should not be dropped"); - tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_udp_url, "(started)"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (started)"); let stop = running.abort_handle(); select! { - _ = running => { tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown", local_udp_url, "(stopped)"); }, - _ = halt_task => { tracing::debug!(target: "UDP TRACKER: Udp::run_with_graceful_shutdown",local_udp_url, "(halting)"); } + _ = running => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (stopped)"); }, + _ = halt_task => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (halting)"); } } stop.abort(); @@ -405,19 +407,19 @@ impl Udp { loop { if let Some(req) = { - tracing::trace!(target: "UDP TRACKER: Udp::run_udp_server", local_addr, "(wait for request)"); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)"); receiver.next().await } { - tracing::trace!(target: "UDP TRACKER: Udp::run_udp_server::loop", local_addr, "(in)"); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (in)"); let req = match req { Ok(req) => req, Err(e) => { if e.kind() == std::io::ErrorKind::Interrupted { - tracing::warn!(target: "UDP TRACKER: Udp::run_udp_server::loop", local_addr, err = %e, "(interrupted)"); + tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop (interrupted)"); return; } - tracing::error!(target: "UDP TRACKER: Udp::run_udp_server::loop", local_addr, err = %e, "break: (got error)"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop break: (got error)"); break; } }; @@ -450,13 +452,13 @@ impl Udp { continue; } - tracing::debug!(target: "UDP TRACKER: Udp::run_udp_server::loop", local_addr, removed_count = finished, "(got unfinished task)"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)"); if finished == 0 { // we have _no_ finished tasks.. will abort the unfinished task to make space... h.abort(); - tracing::warn!(target: "UDP TRACKER: Udp::run_udp_server::loop", local_addr, "aborting request: (no finished tasks)"); + tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)"); break; } @@ -476,19 +478,19 @@ impl Udp { } else { tokio::task::yield_now().await; // the request iterator returned `None`. - tracing::error!(target: "UDP TRACKER: Udp::run_udp_server", local_addr, "breaking: (ran dry, should not happen in production!)"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)"); break; } } } async fn process_request(request: UdpRequest, tracker: Arc, socket: Arc) { - tracing::trace!(target: "UDP TRACKER: Udp::process_request", request = %request.from, "(receiving)"); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)"); Self::process_valid_request(tracker, socket, request).await; } async fn process_valid_request(tracker: Arc, socket: Arc, udp_request: UdpRequest) { - tracing::trace!(target: "UDP TRACKER: Udp::process_valid_request", "Making Response to {udp_request:?}"); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, "Udp::process_valid_request. Making Response to {udp_request:?}"); let from = udp_request.from; let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.local_addr()).await; Self::send_response(&socket.clone(), from, response).await; @@ -503,7 +505,7 @@ impl Udp { Response::Error(e) => format!("Error: {e:?}"), }; - tracing::debug!(target: "UDP TRACKER: Udp::send_response", target = ?to, response_type, "(sending)"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)"); let buffer = vec![0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(buffer); @@ -514,21 +516,21 @@ impl Udp { let position = cursor.position() as usize; let inner = cursor.get_ref(); - tracing::debug!(target: "UDP TRACKER: Udp::send_response", ?to, bytes_count = &inner[..position].len(), "(sending...)" ); - tracing::trace!(target: "UDP TRACKER: Udp::send_response", ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "(sending...)"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sending...)" ); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "Udp::send_response (sending...)"); Self::send_packet(bound_socket, &to, &inner[..position]).await; - tracing::trace!(target: "UDP TRACKER: Udp::send_response", ?to, bytes_count = &inner[..position].len(), "(sent)"); + tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sent)"); } Err(e) => { - tracing::error!(target: "UDP TRACKER: Udp::send_response", ?to, response_type, err = %e, "(error)"); + tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)"); } } } async fn send_packet(socket: &Arc, remote_addr: &SocketAddr, payload: &[u8]) { - tracing::trace!(target: "UDP TRACKER: Udp::send_response", to = %remote_addr, ?payload, "(sending)"); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)"); // doesn't matter if it reaches or not drop(socket.send_to(payload, remote_addr).await); diff --git a/src/shared/bit_torrent/tracker/udp/client.rs b/src/shared/bit_torrent/tracker/udp/client.rs index 900543462..dce596e08 100644 --- a/src/shared/bit_torrent/tracker/udp/client.rs +++ b/src/shared/bit_torrent/tracker/udp/client.rs @@ -13,6 +13,8 @@ use zerocopy::network_endian::I32; use crate::shared::bit_torrent::tracker::udp::{source_address, MAX_PACKET_SIZE}; +pub const UDP_CLIENT_LOG_TARGET: &str = "UDP CLIENT"; + /// Default timeout for sending and receiving packets. And waiting for sockets /// to be readable and writable. pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); @@ -82,7 +84,7 @@ impl UdpClient { /// - Can't write to the socket. /// - Can't send data. pub async fn send(&self, bytes: &[u8]) -> Result { - debug!(target: "UDP client", "sending {bytes:?} ..."); + debug!(target: UDP_CLIENT_LOG_TARGET, "sending {bytes:?} ..."); match time::timeout(self.timeout, self.socket.writable()).await { Ok(writable_result) => { @@ -115,7 +117,7 @@ impl UdpClient { pub async fn receive(&self) -> Result> { let mut response_buffer = [0u8; MAX_PACKET_SIZE]; - debug!(target: "UDP client", "receiving ..."); + debug!(target: UDP_CLIENT_LOG_TARGET, "receiving ..."); match time::timeout(self.timeout, self.socket.readable()).await { Ok(readable_result) => { @@ -138,7 +140,7 @@ impl UdpClient { let mut res: Vec = response_buffer.to_vec(); Vec::truncate(&mut res, size); - debug!(target: "UDP client", "{size} bytes received {res:?}"); + debug!(target: UDP_CLIENT_LOG_TARGET, "{size} bytes received {res:?}"); Ok(res) } @@ -168,7 +170,7 @@ impl UdpTrackerClient { /// /// Will return error if can't write request to bytes. pub async fn send(&self, request: Request) -> Result { - debug!(target: "UDP tracker client", "send request {request:?}"); + debug!(target: UDP_CLIENT_LOG_TARGET, "send request {request:?}"); // Write request into a buffer let request_buffer = vec![0u8; MAX_PACKET_SIZE]; @@ -196,7 +198,7 @@ impl UdpTrackerClient { pub async fn receive(&self) -> Result { let payload = self.udp_client.receive().await?; - debug!(target: "UDP tracker client", "received {} bytes. Response {payload:?}", payload.len()); + debug!(target: UDP_CLIENT_LOG_TARGET, "received {} bytes. Response {payload:?}", payload.len()); let response = Response::parse_bytes(&payload, true)?; diff --git a/tests/servers/health_check_api/environment.rs b/tests/servers/health_check_api/environment.rs index a50ad5156..cf0566d67 100644 --- a/tests/servers/health_check_api/environment.rs +++ b/tests/servers/health_check_api/environment.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use tokio::sync::oneshot::{self, Sender}; use tokio::task::JoinHandle; use torrust_tracker::bootstrap::jobs::Started; -use torrust_tracker::servers::health_check_api::server; +use torrust_tracker::servers::health_check_api::{server, HEALTH_CHECK_API_LOG_TARGET}; use torrust_tracker::servers::registar::Registar; use torrust_tracker::servers::signals::{self, Halted}; use torrust_tracker_configuration::HealthCheckApi; @@ -49,21 +49,21 @@ impl Environment { let register = self.registar.entries(); - debug!(target: "HEALTH CHECK API", "Spawning task to launch the service ..."); + debug!(target: HEALTH_CHECK_API_LOG_TARGET, "Spawning task to launch the service ..."); let server = tokio::spawn(async move { - debug!(target: "HEALTH CHECK API", "Starting the server in a spawned task ..."); + debug!(target: HEALTH_CHECK_API_LOG_TARGET, "Starting the server in a spawned task ..."); server::start(self.state.bind_to, tx_start, rx_halt, register) .await .expect("it should start the health check service"); - debug!(target: "HEALTH CHECK API", "Server started. Sending the binding {} ...", self.state.bind_to); + debug!(target: HEALTH_CHECK_API_LOG_TARGET, "Server started. Sending the binding {} ...", self.state.bind_to); self.state.bind_to }); - debug!(target: "HEALTH CHECK API", "Waiting for spawning task to send the binding ..."); + debug!(target: HEALTH_CHECK_API_LOG_TARGET, "Waiting for spawning task to send the binding ..."); let binding = rx_start.await.expect("it should send service binding").address; From b4b4515a9aa60ed5351c6f3f0c8b27ea01d9c0d6 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 13:41:55 +0100 Subject: [PATCH 06/13] refactor: extract const for logging targets And make it explicit the coupling between logs and `RunningServices` type. --- src/bootstrap/jobs/health_check_api.rs | 3 ++- src/console/ci/e2e/logs_parser.rs | 7 ++++--- src/servers/apis/server.rs | 3 ++- src/servers/http/server.rs | 3 ++- src/servers/logging.rs | 29 ++++++++++++++++++++++++++ src/servers/mod.rs | 1 + src/servers/udp/server.rs | 7 ++----- 7 files changed, 42 insertions(+), 11 deletions(-) create mode 100644 src/servers/logging.rs diff --git a/src/bootstrap/jobs/health_check_api.rs b/src/bootstrap/jobs/health_check_api.rs index e79a6da77..b4d4862ee 100644 --- a/src/bootstrap/jobs/health_check_api.rs +++ b/src/bootstrap/jobs/health_check_api.rs @@ -21,6 +21,7 @@ use tracing::info; use super::Started; use crate::servers::health_check_api::{server, HEALTH_CHECK_API_LOG_TARGET}; +use crate::servers::logging::STARTED_ON; use crate::servers::registar::ServiceRegistry; use crate::servers::signals::Halted; @@ -55,7 +56,7 @@ pub async fn start_job(config: &HealthCheckApi, register: ServiceRegistry) -> Jo // Wait until the server sends the started message match rx_start.await { - Ok(msg) => info!(target: HEALTH_CHECK_API_LOG_TARGET, "Started on: {protocol}://{}", msg.address), + Ok(msg) => info!(target: HEALTH_CHECK_API_LOG_TARGET, "{STARTED_ON}: {protocol}://{}", msg.address), Err(e) => panic!("the Health Check API server was dropped: {e}"), } diff --git a/src/console/ci/e2e/logs_parser.rs b/src/console/ci/e2e/logs_parser.rs index 8bf7974c1..37eb367b1 100644 --- a/src/console/ci/e2e/logs_parser.rs +++ b/src/console/ci/e2e/logs_parser.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::servers::health_check_api::HEALTH_CHECK_API_LOG_TARGET; use crate::servers::http::HTTP_TRACKER_LOG_TARGET; +use crate::servers::logging::STARTED_ON; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; const INFO_LOG_LEVEL: &str = "INFO"; @@ -65,9 +66,9 @@ impl RunningServices { let mut http_trackers: Vec = Vec::new(); let mut health_checks: Vec = Vec::new(); - let udp_re = Regex::new(r"Started on: udp://([0-9.]+:[0-9]+)").unwrap(); - let http_re = Regex::new(r"Started on: (https?://[0-9.]+:[0-9]+)").unwrap(); // DevSkim: ignore DS137138 - let health_re = Regex::new(r"Started on: (https?://[0-9.]+:[0-9]+)").unwrap(); // DevSkim: ignore DS137138 + let udp_re = Regex::new(&format!("{STARTED_ON}: {}", r"udp://([0-9.]+:[0-9]+)")).unwrap(); + let http_re = Regex::new(&format!("{STARTED_ON}: {}", r"(https?://[0-9.]+:[0-9]+)")).unwrap(); // DevSkim: ignore DS137138 + let health_re = Regex::new(&format!("{STARTED_ON}: {}", r"(https?://[0-9.]+:[0-9]+)")).unwrap(); // DevSkim: ignore DS137138 let ansi_escape_re = Regex::new(r"\x1b\[[0-9;]*m").unwrap(); for line in logs.lines() { diff --git a/src/servers/apis/server.rs b/src/servers/apis/server.rs index d47e5d542..967080bd5 100644 --- a/src/servers/apis/server.rs +++ b/src/servers/apis/server.rs @@ -39,6 +39,7 @@ use crate::bootstrap::jobs::Started; use crate::core::Tracker; use crate::servers::apis::API_LOG_TARGET; use crate::servers::custom_axum_server::{self, TimeoutAcceptor}; +use crate::servers::logging::STARTED_ON; use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::{graceful_shutdown, Halted}; @@ -251,7 +252,7 @@ impl Launcher { } }); - info!(target: API_LOG_TARGET, "Started on {protocol}://{}", address); + info!(target: API_LOG_TARGET, "{STARTED_ON} {protocol}://{}", address); tx_start .send(Started { address }) diff --git a/src/servers/http/server.rs b/src/servers/http/server.rs index 9199573b0..87f0e945b 100644 --- a/src/servers/http/server.rs +++ b/src/servers/http/server.rs @@ -14,6 +14,7 @@ use crate::bootstrap::jobs::Started; use crate::core::Tracker; use crate::servers::custom_axum_server::{self, TimeoutAcceptor}; use crate::servers::http::HTTP_TRACKER_LOG_TARGET; +use crate::servers::logging::STARTED_ON; use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::{graceful_shutdown, Halted}; @@ -77,7 +78,7 @@ impl Launcher { } }); - info!(target: HTTP_TRACKER_LOG_TARGET, "Started on: {protocol}://{}", address); + info!(target: HTTP_TRACKER_LOG_TARGET, "{STARTED_ON}: {protocol}://{}", address); tx_start .send(Started { address }) diff --git a/src/servers/logging.rs b/src/servers/logging.rs new file mode 100644 index 000000000..ad9ccbbcc --- /dev/null +++ b/src/servers/logging.rs @@ -0,0 +1,29 @@ +/// This is the prefix used in logs to identify a started service. +/// +/// For example: +/// +/// ```text +/// 2024-06-25T12:36:25.025312Z INFO UDP TRACKER: Started on: udp://0.0.0.0:6969 +/// 2024-06-25T12:36:25.025445Z INFO HTTP TRACKER: Started on: http://0.0.0.0:7070 +/// 2024-06-25T12:36:25.025527Z INFO API: Started on http://0.0.0.0:1212 +/// 2024-06-25T12:36:25.025580Z INFO HEALTH CHECK API: Started on: http://127.0.0.1:1313 +/// ``` +pub const STARTED_ON: &str = "Started on"; + +/* + +todo: we should use a field fot the URL. + +For example, instead of: + +``` +2024-06-25T12:36:25.025312Z INFO UDP TRACKER: Started on: udp://0.0.0.0:6969 +``` + +We should use something like: + +``` +2024-06-25T12:36:25.025312Z INFO UDP TRACKER started_at_url=udp://0.0.0.0:6969 +``` + +*/ diff --git a/src/servers/mod.rs b/src/servers/mod.rs index 0c9cc5dd8..705a4728e 100644 --- a/src/servers/mod.rs +++ b/src/servers/mod.rs @@ -3,6 +3,7 @@ pub mod apis; pub mod custom_axum_server; pub mod health_check_api; pub mod http; +pub mod logging; pub mod registar; pub mod signals; pub mod udp; diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index e60e49ace..5e2a67c85 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -40,6 +40,7 @@ use tokio::task::{AbortHandle, JoinHandle}; use super::UdpRequest; use crate::bootstrap::jobs::Started; use crate::core::Tracker; +use crate::servers::logging::STARTED_ON; use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; use crate::servers::signals::{shutdown_signal_with_message, Halted}; use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET}; @@ -364,11 +365,7 @@ impl Udp { let address = bound_socket.local_addr(); let local_udp_url = format!("udp://{address}"); - // note: this log message is parsed by our container. i.e: - // - // `INFO UDP TRACKER: Started on: udp://0.0.0.0:6969` - // - tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Started on: {local_udp_url}"); + tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}"); let receiver = Receiver::new(bound_socket.into(), tracker); From a5e2baf383edb593d6c8fe2e4477b8e6a61b466d Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 13:52:26 +0100 Subject: [PATCH 07/13] refactor: extract method --- src/servers/udp/server.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 5e2a67c85..53fbaca34 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -36,6 +36,7 @@ use ringbuf::StaticRb; use tokio::select; use tokio::sync::oneshot; use tokio::task::{AbortHandle, JoinHandle}; +use url::Url; use super::UdpRequest; use crate::bootstrap::jobs::Started; @@ -241,6 +242,9 @@ struct BoundSocket { impl BoundSocket { async fn new(addr: SocketAddr) -> Result> { + let bind_addr = format!("udp://{addr}"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, bind_addr, "UdpSocket::new (binding)"); + let socket = tokio::net::UdpSocket::bind(addr).await; let socket = match socket { @@ -259,6 +263,10 @@ impl BoundSocket { fn local_addr(&self) -> SocketAddr { self.socket.local_addr().expect("it should get local address") } + + fn url(&self) -> Url { + Url::parse(&format!("udp://{}", self.local_addr())).expect("UDP socket address should be valid") + } } impl Deref for BoundSocket { @@ -363,7 +371,7 @@ impl Udp { }; let address = bound_socket.local_addr(); - let local_udp_url = format!("udp://{address}"); + let local_udp_url = bound_socket.url().to_string(); tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}"); From 35b6c84fbb3d51365cd8e099225510d98494ac46 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 16:18:11 +0100 Subject: [PATCH 08/13] refactor: simplify UDP server receiver It only gets new UDP requests, whitout spwaning tasks to handle them. --- src/servers/udp/server.rs | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 53fbaca34..7557bff0b 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -290,22 +290,20 @@ impl Debug for BoundSocket { struct Receiver { bound_socket: Arc, - tracker: Arc, data: RefCell<[u8; MAX_PACKET_SIZE]>, } impl Receiver { - pub fn new(bound_socket: Arc, tracker: Arc) -> Self { + pub fn new(bound_socket: Arc) -> Self { Receiver { bound_socket, - tracker, data: RefCell::new([0; MAX_PACKET_SIZE]), } } } impl Stream for Receiver { - type Item = std::io::Result; + type Item = std::io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut buf = *self.data.borrow_mut(); @@ -319,13 +317,7 @@ impl Stream for Receiver { Ok(from) => { let payload = buf.filled().to_vec(); let request = UdpRequest { payload, from }; - - Some(Ok(tokio::task::spawn(Udp::process_request( - request, - self.tracker.clone(), - self.bound_socket.clone(), - )) - .abort_handle())) + Some(Ok(request)) } Err(err) => Some(Err(err)), }; @@ -375,7 +367,7 @@ impl Udp { tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}"); - let receiver = Receiver::new(bound_socket.into(), tracker); + let receiver = Receiver::new(bound_socket.into()); tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)"); @@ -383,7 +375,7 @@ impl Udp { let local_addr = local_udp_url.clone(); tokio::task::spawn(async move { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)"); - let () = Self::run_udp_server_main(receiver).await; + let () = Self::run_udp_server_main(receiver, tracker.clone()).await; }) }; @@ -404,7 +396,7 @@ impl Udp { tokio::task::yield_now().await; // lets allow the other threads to complete. } - async fn run_udp_server_main(mut receiver: Receiver) { + async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc) { let reqs = &mut ActiveRequests::default(); let addr = receiver.bound_socket.local_addr(); @@ -429,12 +421,15 @@ impl Udp { } }; - if req.is_finished() { + let abort_handle = + tokio::task::spawn(Udp::process_request(req, tracker.clone(), receiver.bound_socket.clone())).abort_handle(); + + if abort_handle.is_finished() { continue; } // fill buffer with requests - let Err(req) = reqs.rb.try_push(req) else { + let Err(req) = reqs.rb.try_push(abort_handle) else { continue; }; From 61fb4b281d2d957be0292862d2517aebdc9dc1eb Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 16:45:54 +0100 Subject: [PATCH 09/13] refactor: move active requests logic to ActiveRequest type --- src/servers/udp/server.rs | 125 ++++++++++++++++++++++++-------------- 1 file changed, 80 insertions(+), 45 deletions(-) diff --git a/src/servers/udp/server.rs b/src/servers/udp/server.rs index 7557bff0b..14dd0a0f6 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server.rs @@ -235,6 +235,67 @@ impl Drop for ActiveRequests { } } +impl ActiveRequests { + /// It inserts the abort handle for the UDP request processor tasks. + /// + /// If there is no room for the new task, it tries to make place: + /// + /// - Firstly, removing finished tasks. + /// - Secondly, removing the oldest unfinished tasks. + pub async fn force_push(&mut self, abort_handle: AbortHandle, local_addr: &str) { + // fill buffer with requests + let Err(abort_handle) = self.rb.try_push(abort_handle) else { + return; + }; + + let mut finished: u64 = 0; + let mut unfinished_task = None; + + // buffer is full.. lets make some space. + for h in self.rb.pop_iter() { + // remove some finished tasks + if h.is_finished() { + finished += 1; + continue; + } + + // task is unfinished.. give it another chance. + tokio::task::yield_now().await; + + // if now finished, we continue. + if h.is_finished() { + finished += 1; + continue; + } + + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)"); + + if finished == 0 { + // we have _no_ finished tasks.. will abort the unfinished task to make space... + h.abort(); + + tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)"); + break; + } + + // we have space, return unfinished task for re-entry. + unfinished_task = Some(h); + } + + // re-insert the previous unfinished task. + if let Some(h) = unfinished_task { + self.rb.try_push(h).expect("it was previously inserted"); + } + + // insert the new task. + if !abort_handle.is_finished() { + self.rb + .try_push(abort_handle) + .expect("it should remove at least one element."); + } + } +} + /// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket. struct BoundSocket { socket: Arc, @@ -421,60 +482,34 @@ impl Udp { } }; - let abort_handle = - tokio::task::spawn(Udp::process_request(req, tracker.clone(), receiver.bound_socket.clone())).abort_handle(); - - if abort_handle.is_finished() { - continue; - } - - // fill buffer with requests - let Err(req) = reqs.rb.try_push(abort_handle) else { - continue; - }; - - let mut finished: u64 = 0; - let mut unfinished_task = None; - // buffer is full.. lets make some space. - for h in reqs.rb.pop_iter() { - // remove some finished tasks - if h.is_finished() { - finished += 1; - continue; - } + /* code-review: - // task is unfinished.. give it another chance. - tokio::task::yield_now().await; + Does it make sense to spawn a new request processor task when + the ActiveRequests buffer is full? - // if now finished, we continue. - if h.is_finished() { - finished += 1; - continue; - } + We could store the UDP request in a secondary buffer and wait + until active tasks are finished. When a active request is finished + we can move a new UDP request from the pending to process requests + buffer to the active requests buffer. - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)"); + This forces us to define an explicit timeout for active requests. - if finished == 0 { - // we have _no_ finished tasks.. will abort the unfinished task to make space... - h.abort(); + In the current solution the timeout is dynamic, it depends on + the system load. With high load we can remove tasks without + giving them enough time to be processed. With low load we could + keep processing running longer than a reasonable time for + the client to receive the response. - tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)"); - break; - } + */ - // we have space, return unfinished task for re-entry. - unfinished_task = Some(h); - } + let abort_handle = + tokio::task::spawn(Udp::process_request(req, tracker.clone(), receiver.bound_socket.clone())).abort_handle(); - // re-insert the previous unfinished task. - if let Some(h) = unfinished_task { - reqs.rb.try_push(h).expect("it was previously inserted"); + if abort_handle.is_finished() { + continue; } - // insert the new task. - if !req.is_finished() { - reqs.rb.try_push(req).expect("it should remove at least one element."); - } + reqs.force_push(abort_handle, &local_addr).await; } else { tokio::task::yield_now().await; // the request iterator returned `None`. From 336e0e66f0c26c6393cc6701fa30ae0b83bf5aea Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 16:48:34 +0100 Subject: [PATCH 10/13] refactor: reorganize mod to extract new submods --- src/servers/udp/{server.rs => server/mod.rs} | 1 - 1 file changed, 1 deletion(-) rename src/servers/udp/{server.rs => server/mod.rs} (99%) diff --git a/src/servers/udp/server.rs b/src/servers/udp/server/mod.rs similarity index 99% rename from src/servers/udp/server.rs rename to src/servers/udp/server/mod.rs index 14dd0a0f6..36c377cc4 100644 --- a/src/servers/udp/server.rs +++ b/src/servers/udp/server/mod.rs @@ -16,7 +16,6 @@ //! because we want to be able to start and stop the server multiple times, and //! we want to know the bound address and the current state of the server. //! In production, the `Udp` launcher is used directly. -//! use std::cell::RefCell; use std::fmt::Debug; From c121bf2575ecef77e2ecf14c15b30dbb90e33031 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 16:51:07 +0100 Subject: [PATCH 11/13] refactor: rename UDP server types --- src/bootstrap/jobs/udp_tracker.rs | 4 ++-- src/servers/udp/server/mod.rs | 33 ++++++++++++++++--------------- tests/servers/udp/environment.rs | 4 ++-- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/bootstrap/jobs/udp_tracker.rs b/src/bootstrap/jobs/udp_tracker.rs index ba39df2fe..e694163a9 100644 --- a/src/bootstrap/jobs/udp_tracker.rs +++ b/src/bootstrap/jobs/udp_tracker.rs @@ -14,7 +14,7 @@ use tracing::debug; use crate::core; use crate::servers::registar::ServiceRegistrationForm; -use crate::servers::udp::server::{Launcher, UdpServer}; +use crate::servers::udp::server::{Spawner, UdpServer}; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; /// It starts a new UDP server with the provided configuration. @@ -30,7 +30,7 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET; pub async fn start_job(config: &UdpTracker, tracker: Arc, form: ServiceRegistrationForm) -> JoinHandle<()> { let bind_to = config.bind_address; - let server = UdpServer::new(Launcher::new(bind_to)) + let server = UdpServer::new(Spawner::new(bind_to)) .start(tracker, form) .await .expect("it should be able to start the udp tracker"); diff --git a/src/servers/udp/server/mod.rs b/src/servers/udp/server/mod.rs index 36c377cc4..3b1792b3d 100644 --- a/src/servers/udp/server/mod.rs +++ b/src/servers/udp/server/mod.rs @@ -96,7 +96,7 @@ pub struct UdpServer { /// A stopped UDP server state. pub struct Stopped { - launcher: Launcher, + launcher: Spawner, } /// A running UDP server state. @@ -105,13 +105,13 @@ pub struct Running { /// The address where the server is bound. pub binding: SocketAddr, pub halt_task: tokio::sync::oneshot::Sender, - pub task: JoinHandle, + pub task: JoinHandle, } impl UdpServer { /// Creates a new `UdpServer` instance in `stopped`state. #[must_use] - pub fn new(launcher: Launcher) -> Self { + pub fn new(launcher: Spawner) -> Self { Self { state: Stopped { launcher }, } @@ -140,7 +140,7 @@ impl UdpServer { let binding = rx_start.await.expect("it should be able to start the service").address; let local_addr = format!("udp://{binding}"); - form.send(ServiceRegistration::new(binding, Udp::check)) + form.send(ServiceRegistration::new(binding, Launcher::check)) .expect("it should be able to send service registration"); let running_udp_server: UdpServer = UdpServer { @@ -186,12 +186,12 @@ impl UdpServer { } #[derive(Constructor, Copy, Clone, Debug)] -pub struct Launcher { +pub struct Spawner { bind_to: SocketAddr, } -impl Launcher { - /// It starts the UDP server instance. +impl Spawner { + /// It spawns a new tasks to run the UDP server instance. /// /// # Panics /// @@ -201,10 +201,10 @@ impl Launcher { tracker: Arc, tx_start: oneshot::Sender, rx_halt: oneshot::Receiver, - ) -> JoinHandle { - let launcher = Launcher::new(self.bind_to); + ) -> JoinHandle { + let launcher = Spawner::new(self.bind_to); tokio::spawn(async move { - Udp::run_with_graceful_shutdown(tracker, launcher.bind_to, tx_start, rx_halt).await; + Launcher::run_with_graceful_shutdown(tracker, launcher.bind_to, tx_start, rx_halt).await; launcher }) } @@ -388,9 +388,9 @@ impl Stream for Receiver { /// A UDP server instance launcher. #[derive(Constructor)] -pub struct Udp; +pub struct Launcher; -impl Udp { +impl Launcher { /// It starts the UDP server instance with graceful shutdown. /// /// # Panics @@ -502,7 +502,8 @@ impl Udp { */ let abort_handle = - tokio::task::spawn(Udp::process_request(req, tracker.clone(), receiver.bound_socket.clone())).abort_handle(); + tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone())) + .abort_handle(); if abort_handle.is_finished() { continue; @@ -589,7 +590,7 @@ mod tests { use crate::bootstrap::app::initialize_with_configuration; use crate::servers::registar::Registar; - use crate::servers::udp::server::{Launcher, UdpServer}; + use crate::servers::udp::server::{Spawner, UdpServer}; #[tokio::test] async fn it_should_be_able_to_start_and_stop() { @@ -600,7 +601,7 @@ mod tests { let bind_to = config.bind_address; let register = &Registar::default(); - let stopped = UdpServer::new(Launcher::new(bind_to)); + let stopped = UdpServer::new(Spawner::new(bind_to)); let started = stopped .start(tracker, register.give_form()) @@ -622,7 +623,7 @@ mod tests { let bind_to = config.bind_address; let register = &Registar::default(); - let stopped = UdpServer::new(Launcher::new(bind_to)); + let stopped = UdpServer::new(Spawner::new(bind_to)); let started = stopped .start(tracker, register.give_form()) diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index 7b21defce..e8fb048ca 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use torrust_tracker::bootstrap::app::initialize_with_configuration; use torrust_tracker::core::Tracker; use torrust_tracker::servers::registar::Registar; -use torrust_tracker::servers::udp::server::{Launcher, Running, Stopped, UdpServer}; +use torrust_tracker::servers::udp::server::{Running, Spawner, Stopped, UdpServer}; use torrust_tracker::shared::bit_torrent::tracker::udp::client::DEFAULT_TIMEOUT; use torrust_tracker_configuration::{Configuration, UdpTracker}; use torrust_tracker_primitives::info_hash::InfoHash; @@ -36,7 +36,7 @@ impl Environment { let bind_to = config.bind_address; - let server = UdpServer::new(Launcher::new(bind_to)); + let server = UdpServer::new(Spawner::new(bind_to)); Self { config, From 89bb73576af3b97f104943a6a01b7b0c37ae2489 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 17:53:14 +0100 Subject: [PATCH 12/13] refactor: reorganize UDP server mod --- src/bootstrap/jobs/udp_tracker.rs | 5 +- src/servers/udp/handlers.rs | 6 +- src/servers/udp/mod.rs | 2 +- src/servers/udp/server/bound_socket.rs | 73 +++ src/servers/udp/server/launcher.rs | 219 +++++++++ src/servers/udp/server/mod.rs | 550 +---------------------- src/servers/udp/server/receiver.rs | 54 +++ src/servers/udp/server/request_buffer.rs | 95 ++++ src/servers/udp/server/spawner.rs | 36 ++ src/servers/udp/server/states.rs | 115 +++++ tests/servers/udp/environment.rs | 8 +- tests/servers/udp/mod.rs | 4 +- 12 files changed, 621 insertions(+), 546 deletions(-) create mode 100644 src/servers/udp/server/bound_socket.rs create mode 100644 src/servers/udp/server/launcher.rs create mode 100644 src/servers/udp/server/receiver.rs create mode 100644 src/servers/udp/server/request_buffer.rs create mode 100644 src/servers/udp/server/spawner.rs create mode 100644 src/servers/udp/server/states.rs diff --git a/src/bootstrap/jobs/udp_tracker.rs b/src/bootstrap/jobs/udp_tracker.rs index e694163a9..647461bfc 100644 --- a/src/bootstrap/jobs/udp_tracker.rs +++ b/src/bootstrap/jobs/udp_tracker.rs @@ -14,7 +14,8 @@ use tracing::debug; use crate::core; use crate::servers::registar::ServiceRegistrationForm; -use crate::servers::udp::server::{Spawner, UdpServer}; +use crate::servers::udp::server::spawner::Spawner; +use crate::servers::udp::server::Server; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; /// It starts a new UDP server with the provided configuration. @@ -30,7 +31,7 @@ use crate::servers::udp::UDP_TRACKER_LOG_TARGET; pub async fn start_job(config: &UdpTracker, tracker: Arc, form: ServiceRegistrationForm) -> JoinHandle<()> { let bind_to = config.bind_address; - let server = UdpServer::new(Spawner::new(bind_to)) + let server = Server::new(Spawner::new(bind_to)) .start(tracker, form) .await .expect("it should be able to start the udp tracker"); diff --git a/src/servers/udp/handlers.rs b/src/servers/udp/handlers.rs index f7e3aac64..12ae6a250 100644 --- a/src/servers/udp/handlers.rs +++ b/src/servers/udp/handlers.rs @@ -17,7 +17,7 @@ use uuid::Uuid; use zerocopy::network_endian::I32; use super::connection_cookie::{check, from_connection_id, into_connection_id, make}; -use super::UdpRequest; +use super::RawRequest; use crate::core::{statistics, ScrapeData, Tracker}; use crate::servers::udp::error::Error; use crate::servers::udp::logging::{log_bad_request, log_error_response, log_request, log_response}; @@ -33,7 +33,7 @@ use crate::shared::bit_torrent::common::MAX_SCRAPE_TORRENTS; /// - Delegating the request to the correct handler depending on the request type. /// /// It will return an `Error` response if the request is invalid. -pub(crate) async fn handle_packet(udp_request: UdpRequest, tracker: &Arc, addr: SocketAddr) -> Response { +pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc, addr: SocketAddr) -> Response { debug!("Handling Packets: {udp_request:?}"); let start_time = Instant::now(); @@ -304,7 +304,7 @@ fn handle_error(e: &Error, transaction_id: TransactionId) -> Response { pub struct RequestId(Uuid); impl RequestId { - fn make(_request: &UdpRequest) -> RequestId { + fn make(_request: &RawRequest) -> RequestId { RequestId(Uuid::new_v4()) } } diff --git a/src/servers/udp/mod.rs b/src/servers/udp/mod.rs index 5c5460397..8ea05d5b1 100644 --- a/src/servers/udp/mod.rs +++ b/src/servers/udp/mod.rs @@ -660,7 +660,7 @@ pub type Port = u16; pub type TransactionId = i64; #[derive(Clone, Debug)] -pub(crate) struct UdpRequest { +pub struct RawRequest { payload: Vec, from: SocketAddr, } diff --git a/src/servers/udp/server/bound_socket.rs b/src/servers/udp/server/bound_socket.rs new file mode 100644 index 000000000..cd416c7c5 --- /dev/null +++ b/src/servers/udp/server/bound_socket.rs @@ -0,0 +1,73 @@ +use std::fmt::Debug; +use std::net::SocketAddr; +use std::ops::Deref; +use std::sync::Arc; + +use url::Url; + +use crate::servers::udp::UDP_TRACKER_LOG_TARGET; + +/// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket. +pub struct BoundSocket { + socket: Arc, +} + +impl BoundSocket { + /// # Errors + /// + /// Will return an error if the socket can't be bound the the provided address. + pub async fn new(addr: SocketAddr) -> Result> { + let bind_addr = format!("udp://{addr}"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, bind_addr, "UdpSocket::new (binding)"); + + let socket = tokio::net::UdpSocket::bind(addr).await; + + let socket = match socket { + Ok(socket) => socket, + Err(e) => Err(e)?, + }; + + let local_addr = format!("udp://{addr}"); + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpSocket::new (bound)"); + + Ok(Self { + socket: Arc::new(socket), + }) + } + + /// # Panics + /// + /// Will panic if the socket can't get the address it was bound to. + #[must_use] + pub fn address(&self) -> SocketAddr { + self.socket.local_addr().expect("it should get local address") + } + + /// # Panics + /// + /// Will panic if the address the socket was bound to is not a valid address + /// to be used in a URL. + #[must_use] + pub fn url(&self) -> Url { + Url::parse(&format!("udp://{}", self.address())).expect("UDP socket address should be valid") + } +} + +impl Deref for BoundSocket { + type Target = tokio::net::UdpSocket; + + fn deref(&self) -> &Self::Target { + &self.socket + } +} + +impl Debug for BoundSocket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let local_addr = match self.socket.local_addr() { + Ok(socket) => format!("Receiving From: {socket}"), + Err(err) => format!("Socket Broken: {err}"), + }; + + f.debug_struct("UdpSocket").field("addr", &local_addr).finish_non_exhaustive() + } +} diff --git a/src/servers/udp/server/launcher.rs b/src/servers/udp/server/launcher.rs new file mode 100644 index 000000000..db448c2ff --- /dev/null +++ b/src/servers/udp/server/launcher.rs @@ -0,0 +1,219 @@ +use std::io::Cursor; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use aquatic_udp_protocol::Response; +use derive_more::Constructor; +use futures_util::StreamExt; +use tokio::select; +use tokio::sync::oneshot; + +use super::request_buffer::ActiveRequests; +use super::RawRequest; +use crate::bootstrap::jobs::Started; +use crate::core::Tracker; +use crate::servers::logging::STARTED_ON; +use crate::servers::registar::ServiceHealthCheckJob; +use crate::servers::signals::{shutdown_signal_with_message, Halted}; +use crate::servers::udp::server::bound_socket::BoundSocket; +use crate::servers::udp::server::receiver::Receiver; +use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET}; +use crate::shared::bit_torrent::tracker::udp::client::check; +use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE; + +/// A UDP server instance launcher. +#[derive(Constructor)] +pub struct Launcher; + +impl Launcher { + /// It starts the UDP server instance with graceful shutdown. + /// + /// # Panics + /// + /// It panics if unable to bind to udp socket, and get the address from the udp socket. + /// It also panics if unable to send address of socket. + pub async fn run_with_graceful_shutdown( + tracker: Arc, + bind_to: SocketAddr, + tx_start: oneshot::Sender, + rx_halt: oneshot::Receiver, + ) { + let halt_task = tokio::task::spawn(shutdown_signal_with_message( + rx_halt, + format!("Halting UDP Service Bound to Socket: {bind_to}"), + )); + + tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting on: {bind_to}"); + + let socket = tokio::time::timeout(Duration::from_millis(5000), BoundSocket::new(bind_to)) + .await + .expect("it should bind to the socket within five seconds"); + + let bound_socket = match socket { + Ok(socket) => socket, + Err(e) => { + tracing::error!(target: UDP_TRACKER_LOG_TARGET, addr = %bind_to, err = %e, "Udp::run_with_graceful_shutdown panic! (error when building socket)" ); + panic!("could not bind to socket!"); + } + }; + + let address = bound_socket.address(); + let local_udp_url = bound_socket.url().to_string(); + + tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}"); + + let receiver = Receiver::new(bound_socket.into()); + + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)"); + + let running = { + let local_addr = local_udp_url.clone(); + tokio::task::spawn(async move { + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)"); + let () = Self::run_udp_server_main(receiver, tracker.clone()).await; + }) + }; + + tx_start + .send(Started { address }) + .expect("the UDP Tracker service should not be dropped"); + + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (started)"); + + let stop = running.abort_handle(); + + select! { + _ = running => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (stopped)"); }, + _ = halt_task => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (halting)"); } + } + stop.abort(); + + tokio::task::yield_now().await; // lets allow the other threads to complete. + } + + #[must_use] + pub fn check(binding: &SocketAddr) -> ServiceHealthCheckJob { + let binding = *binding; + let info = format!("checking the udp tracker health check at: {binding}"); + + let job = tokio::spawn(async move { check(&binding).await }); + + ServiceHealthCheckJob::new(binding, info, job) + } + + async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc) { + let reqs = &mut ActiveRequests::default(); + + let addr = receiver.bound_socket_address(); + let local_addr = format!("udp://{addr}"); + + loop { + if let Some(req) = { + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)"); + receiver.next().await + } { + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (in)"); + + let req = match req { + Ok(req) => req, + Err(e) => { + if e.kind() == std::io::ErrorKind::Interrupted { + tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop (interrupted)"); + return; + } + tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop break: (got error)"); + break; + } + }; + + /* code-review: + + Does it make sense to spawn a new request processor task when + the ActiveRequests buffer is full? + + We could store the UDP request in a secondary buffer and wait + until active tasks are finished. When a active request is finished + we can move a new UDP request from the pending to process requests + buffer to the active requests buffer. + + This forces us to define an explicit timeout for active requests. + + In the current solution the timeout is dynamic, it depends on + the system load. With high load we can remove tasks without + giving them enough time to be processed. With low load we could + keep processing running longer than a reasonable time for + the client to receive the response. + + */ + + let abort_handle = + tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone())) + .abort_handle(); + + if abort_handle.is_finished() { + continue; + } + + reqs.force_push(abort_handle, &local_addr).await; + } else { + tokio::task::yield_now().await; + // the request iterator returned `None`. + tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)"); + break; + } + } + } + + async fn process_request(request: RawRequest, tracker: Arc, socket: Arc) { + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)"); + Self::process_valid_request(tracker, socket, request).await; + } + + async fn process_valid_request(tracker: Arc, socket: Arc, udp_request: RawRequest) { + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, "Udp::process_valid_request. Making Response to {udp_request:?}"); + let from = udp_request.from; + let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.address()).await; + Self::send_response(&socket.clone(), from, response).await; + } + + async fn send_response(bound_socket: &Arc, to: SocketAddr, response: Response) { + let response_type = match &response { + Response::Connect(_) => "Connect".to_string(), + Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(), + Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(), + Response::Scrape(_) => "Scrape".to_string(), + Response::Error(e) => format!("Error: {e:?}"), + }; + + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)"); + + let buffer = vec![0u8; MAX_PACKET_SIZE]; + let mut cursor = Cursor::new(buffer); + + match response.write_bytes(&mut cursor) { + Ok(()) => { + #[allow(clippy::cast_possible_truncation)] + let position = cursor.position() as usize; + let inner = cursor.get_ref(); + + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sending...)" ); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "Udp::send_response (sending...)"); + + Self::send_packet(bound_socket, &to, &inner[..position]).await; + + tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sent)"); + } + Err(e) => { + tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)"); + } + } + } + + async fn send_packet(bound_socket: &Arc, remote_addr: &SocketAddr, payload: &[u8]) { + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)"); + + // doesn't matter if it reaches or not + drop(bound_socket.send_to(payload, remote_addr).await); + } +} diff --git a/src/servers/udp/server/mod.rs b/src/servers/udp/server/mod.rs index 3b1792b3d..1bb9831ee 100644 --- a/src/servers/udp/server/mod.rs +++ b/src/servers/udp/server/mod.rs @@ -17,35 +17,16 @@ //! we want to know the bound address and the current state of the server. //! In production, the `Udp` launcher is used directly. -use std::cell::RefCell; use std::fmt::Debug; -use std::io::Cursor; -use std::net::SocketAddr; -use std::ops::Deref; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::Duration; - -use aquatic_udp_protocol::Response; -use derive_more::Constructor; -use futures::{Stream, StreamExt}; -use ringbuf::traits::{Consumer, Observer, Producer}; -use ringbuf::StaticRb; -use tokio::select; -use tokio::sync::oneshot; -use tokio::task::{AbortHandle, JoinHandle}; -use url::Url; - -use super::UdpRequest; -use crate::bootstrap::jobs::Started; -use crate::core::Tracker; -use crate::servers::logging::STARTED_ON; -use crate::servers::registar::{ServiceHealthCheckJob, ServiceRegistration, ServiceRegistrationForm}; -use crate::servers::signals::{shutdown_signal_with_message, Halted}; -use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET}; -use crate::shared::bit_torrent::tracker::udp::client::check; -use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE; + +use super::RawRequest; + +pub mod bound_socket; +pub mod launcher; +pub mod receiver; +pub mod request_buffer; +pub mod spawner; +pub mod states; /// Error that can occur when starting or stopping the UDP server. /// @@ -64,21 +45,7 @@ pub enum UdpError { Error(String), } -/// A UDP server instance controller with no UDP instance running. -#[allow(clippy::module_name_repetitions)] -pub type StoppedUdpServer = UdpServer; - -/// A UDP server instance controller with a running UDP instance. -#[allow(clippy::module_name_repetitions)] -pub type RunningUdpServer = UdpServer; - -/// A UDP server instance controller. -/// -/// It's responsible for: -/// -/// - Keeping the initial configuration of the server. -/// - Starting and stopping the server. -/// - Keeping the state of the server: `running` or `stopped`. +/// A UDP server. /// /// It's an state machine. Configurations cannot be changed. This struct /// represents concrete configuration and state. It allows to start and stop the @@ -88,499 +55,11 @@ pub type RunningUdpServer = UdpServer; /// > reset to the initial value after stopping the server. This struct is not /// > intended to persist configurations between runs. #[allow(clippy::module_name_repetitions)] -pub struct UdpServer { +pub struct Server { /// The state of the server: `running` or `stopped`. pub state: S, } -/// A stopped UDP server state. - -pub struct Stopped { - launcher: Spawner, -} - -/// A running UDP server state. -#[derive(Debug, Constructor)] -pub struct Running { - /// The address where the server is bound. - pub binding: SocketAddr, - pub halt_task: tokio::sync::oneshot::Sender, - pub task: JoinHandle, -} - -impl UdpServer { - /// Creates a new `UdpServer` instance in `stopped`state. - #[must_use] - pub fn new(launcher: Spawner) -> Self { - Self { - state: Stopped { launcher }, - } - } - - /// It starts the server and returns a `UdpServer` controller in `running` - /// state. - /// - /// # Errors - /// - /// Will return `Err` if UDP can't bind to given bind address. - /// - /// # Panics - /// - /// It panics if unable to receive the bound socket address from service. - /// - pub async fn start(self, tracker: Arc, form: ServiceRegistrationForm) -> Result, std::io::Error> { - let (tx_start, rx_start) = tokio::sync::oneshot::channel::(); - let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::(); - - assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); - - // May need to wrap in a task to about a tokio bug. - let task = self.state.launcher.start(tracker, tx_start, rx_halt); - - let binding = rx_start.await.expect("it should be able to start the service").address; - let local_addr = format!("udp://{binding}"); - - form.send(ServiceRegistration::new(binding, Launcher::check)) - .expect("it should be able to send service registration"); - - let running_udp_server: UdpServer = UdpServer { - state: Running { - binding, - halt_task: tx_halt, - task, - }, - }; - - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpServer::start (running)"); - - Ok(running_udp_server) - } -} - -impl UdpServer { - /// It stops the server and returns a `UdpServer` controller in `stopped` - /// state. - /// - /// # Errors - /// - /// Will return `Err` if the oneshot channel to send the stop signal - /// has already been called once. - /// - /// # Panics - /// - /// It panics if unable to shutdown service. - pub async fn stop(self) -> Result, UdpError> { - self.state - .halt_task - .send(Halted::Normal) - .map_err(|e| UdpError::Error(e.to_string()))?; - - let launcher = self.state.task.await.expect("it should shutdown service"); - - let stopped_api_server: UdpServer = UdpServer { - state: Stopped { launcher }, - }; - - Ok(stopped_api_server) - } -} - -#[derive(Constructor, Copy, Clone, Debug)] -pub struct Spawner { - bind_to: SocketAddr, -} - -impl Spawner { - /// It spawns a new tasks to run the UDP server instance. - /// - /// # Panics - /// - /// It would panic if unable to resolve the `local_addr` from the supplied ´socket´. - pub fn start( - &self, - tracker: Arc, - tx_start: oneshot::Sender, - rx_halt: oneshot::Receiver, - ) -> JoinHandle { - let launcher = Spawner::new(self.bind_to); - tokio::spawn(async move { - Launcher::run_with_graceful_shutdown(tracker, launcher.bind_to, tx_start, rx_halt).await; - launcher - }) - } -} - -/// Ring-Buffer of Active Requests -#[derive(Default)] -struct ActiveRequests { - rb: StaticRb, // the number of requests we handle at the same time. -} - -impl std::fmt::Debug for ActiveRequests { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let (left, right) = &self.rb.as_slices(); - let dbg = format!("capacity: {}, left: {left:?}, right: {right:?}", &self.rb.capacity()); - f.debug_struct("ActiveRequests").field("rb", &dbg).finish() - } -} - -impl Drop for ActiveRequests { - fn drop(&mut self) { - for h in self.rb.pop_iter() { - if !h.is_finished() { - h.abort(); - } - } - } -} - -impl ActiveRequests { - /// It inserts the abort handle for the UDP request processor tasks. - /// - /// If there is no room for the new task, it tries to make place: - /// - /// - Firstly, removing finished tasks. - /// - Secondly, removing the oldest unfinished tasks. - pub async fn force_push(&mut self, abort_handle: AbortHandle, local_addr: &str) { - // fill buffer with requests - let Err(abort_handle) = self.rb.try_push(abort_handle) else { - return; - }; - - let mut finished: u64 = 0; - let mut unfinished_task = None; - - // buffer is full.. lets make some space. - for h in self.rb.pop_iter() { - // remove some finished tasks - if h.is_finished() { - finished += 1; - continue; - } - - // task is unfinished.. give it another chance. - tokio::task::yield_now().await; - - // if now finished, we continue. - if h.is_finished() { - finished += 1; - continue; - } - - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)"); - - if finished == 0 { - // we have _no_ finished tasks.. will abort the unfinished task to make space... - h.abort(); - - tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)"); - break; - } - - // we have space, return unfinished task for re-entry. - unfinished_task = Some(h); - } - - // re-insert the previous unfinished task. - if let Some(h) = unfinished_task { - self.rb.try_push(h).expect("it was previously inserted"); - } - - // insert the new task. - if !abort_handle.is_finished() { - self.rb - .try_push(abort_handle) - .expect("it should remove at least one element."); - } - } -} - -/// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket. -struct BoundSocket { - socket: Arc, -} - -impl BoundSocket { - async fn new(addr: SocketAddr) -> Result> { - let bind_addr = format!("udp://{addr}"); - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, bind_addr, "UdpSocket::new (binding)"); - - let socket = tokio::net::UdpSocket::bind(addr).await; - - let socket = match socket { - Ok(socket) => socket, - Err(e) => Err(e)?, - }; - - let local_addr = format!("udp://{addr}"); - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpSocket::new (bound)"); - - Ok(Self { - socket: Arc::new(socket), - }) - } - - fn local_addr(&self) -> SocketAddr { - self.socket.local_addr().expect("it should get local address") - } - - fn url(&self) -> Url { - Url::parse(&format!("udp://{}", self.local_addr())).expect("UDP socket address should be valid") - } -} - -impl Deref for BoundSocket { - type Target = tokio::net::UdpSocket; - - fn deref(&self) -> &Self::Target { - &self.socket - } -} - -impl Debug for BoundSocket { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let local_addr = match self.socket.local_addr() { - Ok(socket) => format!("Receiving From: {socket}"), - Err(err) => format!("Socket Broken: {err}"), - }; - - f.debug_struct("UdpSocket").field("addr", &local_addr).finish_non_exhaustive() - } -} - -struct Receiver { - bound_socket: Arc, - data: RefCell<[u8; MAX_PACKET_SIZE]>, -} - -impl Receiver { - pub fn new(bound_socket: Arc) -> Self { - Receiver { - bound_socket, - data: RefCell::new([0; MAX_PACKET_SIZE]), - } - } -} - -impl Stream for Receiver { - type Item = std::io::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut buf = *self.data.borrow_mut(); - let mut buf = tokio::io::ReadBuf::new(&mut buf); - - let Poll::Ready(ready) = self.bound_socket.poll_recv_from(cx, &mut buf) else { - return Poll::Pending; - }; - - let res = match ready { - Ok(from) => { - let payload = buf.filled().to_vec(); - let request = UdpRequest { payload, from }; - Some(Ok(request)) - } - Err(err) => Some(Err(err)), - }; - - Poll::Ready(res) - } -} - -/// A UDP server instance launcher. -#[derive(Constructor)] -pub struct Launcher; - -impl Launcher { - /// It starts the UDP server instance with graceful shutdown. - /// - /// # Panics - /// - /// It panics if unable to bind to udp socket, and get the address from the udp socket. - /// It also panics if unable to send address of socket. - async fn run_with_graceful_shutdown( - tracker: Arc, - bind_to: SocketAddr, - tx_start: oneshot::Sender, - rx_halt: oneshot::Receiver, - ) { - let halt_task = tokio::task::spawn(shutdown_signal_with_message( - rx_halt, - format!("Halting UDP Service Bound to Socket: {bind_to}"), - )); - - tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting on: {bind_to}"); - - let socket = tokio::time::timeout(Duration::from_millis(5000), BoundSocket::new(bind_to)) - .await - .expect("it should bind to the socket within five seconds"); - - let bound_socket = match socket { - Ok(socket) => socket, - Err(e) => { - tracing::error!(target: UDP_TRACKER_LOG_TARGET, addr = %bind_to, err = %e, "Udp::run_with_graceful_shutdown panic! (error when building socket)" ); - panic!("could not bind to socket!"); - } - }; - - let address = bound_socket.local_addr(); - let local_udp_url = bound_socket.url().to_string(); - - tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}"); - - let receiver = Receiver::new(bound_socket.into()); - - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)"); - - let running = { - let local_addr = local_udp_url.clone(); - tokio::task::spawn(async move { - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)"); - let () = Self::run_udp_server_main(receiver, tracker.clone()).await; - }) - }; - - tx_start - .send(Started { address }) - .expect("the UDP Tracker service should not be dropped"); - - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (started)"); - - let stop = running.abort_handle(); - - select! { - _ = running => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (stopped)"); }, - _ = halt_task => { tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (halting)"); } - } - stop.abort(); - - tokio::task::yield_now().await; // lets allow the other threads to complete. - } - - async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc) { - let reqs = &mut ActiveRequests::default(); - - let addr = receiver.bound_socket.local_addr(); - let local_addr = format!("udp://{addr}"); - - loop { - if let Some(req) = { - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)"); - receiver.next().await - } { - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (in)"); - - let req = match req { - Ok(req) => req, - Err(e) => { - if e.kind() == std::io::ErrorKind::Interrupted { - tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop (interrupted)"); - return; - } - tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, err = %e, "Udp::run_udp_server::loop break: (got error)"); - break; - } - }; - - /* code-review: - - Does it make sense to spawn a new request processor task when - the ActiveRequests buffer is full? - - We could store the UDP request in a secondary buffer and wait - until active tasks are finished. When a active request is finished - we can move a new UDP request from the pending to process requests - buffer to the active requests buffer. - - This forces us to define an explicit timeout for active requests. - - In the current solution the timeout is dynamic, it depends on - the system load. With high load we can remove tasks without - giving them enough time to be processed. With low load we could - keep processing running longer than a reasonable time for - the client to receive the response. - - */ - - let abort_handle = - tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone())) - .abort_handle(); - - if abort_handle.is_finished() { - continue; - } - - reqs.force_push(abort_handle, &local_addr).await; - } else { - tokio::task::yield_now().await; - // the request iterator returned `None`. - tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)"); - break; - } - } - } - - async fn process_request(request: UdpRequest, tracker: Arc, socket: Arc) { - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)"); - Self::process_valid_request(tracker, socket, request).await; - } - - async fn process_valid_request(tracker: Arc, socket: Arc, udp_request: UdpRequest) { - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, "Udp::process_valid_request. Making Response to {udp_request:?}"); - let from = udp_request.from; - let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.local_addr()).await; - Self::send_response(&socket.clone(), from, response).await; - } - - async fn send_response(bound_socket: &Arc, to: SocketAddr, response: Response) { - let response_type = match &response { - Response::Connect(_) => "Connect".to_string(), - Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(), - Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(), - Response::Scrape(_) => "Scrape".to_string(), - Response::Error(e) => format!("Error: {e:?}"), - }; - - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)"); - - let buffer = vec![0u8; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(buffer); - - match response.write_bytes(&mut cursor) { - Ok(()) => { - #[allow(clippy::cast_possible_truncation)] - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sending...)" ); - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "Udp::send_response (sending...)"); - - Self::send_packet(bound_socket, &to, &inner[..position]).await; - - tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sent)"); - } - Err(e) => { - tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)"); - } - } - } - - async fn send_packet(socket: &Arc, remote_addr: &SocketAddr, payload: &[u8]) { - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)"); - - // doesn't matter if it reaches or not - drop(socket.send_to(payload, remote_addr).await); - } - - fn check(binding: &SocketAddr) -> ServiceHealthCheckJob { - let binding = *binding; - let info = format!("checking the udp tracker health check at: {binding}"); - - let job = tokio::spawn(async move { check(&binding).await }); - - ServiceHealthCheckJob::new(binding, info, job) - } -} - #[cfg(test)] mod tests { use std::sync::Arc; @@ -588,9 +67,10 @@ mod tests { use torrust_tracker_test_helpers::configuration::ephemeral_mode_public; + use super::spawner::Spawner; + use super::Server; use crate::bootstrap::app::initialize_with_configuration; use crate::servers::registar::Registar; - use crate::servers::udp::server::{Spawner, UdpServer}; #[tokio::test] async fn it_should_be_able_to_start_and_stop() { @@ -601,7 +81,7 @@ mod tests { let bind_to = config.bind_address; let register = &Registar::default(); - let stopped = UdpServer::new(Spawner::new(bind_to)); + let stopped = Server::new(Spawner::new(bind_to)); let started = stopped .start(tracker, register.give_form()) @@ -623,7 +103,7 @@ mod tests { let bind_to = config.bind_address; let register = &Registar::default(); - let stopped = UdpServer::new(Spawner::new(bind_to)); + let stopped = Server::new(Spawner::new(bind_to)); let started = stopped .start(tracker, register.give_form()) diff --git a/src/servers/udp/server/receiver.rs b/src/servers/udp/server/receiver.rs new file mode 100644 index 000000000..020ab7324 --- /dev/null +++ b/src/servers/udp/server/receiver.rs @@ -0,0 +1,54 @@ +use std::cell::RefCell; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use futures::Stream; + +use super::bound_socket::BoundSocket; +use super::RawRequest; +use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE; + +pub struct Receiver { + pub bound_socket: Arc, + data: RefCell<[u8; MAX_PACKET_SIZE]>, +} + +impl Receiver { + #[must_use] + pub fn new(bound_socket: Arc) -> Self { + Receiver { + bound_socket, + data: RefCell::new([0; MAX_PACKET_SIZE]), + } + } + + pub fn bound_socket_address(&self) -> SocketAddr { + self.bound_socket.address() + } +} + +impl Stream for Receiver { + type Item = std::io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut buf = *self.data.borrow_mut(); + let mut buf = tokio::io::ReadBuf::new(&mut buf); + + let Poll::Ready(ready) = self.bound_socket.poll_recv_from(cx, &mut buf) else { + return Poll::Pending; + }; + + let res = match ready { + Ok(from) => { + let payload = buf.filled().to_vec(); + let request = RawRequest { payload, from }; + Some(Ok(request)) + } + Err(err) => Some(Err(err)), + }; + + Poll::Ready(res) + } +} diff --git a/src/servers/udp/server/request_buffer.rs b/src/servers/udp/server/request_buffer.rs new file mode 100644 index 000000000..c1d4f2696 --- /dev/null +++ b/src/servers/udp/server/request_buffer.rs @@ -0,0 +1,95 @@ +use ringbuf::traits::{Consumer, Observer, Producer}; +use ringbuf::StaticRb; +use tokio::task::AbortHandle; + +use crate::servers::udp::UDP_TRACKER_LOG_TARGET; + +/// Ring-Buffer of Active Requests +#[derive(Default)] +pub struct ActiveRequests { + rb: StaticRb, // the number of requests we handle at the same time. +} + +impl std::fmt::Debug for ActiveRequests { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let (left, right) = &self.rb.as_slices(); + let dbg = format!("capacity: {}, left: {left:?}, right: {right:?}", &self.rb.capacity()); + f.debug_struct("ActiveRequests").field("rb", &dbg).finish() + } +} + +impl Drop for ActiveRequests { + fn drop(&mut self) { + for h in self.rb.pop_iter() { + if !h.is_finished() { + h.abort(); + } + } + } +} + +impl ActiveRequests { + /// It inserts the abort handle for the UDP request processor tasks. + /// + /// If there is no room for the new task, it tries to make place: + /// + /// - Firstly, removing finished tasks. + /// - Secondly, removing the oldest unfinished tasks. + /// + /// # Panics + /// + /// Will panics if it can't make space for the new handle. + pub async fn force_push(&mut self, abort_handle: AbortHandle, local_addr: &str) { + // fill buffer with requests + let Err(abort_handle) = self.rb.try_push(abort_handle) else { + return; + }; + + let mut finished: u64 = 0; + let mut unfinished_task = None; + + // buffer is full.. lets make some space. + for h in self.rb.pop_iter() { + // remove some finished tasks + if h.is_finished() { + finished += 1; + continue; + } + + // task is unfinished.. give it another chance. + tokio::task::yield_now().await; + + // if now finished, we continue. + if h.is_finished() { + finished += 1; + continue; + } + + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)"); + + if finished == 0 { + // we have _no_ finished tasks.. will abort the unfinished task to make space... + h.abort(); + + tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)"); + + break; + } + + // we have space, return unfinished task for re-entry. + unfinished_task = Some(h); + } + + // re-insert the previous unfinished task. + if let Some(h) = unfinished_task { + self.rb.try_push(h).expect("it was previously inserted"); + } + + // insert the new task. + if !abort_handle.is_finished() { + self.rb + .try_push(abort_handle) + .expect("it should remove at least one element."); + } + } +} diff --git a/src/servers/udp/server/spawner.rs b/src/servers/udp/server/spawner.rs new file mode 100644 index 000000000..a36404fce --- /dev/null +++ b/src/servers/udp/server/spawner.rs @@ -0,0 +1,36 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use derive_more::Constructor; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; + +use super::launcher::Launcher; +use crate::bootstrap::jobs::Started; +use crate::core::Tracker; +use crate::servers::signals::Halted; + +#[derive(Constructor, Copy, Clone, Debug)] +pub struct Spawner { + pub bind_to: SocketAddr, +} + +impl Spawner { + /// It spawns a new tasks to run the UDP server instance. + /// + /// # Panics + /// + /// It would panic if unable to resolve the `local_addr` from the supplied ´socket´. + pub fn start( + &self, + tracker: Arc, + tx_start: oneshot::Sender, + rx_halt: oneshot::Receiver, + ) -> JoinHandle { + let launcher = Spawner::new(self.bind_to); + tokio::spawn(async move { + Launcher::run_with_graceful_shutdown(tracker, launcher.bind_to, tx_start, rx_halt).await; + launcher + }) + } +} diff --git a/src/servers/udp/server/states.rs b/src/servers/udp/server/states.rs new file mode 100644 index 000000000..919646d7b --- /dev/null +++ b/src/servers/udp/server/states.rs @@ -0,0 +1,115 @@ +use std::fmt::Debug; +use std::net::SocketAddr; +use std::sync::Arc; + +use derive_more::Constructor; +use tokio::task::JoinHandle; + +use super::spawner::Spawner; +use super::{Server, UdpError}; +use crate::bootstrap::jobs::Started; +use crate::core::Tracker; +use crate::servers::registar::{ServiceRegistration, ServiceRegistrationForm}; +use crate::servers::signals::Halted; +use crate::servers::udp::server::launcher::Launcher; +use crate::servers::udp::UDP_TRACKER_LOG_TARGET; + +/// A UDP server instance controller with no UDP instance running. +#[allow(clippy::module_name_repetitions)] +pub type StoppedUdpServer = Server; + +/// A UDP server instance controller with a running UDP instance. +#[allow(clippy::module_name_repetitions)] +pub type RunningUdpServer = Server; + +/// A stopped UDP server state. + +pub struct Stopped { + pub launcher: Spawner, +} + +/// A running UDP server state. +#[derive(Debug, Constructor)] +pub struct Running { + /// The address where the server is bound. + pub binding: SocketAddr, + pub halt_task: tokio::sync::oneshot::Sender, + pub task: JoinHandle, +} + +impl Server { + /// Creates a new `UdpServer` instance in `stopped`state. + #[must_use] + pub fn new(launcher: Spawner) -> Self { + Self { + state: Stopped { launcher }, + } + } + + /// It starts the server and returns a `UdpServer` controller in `running` + /// state. + /// + /// # Errors + /// + /// Will return `Err` if UDP can't bind to given bind address. + /// + /// # Panics + /// + /// It panics if unable to receive the bound socket address from service. + /// + pub async fn start(self, tracker: Arc, form: ServiceRegistrationForm) -> Result, std::io::Error> { + let (tx_start, rx_start) = tokio::sync::oneshot::channel::(); + let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::(); + + assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); + + // May need to wrap in a task to about a tokio bug. + let task = self.state.launcher.start(tracker, tx_start, rx_halt); + + let binding = rx_start.await.expect("it should be able to start the service").address; + let local_addr = format!("udp://{binding}"); + + form.send(ServiceRegistration::new(binding, Launcher::check)) + .expect("it should be able to send service registration"); + + let running_udp_server: Server = Server { + state: Running { + binding, + halt_task: tx_halt, + task, + }, + }; + + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpServer::start (running)"); + + Ok(running_udp_server) + } +} + +impl Server { + /// It stops the server and returns a `UdpServer` controller in `stopped` + /// state. + /// + /// # Errors + /// + /// Will return `Err` if the oneshot channel to send the stop signal + /// has already been called once. + /// + /// # Panics + /// + /// It panics if unable to shutdown service. + pub async fn stop(self) -> Result, UdpError> { + self.state + .halt_task + .send(Halted::Normal) + .map_err(|e| UdpError::Error(e.to_string()))?; + + let launcher = self.state.task.await.expect("it should shutdown service"); + + let stopped_api_server: Server = Server { + state: Stopped { launcher }, + }; + + Ok(stopped_api_server) + } +} diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index e8fb048ca..2232cb0e0 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -4,7 +4,9 @@ use std::sync::Arc; use torrust_tracker::bootstrap::app::initialize_with_configuration; use torrust_tracker::core::Tracker; use torrust_tracker::servers::registar::Registar; -use torrust_tracker::servers::udp::server::{Running, Spawner, Stopped, UdpServer}; +use torrust_tracker::servers::udp::server::spawner::Spawner; +use torrust_tracker::servers::udp::server::states::{Running, Stopped}; +use torrust_tracker::servers::udp::server::Server; use torrust_tracker::shared::bit_torrent::tracker::udp::client::DEFAULT_TIMEOUT; use torrust_tracker_configuration::{Configuration, UdpTracker}; use torrust_tracker_primitives::info_hash::InfoHash; @@ -14,7 +16,7 @@ pub struct Environment { pub config: Arc, pub tracker: Arc, pub registar: Registar, - pub server: UdpServer, + pub server: Server, } impl Environment { @@ -36,7 +38,7 @@ impl Environment { let bind_to = config.bind_address; - let server = UdpServer::new(Spawner::new(bind_to)); + let server = Server::new(Spawner::new(bind_to)); Self { config, diff --git a/tests/servers/udp/mod.rs b/tests/servers/udp/mod.rs index b13b82240..7eea8683f 100644 --- a/tests/servers/udp/mod.rs +++ b/tests/servers/udp/mod.rs @@ -1,7 +1,7 @@ -use torrust_tracker::servers::udp::server; +use torrust_tracker::servers::udp::server::states::Running; pub mod asserts; pub mod contract; pub mod environment; -pub type Started = environment::Environment; +pub type Started = environment::Environment; From f06976e33defa286e9856239f79f9a83f9d168c5 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Tue, 25 Jun 2024 18:02:01 +0100 Subject: [PATCH 13/13] docs: update some UDP server comments --- src/servers/udp/server/mod.rs | 24 +++--------------------- src/servers/udp/server/spawner.rs | 12 +++++++----- src/servers/udp/server/states.rs | 10 +++++----- 3 files changed, 15 insertions(+), 31 deletions(-) diff --git a/src/servers/udp/server/mod.rs b/src/servers/udp/server/mod.rs index 1bb9831ee..034f71beb 100644 --- a/src/servers/udp/server/mod.rs +++ b/src/servers/udp/server/mod.rs @@ -1,22 +1,4 @@ //! Module to handle the UDP server instances. -//! -//! There are two main types in this module: -//! -//! - [`UdpServer`]: a controller to start and stop the server. -//! - [`Udp`]: the server launcher. -//! -//! The `UdpServer` is an state machine for a given configuration. This struct -//! represents concrete configuration and state. It allows to start and -//! stop the server but always keeping the same configuration. -//! -//! The `Udp` is the server launcher. It's responsible for launching the UDP -//! but without keeping any state. -//! -//! For the time being, the `UdpServer` is only used for testing purposes, -//! because we want to be able to start and stop the server multiple times, and -//! we want to know the bound address and the current state of the server. -//! In production, the `Udp` launcher is used directly. - use std::fmt::Debug; use super::RawRequest; @@ -37,7 +19,7 @@ pub mod states; /// /// Some errors triggered while stopping the server are: /// -/// - The [`UdpServer`] cannot send the shutdown signal to the spawned UDP service thread. +/// - The [`Server`] cannot send the shutdown signal to the spawned UDP service thread. #[derive(Debug)] pub enum UdpError { /// Any kind of error starting or stopping the server. @@ -92,7 +74,7 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; - assert_eq!(stopped.state.launcher.bind_to, bind_to); + assert_eq!(stopped.state.spawner.bind_to, bind_to); } #[tokio::test] @@ -116,7 +98,7 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; - assert_eq!(stopped.state.launcher.bind_to, bind_to); + assert_eq!(stopped.state.spawner.bind_to, bind_to); } } diff --git a/src/servers/udp/server/spawner.rs b/src/servers/udp/server/spawner.rs index a36404fce..e4612fbe0 100644 --- a/src/servers/udp/server/spawner.rs +++ b/src/servers/udp/server/spawner.rs @@ -1,3 +1,4 @@ +//! A thin wrapper for tokio spawn to launch the UDP server launcher as a new task. use std::net::SocketAddr; use std::sync::Arc; @@ -16,21 +17,22 @@ pub struct Spawner { } impl Spawner { - /// It spawns a new tasks to run the UDP server instance. + /// It spawns a new task to run the UDP server instance. /// /// # Panics /// /// It would panic if unable to resolve the `local_addr` from the supplied ´socket´. - pub fn start( + pub fn spawn_launcher( &self, tracker: Arc, tx_start: oneshot::Sender, rx_halt: oneshot::Receiver, ) -> JoinHandle { - let launcher = Spawner::new(self.bind_to); + let spawner = Self::new(self.bind_to); + tokio::spawn(async move { - Launcher::run_with_graceful_shutdown(tracker, launcher.bind_to, tx_start, rx_halt).await; - launcher + Launcher::run_with_graceful_shutdown(tracker, spawner.bind_to, tx_start, rx_halt).await; + spawner }) } } diff --git a/src/servers/udp/server/states.rs b/src/servers/udp/server/states.rs index 919646d7b..d0a2e4e8a 100644 --- a/src/servers/udp/server/states.rs +++ b/src/servers/udp/server/states.rs @@ -25,7 +25,7 @@ pub type RunningUdpServer = Server; /// A stopped UDP server state. pub struct Stopped { - pub launcher: Spawner, + pub spawner: Spawner, } /// A running UDP server state. @@ -40,9 +40,9 @@ pub struct Running { impl Server { /// Creates a new `UdpServer` instance in `stopped`state. #[must_use] - pub fn new(launcher: Spawner) -> Self { + pub fn new(spawner: Spawner) -> Self { Self { - state: Stopped { launcher }, + state: Stopped { spawner }, } } @@ -64,7 +64,7 @@ impl Server { assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open"); // May need to wrap in a task to about a tokio bug. - let task = self.state.launcher.start(tracker, tx_start, rx_halt); + let task = self.state.spawner.spawn_launcher(tracker, tx_start, rx_halt); let binding = rx_start.await.expect("it should be able to start the service").address; let local_addr = format!("udp://{binding}"); @@ -107,7 +107,7 @@ impl Server { let launcher = self.state.task.await.expect("it should shutdown service"); let stopped_api_server: Server = Server { - state: Stopped { launcher }, + state: Stopped { spawner: launcher }, }; Ok(stopped_api_server)