Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 66 additions & 11 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;
use aquatic_udp_protocol::Response;
use derive_more::Constructor;
use log::{debug, error, info, trace};
use ringbuf::traits::{Consumer, Observer, RingBuffer};
use ringbuf::traits::{Consumer, Observer, Producer};
use ringbuf::StaticRb;
use tokio::net::UdpSocket;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -202,11 +202,23 @@ impl Launcher {
}
}

#[derive(Default)]
struct ActiveRequests {
rb: StaticRb<AbortHandle, 50>, // the number of requests we handle at the same time.
}

impl ActiveRequests {
/// Creates a new [`ActiveRequests`] filled with finished tasks.
async fn new() -> Self {
let mut rb = StaticRb::default();

let () = while rb.try_push(tokio::task::spawn_blocking(|| ()).abort_handle()).is_ok() {};

task::yield_now().await;

Self { rb }
}
}

impl std::fmt::Debug for ActiveRequests {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let (left, right) = &self.rb.as_slices();
Expand Down Expand Up @@ -280,15 +292,22 @@ impl Udp {
let tracker = tracker.clone();
let socket = socket.clone();

let reqs = &mut ActiveRequests::default();
let reqs = &mut ActiveRequests::new().await;

// Main Waiting Loop, awaits on async [`receive_request`].
loop {
if let Some(h) = reqs.rb.push_overwrite(
Self::spawn_request_processor(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone())
.abort_handle(),
) {
if !h.is_finished() {
task::yield_now().await;
for h in reqs.rb.iter_mut() {
if h.is_finished() {
std::mem::swap(
h,
&mut Self::spawn_request_processor(
Self::receive_request(socket.clone()).await,
tracker.clone(),
socket.clone(),
)
.abort_handle(),
);
} else {
// the task is still running, lets yield and give it a chance to flush.
tokio::task::yield_now().await;

Expand All @@ -299,6 +318,9 @@ impl Udp {
tracing::span!(
target: "UDP TRACKER",
tracing::Level::WARN, "request-aborted", server_socket_addr = %server_socket_addr);

// force-break a single thread, then loop again.
break;
}
}
}
Expand Down Expand Up @@ -396,13 +418,46 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

use tokio::time::sleep;
use ringbuf::traits::{Consumer, Observer, RingBuffer};
use torrust_tracker_test_helpers::configuration::ephemeral_mode_public;

use super::ActiveRequests;
use crate::bootstrap::app::initialize_with_configuration;
use crate::servers::registar::Registar;
use crate::servers::udp::server::{Launcher, UdpServer};

#[tokio::test]
async fn it_should_return_to_the_start_of_the_ring_buffer() {
let mut a_req = ActiveRequests::new().await;

tokio::time::sleep(Duration::from_millis(10)).await;

let mut count: usize = 0;
let cap: usize = a_req.rb.capacity().into();

// Add a single pending task to check that the ring-buffer is looping correctly.
a_req
.rb
.push_overwrite(tokio::task::spawn(std::future::pending::<()>()).abort_handle());

count += 1;

for _ in 0..2 {
for h in a_req.rb.iter() {
let first = count % cap;
println!("{count},{first},{}", h.is_finished());

if first == 0 {
assert!(!h.is_finished());
} else {
assert!(h.is_finished());
}

count += 1;
}
}
}

#[tokio::test]
async fn it_should_be_able_to_start_and_stop() {
let cfg = Arc::new(ephemeral_mode_public());
Expand All @@ -423,7 +478,7 @@ mod tests {
.expect("it should start the server");
let stopped = started.stop().await.expect("it should stop the server");

sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;

assert_eq!(stopped.state.launcher.bind_to, bind_to);
}
Expand Down