diff --git a/src/servers/udp/server/launcher.rs b/src/servers/udp/server/launcher.rs index db448c2ff..bb7c7d70f 100644 --- a/src/servers/udp/server/launcher.rs +++ b/src/servers/udp/server/launcher.rs @@ -103,7 +103,7 @@ impl Launcher { } async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc) { - let reqs = &mut ActiveRequests::default(); + let active_requests = &mut ActiveRequests::default(); let addr = receiver.bound_socket_address(); let local_addr = format!("udp://{addr}"); @@ -127,27 +127,18 @@ impl Launcher { } }; - /* code-review: - - Does it make sense to spawn a new request processor task when - the ActiveRequests buffer is full? - - We could store the UDP request in a secondary buffer and wait - until active tasks are finished. When a active request is finished - we can move a new UDP request from the pending to process requests - buffer to the active requests buffer. - - This forces us to define an explicit timeout for active requests. - - In the current solution the timeout is dynamic, it depends on - the system load. With high load we can remove tasks without - giving them enough time to be processed. With low load we could - keep processing running longer than a reasonable time for - the client to receive the response. - - */ - - let abort_handle = + // We spawn the new task even if the active requests buffer is + // full. This could seem counterintuitive because we are accepting + // more requests and consuming more memory even if the server is + // already busy. However, we "force_push" the new task in the + // buffer. That means, in the worst scenario we will abort a + // running task to make room for the new task. + // + // One concern could be to reach a starvation point where we + // are only adding and removing tasks without giving them the + // chance to finish. However, the buffer is yielding before + // aborting one task, giving it the chance to finish. 
+ let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone())) .abort_handle(); @@ -155,9 +146,10 @@ impl Launcher { continue; } - reqs.force_push(abort_handle, &local_addr).await; + active_requests.force_push(abort_handle, &local_addr).await; } else { tokio::task::yield_now().await; + // the request iterator returned `None`. tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)"); break; diff --git a/src/servers/udp/server/request_buffer.rs b/src/servers/udp/server/request_buffer.rs index c1d4f2696..ffbd9565d 100644 --- a/src/servers/udp/server/request_buffer.rs +++ b/src/servers/udp/server/request_buffer.rs @@ -4,10 +4,15 @@ use tokio::task::AbortHandle; use crate::servers::udp::UDP_TRACKER_LOG_TARGET; -/// Ring-Buffer of Active Requests +/// A ring buffer for managing active UDP request abort handles. +/// +/// The `ActiveRequests` struct maintains a fixed-size ring buffer of abort +/// handles for UDP request processor tasks. It ensures that at most 50 requests +/// are handled concurrently, and provides mechanisms to handle buffer overflow +/// by removing finished or oldest unfinished tasks. #[derive(Default)] pub struct ActiveRequests { - rb: StaticRb, // the number of requests we handle at the same time. + rb: StaticRb, // The number of requests handled simultaneously. } impl std::fmt::Debug for ActiveRequests { @@ -29,67 +34,107 @@ impl Drop for ActiveRequests { } impl ActiveRequests { - /// It inserts the abort handle for the UDP request processor tasks. + /// Inserts an abort handle for a UDP request processor task. /// - /// If there is no room for the new task, it tries to make place: + /// If the buffer is full, this method attempts to make space by: /// - /// - Firstly, removing finished tasks. - /// - Secondly, removing the oldest unfinished tasks. + /// 1. Removing finished tasks. 
+ /// 2. Removing the oldest unfinished task if no finished tasks are found. /// /// # Panics /// - /// Will panics if it can't make space for the new handle. - pub async fn force_push(&mut self, abort_handle: AbortHandle, local_addr: &str) { - // fill buffer with requests - let Err(abort_handle) = self.rb.try_push(abort_handle) else { - return; - }; - - let mut finished: u64 = 0; - let mut unfinished_task = None; - - // buffer is full.. lets make some space. - for h in self.rb.pop_iter() { - // remove some finished tasks - if h.is_finished() { - finished += 1; - continue; + /// This method will panic if it cannot make space for adding a new handle. + /// + /// # Arguments + /// + /// * `abort_handle` - The `AbortHandle` for the UDP request processor task. + /// * `local_addr` - A string slice representing the local address for logging. + pub async fn force_push(&mut self, new_task: AbortHandle, local_addr: &str) { + // Attempt to add the new handle to the buffer. + match self.rb.try_push(new_task) { + Ok(()) => { + // Successfully added the task, no further action needed. } - - // task is unfinished.. give it another chance. - tokio::task::yield_now().await; - - // if now finished, we continue. - if h.is_finished() { - finished += 1; - continue; + Err(new_task) => { + // Buffer is full, attempt to make space. + + let mut finished: u64 = 0; + let mut unfinished_task = None; + + for old_task in self.rb.pop_iter() { + // We found a finished task ... increase the counter and + // continue searching for more and ... + if old_task.is_finished() { + finished += 1; + continue; + } + + // The currently removed task is not finished. + + // Give it a second chance to finish. + tokio::task::yield_now().await; + + // Recheck if it finished ... increase the counter and + // continue searching for more and ... + if old_task.is_finished() { + finished += 1; + continue; + } + + // At this point we found a "definitive" unfinished task. + + // Log unfinished task. 
+ tracing::debug!( + target: UDP_TRACKER_LOG_TARGET, + local_addr, + removed_count = finished, + "Udp::run_udp_server::loop (got unfinished task)" + ); + + // If no finished tasks were found, abort the current + // unfinished task. + if finished == 0 { + // We make room by aborting this task. + old_task.abort(); + + tracing::warn!( + target: UDP_TRACKER_LOG_TARGET, + local_addr, + "Udp::run_udp_server::loop aborting request: (no finished tasks)" + ); + + break; + } + + // At this point we found at least one finished task, but the + // current one is not finished and it was removed from the + // buffer, so we need to re-insert it in the buffer. + + // Save the unfinished task for re-entry. + unfinished_task = Some(old_task); + } + + // After this point there can't be a race condition because only + // one thread owns the active buffer. There is no way for the + // buffer to be full again. That means the "expects" should + // never happen. + + // Reinsert the unfinished task if any. + if let Some(h) = unfinished_task { + self.rb.try_push(h).expect("it was previously inserted"); + } + + // Insert the new task. + // + // Notice that space has already been made for this new task in + // the buffer. One or many old tasks have already been finished + // or yielded, freeing space in the buffer. Or a single + // unfinished task has been aborted to make space for this new + // task. + if !new_task.is_finished() { + self.rb.try_push(new_task).expect("it should have space for this new task."); + } } - - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)"); - - if finished == 0 { - // we have _no_ finished tasks.. will abort the unfinished task to make space... - h.abort(); - - tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)"); - - break; - } - - // we have space, return unfinished task for re-entry. 
- unfinished_task = Some(h); - } - - // re-insert the previous unfinished task. - if let Some(h) = unfinished_task { - self.rb.try_push(h).expect("it was previously inserted"); - } - - // insert the new task. - if !abort_handle.is_finished() { - self.rb - .try_push(abort_handle) - .expect("it should remove at least one element."); - } + }; } }