forked from torrust/torrust-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmod.rs
More file actions
227 lines (185 loc) · 6.92 KB
/
mod.rs
File metadata and controls
227 lines (185 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
//! Module to handle the UDP server instances.
use std::fmt::Debug;
use derive_more::derive::Display;
use thiserror::Error;
use super::RawRequest;
pub mod bound_socket;
pub mod launcher;
pub mod processor;
pub mod receiver;
pub mod request_buffer;
pub mod spawner;
pub mod states;
/// Error that can occur when starting or stopping the UDP server.
///
/// Some errors triggered while starting the server are:
///
/// - The server cannot bind to the given address.
/// - It cannot get the bound address.
///
/// Some errors triggered while stopping the server are:
///
/// - The [`Server`] cannot send the shutdown signal to the spawned UDP service thread.
#[derive(Debug, Error)]
pub enum UdpError {
#[error("Any error to do with the socket")]
FailedToBindSocket(std::io::Error),
#[error("Any error to do with starting or stopping the sever")]
FailedToStartOrStopServer(String),
}
/// A UDP server, modelled as a state machine.
///
/// A value of this type represents one concrete configuration together with
/// the current state. The server can be started and stopped, but the
/// configuration it was created with never changes.
///
/// > **NOTICE**: if the configuration changes while the server is running, it
/// > is reset to its initial value once the server is stopped. This struct is
/// > not intended to persist configuration between runs.
#[allow(clippy::module_name_repetitions)]
#[derive(Debug, Display)]
pub struct Server<S: std::fmt::Debug + std::fmt::Display> {
    /// The state of the server: `running` or `stopped`.
    pub state: S,
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer;
use torrust_server_lib::registar::Registar;
use torrust_tracker_configuration::{logging, Configuration};
use torrust_tracker_test_helpers::configuration::ephemeral_public;
use super::spawner::Spawner;
use super::Server;
use crate::container::UdpTrackerServerContainer;
/// Prepares the global services used by the tests in this module.
///
/// Runs [`initialize_static`] first and then sets up logging from the
/// `logging` section of `configuration`.
fn initialize_global_services(configuration: &Configuration) {
    initialize_static();

    let logging_config = &configuration.logging;
    logging::setup(logging_config);
}
/// Calls `initialize_static()` on the `torrust_tracker_clock` crate and then on
/// the `bittorrent_udp_tracker_core` crate, in that order.
fn initialize_static() {
torrust_tracker_clock::initialize_static();
bittorrent_udp_tracker_core::initialize_static();
}
/// Starts a UDP server from the ephemeral public test configuration, stops it
/// right away, and checks that the stopped server still holds the bind address
/// it was created with.
#[tokio::test]
async fn it_should_be_able_to_start_and_stop() {
    let cfg = Arc::new(ephemeral_public());
    let core_config = Arc::new(cfg.core.clone());
    let udp_tracker_config = Arc::new(
        cfg.udp_trackers
            .clone()
            .expect("no UDP services array config provided")
            .first()
            .expect("no UDP test service config provided")
            .clone(),
    );

    initialize_global_services(&cfg);

    // Reuse the tracker config extracted above instead of cloning the whole
    // tracker list a second time and indexing into it.
    let bind_to = udp_tracker_config.bind_address;
    let register = &Registar::default();

    let stopped = Server::new(Spawner::new(bind_to));

    let udp_tracker_core_container = UdpTrackerCoreContainer::initialize(&core_config, &udp_tracker_config);
    let udp_tracker_server_container = UdpTrackerServerContainer::initialize(&core_config);

    let started = stopped
        .start(
            udp_tracker_core_container,
            udp_tracker_server_container,
            register.give_form(),
            udp_tracker_config.cookie_lifetime,
        )
        .await
        .expect("it should start the server");

    let stopped = started.stop().await.expect("it should stop the server");

    // Pause for a second after stopping, before the final check.
    tokio::time::sleep(Duration::from_secs(1)).await;

    assert_eq!(stopped.state.spawner.bind_to, bind_to);
}
/// Same start/stop round trip as the plain start-and-stop test, but the server
/// is left running for one second before it is asked to stop.
#[tokio::test]
async fn it_should_be_able_to_start_and_stop_with_wait() {
    let configuration = Arc::new(ephemeral_public());
    let core_config = Arc::new(configuration.core.clone());

    // Pick the first configured UDP tracker as the service under test.
    let udp_tracker_config = {
        let trackers = configuration
            .udp_trackers
            .clone()
            .expect("no UDP services array config provided");
        let first_tracker = trackers.first().expect("no UDP test service config provided").clone();
        Arc::new(first_tracker)
    };

    initialize_global_services(&configuration);

    let bind_to = udp_tracker_config.bind_address;
    let registar = Registar::default();

    let server = Server::new(Spawner::new(bind_to));

    let core_container = UdpTrackerCoreContainer::initialize(&core_config, &udp_tracker_config);
    let server_container = UdpTrackerServerContainer::initialize(&core_config);

    let running = server
        .start(
            core_container,
            server_container,
            registar.give_form(),
            udp_tracker_config.cookie_lifetime,
        )
        .await
        .expect("it should start the server");

    // Keep the server up for a moment before stopping it.
    tokio::time::sleep(Duration::from_secs(1)).await;

    let stopped = running.stop().await.expect("it should stop the server");

    // Pause again after stopping, before the final check.
    tokio::time::sleep(Duration::from_secs(1)).await;

    assert_eq!(stopped.state.spawner.bind_to, bind_to);
}
}
/// TODO: submit this test to the tokio documentation.
#[cfg(test)]
mod test_tokio {
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
/// Checks how a [`Barrier`] behaves when some of the tasks spawned to wait on
/// it are aborted: the barrier must stay closed at 8 of 10 waiters and open
/// once two more waiters are spawned.
#[tokio::test]
async fn test_barrier_with_aborted_tasks() {
    // The barrier needs 10 waiting tasks before it lets anyone through.
    let barrier = Arc::new(Barrier::new(10));

    let mut tasks = JoinSet::default();
    let mut handles = Vec::default();

    // Spawn nine waiters. Barrier: 9/10.
    handles.extend((0..9).map(|_| {
        let waiter = barrier.clone();
        tasks.spawn(async move {
            waiter.wait().await;
        })
    }));

    // Abort the two most recently spawned waiters. Barrier: 7/10.
    for doomed in std::iter::from_fn(|| handles.pop()).take(2) {
        doomed.abort();
    }

    // Spawn a single extra waiter. Barrier: 8/10.
    let waiter = barrier.clone();
    handles.push(tasks.spawn(async move {
        waiter.wait().await;
    }));

    // Give the barrier a chance to release.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // The barrier must still be closed (8 waiters, not 10), so no task is done.
    handles.iter().for_each(|h| assert!(!h.is_finished()));

    // Two more waiters complete the set and release the barrier. Barrier: 10/10.
    handles.extend((0..2).map(|_| {
        let waiter = barrier.clone();
        tasks.spawn(async move {
            waiter.wait().await;
        })
    }));

    // Give the barrier a chance to release.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // The barrier has been triggered, so every remaining task has finished.
    handles.iter().for_each(|h| assert!(h.is_finished()));

    tasks.shutdown().await;
}
}