forked from torrust/torrust-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest_buffer.rs
More file actions
146 lines (126 loc) · 5.56 KB
/
request_buffer.rs
File metadata and controls
146 lines (126 loc) · 5.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET;
use ringbuf::traits::{Consumer, Observer, Producer};
use ringbuf::StaticRb;
use tokio::task::AbortHandle;
/// A ring buffer for managing active UDP request abort handles.
///
/// The `ActiveRequests` struct maintains a fixed-size ring buffer of abort
/// handles for UDP request processor tasks. It ensures that at most 50 requests
/// are handled concurrently, and provides mechanisms to handle buffer overflow
/// by removing finished or oldest unfinished tasks.
#[derive(Default)]
pub struct ActiveRequests {
rb: StaticRb<AbortHandle, 50>, // The number of requests handled simultaneously.
}
impl std::fmt::Debug for ActiveRequests {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let (left, right) = &self.rb.as_slices();
let dbg = format!("capacity: {}, left: {left:?}, right: {right:?}", &self.rb.capacity());
f.debug_struct("ActiveRequests").field("rb", &dbg).finish()
}
}
impl Drop for ActiveRequests {
fn drop(&mut self) {
for h in self.rb.pop_iter() {
if !h.is_finished() {
h.abort();
}
}
}
}
impl ActiveRequests {
/// Inserts an abort handle for a UDP request processor task.
///
/// If the buffer is full, this method attempts to make space by:
///
/// 1. Removing finished tasks.
/// 2. Removing the oldest unfinished task if no finished tasks are found.
///
/// Returns `true` if a task was removed, `false` otherwise.
///
/// # Panics
///
/// This method will panic if it cannot make space for adding a new handle.
///
/// # Arguments
///
/// * `abort_handle` - The `AbortHandle` for the UDP request processor task.
/// * `local_addr` - A string slice representing the local address for logging.
pub async fn force_push(&mut self, new_task: AbortHandle, local_addr: &str) -> bool {
// Attempt to add the new handle to the buffer.
match self.rb.try_push(new_task) {
Ok(()) => {
// Successfully added the task, no further action needed.
false
}
Err(new_task) => {
// Buffer is full, attempt to make space.
let mut finished: u64 = 0;
let mut unfinished_task = None;
let mut old_task_aborted = false;
for old_task in self.rb.pop_iter() {
// We found a finished tasks ... increase the counter and
// continue searching for more and ...
if old_task.is_finished() {
finished += 1;
continue;
}
// The current removed tasks is not finished.
// Give it a second chance to finish.
tokio::task::yield_now().await;
// Recheck if it finished ... increase the counter and
// continue searching for more and ...
if old_task.is_finished() {
finished += 1;
continue;
}
// At this point we found a "definitive" unfinished task.
// Log unfinished task.
tracing::debug!(
target: UDP_TRACKER_LOG_TARGET,
local_addr,
removed_count = finished,
"Udp::run_udp_server::loop (got unfinished task)"
);
// If no finished tasks were found, abort the current
// unfinished task.
if finished == 0 {
// We make place aborting this task.
old_task.abort();
old_task_aborted = true;
tracing::warn!(
target: UDP_TRACKER_LOG_TARGET,
local_addr,
"Udp::run_udp_server::loop aborting request: (no finished tasks)"
);
break;
}
// At this point we found at least one finished task, but the
// current one is not finished and it was removed from the
// buffer, so we need to re-insert in in the buffer.
// Save the unfinished task for re-entry.
unfinished_task = Some(old_task);
}
// After this point there can't be a race condition because only
// one thread owns the active buffer. There is no way for the
// buffer to be full again. That means the "expects" should
// never happen.
// Reinsert the unfinished task if any.
if let Some(h) = unfinished_task {
self.rb.try_push(h).expect("it was previously inserted");
}
// Insert the new task.
//
// Notice that space has already been made for this new task in
// the buffer. One or many old task have already been finished
// or yielded, freeing space in the buffer. Or a single
// unfinished task has been aborted to make space for this new
// task.
if !new_task.is_finished() {
self.rb.try_push(new_task).expect("it should have space for this new task.");
}
old_task_aborted
}
}
}
}