diff --git a/tests/common/fixtures.rs b/tests/common/fixtures.rs index 2abaca244..1ead0db0c 100644 --- a/tests/common/fixtures.rs +++ b/tests/common/fixtures.rs @@ -9,6 +9,7 @@ pub struct PeerBuilder { } impl PeerBuilder { + #[allow(dead_code)] pub fn default() -> PeerBuilder { Self { peer: default_peer_for_testing(), @@ -44,6 +45,7 @@ impl PeerBuilder { } } +#[allow(dead_code)] fn default_peer_for_testing() -> Peer { Peer { peer_id: peer::Id(*b"-qB00000000000000000"), @@ -56,6 +58,7 @@ fn default_peer_for_testing() -> Peer { } } +#[allow(dead_code)] pub fn invalid_info_hashes() -> Vec { [ "0".to_string(), diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 810620359..b57996292 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,2 +1,3 @@ pub mod fixtures; pub mod http; +pub mod udp; diff --git a/tests/common/udp.rs b/tests/common/udp.rs new file mode 100644 index 000000000..3d84e2b97 --- /dev/null +++ b/tests/common/udp.rs @@ -0,0 +1,41 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use tokio::net::UdpSocket; + +/// A generic UDP client +pub struct Client { + pub socket: Arc, +} + +impl Client { + #[allow(dead_code)] + pub async fn connected(remote_socket_addr: &SocketAddr, local_socket_addr: &SocketAddr) -> Client { + let client = Client::bind(local_socket_addr).await; + client.connect(remote_socket_addr).await; + client + } + + pub async fn bind(local_socket_addr: &SocketAddr) -> Self { + let socket = UdpSocket::bind(local_socket_addr).await.unwrap(); + Self { + socket: Arc::new(socket), + } + } + + pub async fn connect(&self, remote_address: &SocketAddr) { + self.socket.connect(remote_address).await.unwrap(); + } + + #[allow(dead_code)] + pub async fn send(&self, bytes: &[u8]) -> usize { + self.socket.writable().await.unwrap(); + self.socket.send(bytes).await.unwrap() + } + + #[allow(dead_code)] + pub async fn receive(&self, bytes: &mut [u8]) -> usize { + self.socket.readable().await.unwrap(); + self.socket.recv(bytes).await.unwrap() + } +} diff --git a/tests/udp.rs b/tests/udp.rs deleted file mode 100644 index 408f4f795..000000000 --- a/tests/udp.rs +++ /dev/null @@ -1,310 +0,0 @@ -/// Integration tests for UDP tracker server -/// -/// cargo test `udp_tracker_server` -- --nocapture -extern crate rand; - -mod udp_tracker_server { - use core::panic; - use std::io::Cursor; - use std::net::Ipv4Addr; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::Arc; - - use aquatic_udp_protocol::{ - AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, InfoHash, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, - Port, Request, Response, ScrapeRequest, TransactionId, - }; - use rand::{thread_rng, Rng}; - use tokio::net::UdpSocket; - use tokio::task::JoinHandle; - use torrust_tracker::config::{ephemeral_configuration, Configuration}; - use torrust_tracker::jobs::udp_tracker; - use torrust_tracker::tracker::statistics::Keeper; - use torrust_tracker::udp::MAX_PACKET_SIZE; - use torrust_tracker::{ephemeral_instance_keys, logging, static_time, tracker}; - - fn tracker_configuration() -> Arc { - Arc::new(ephemeral_configuration()) - } - - pub fn ephemeral_random_client_port() -> u16 { - // todo: this may produce random test failures because two tests can try to bind the same port. - // We could create a pool of available ports (with read/write lock) - let mut rng = thread_rng(); - rng.gen_range(49152..65535) - } - - pub struct UdpServer { - pub started: AtomicBool, - pub job: Option>, - pub bind_address: Option, - } - - impl UdpServer { - pub fn new() -> Self { - Self { - started: AtomicBool::new(false), - job: None, - bind_address: None, - } - } - - pub fn start(&mut self, configuration: &Arc) { - if !self.started.load(Ordering::Relaxed) { - // Set the time of Torrust app starting - lazy_static::initialize(&static_time::TIME_AT_APP_START); - - // Initialize the Ephemeral Instance Random Seed - lazy_static::initialize(&ephemeral_instance_keys::RANDOM_SEED); - - // Initialize stats tracker - let (stats_event_sender, stats_repository) = Keeper::new_active_instance(); - - // Initialize Torrust tracker - let tracker = match tracker::Tracker::new(&configuration.clone(), Some(stats_event_sender), stats_repository) { - Ok(tracker) => Arc::new(tracker), - Err(error) => { - panic!("{}", error) - } - }; - - // Initialize logging - logging::setup(configuration); - - let udp_tracker_config = &configuration.udp_trackers[0]; - - // Start the UDP tracker job - self.job = Some(udp_tracker::start_job(udp_tracker_config, tracker)); - - self.bind_address = Some(udp_tracker_config.bind_address.clone()); - - self.started.store(true, Ordering::Relaxed); - } - } - } - - fn new_running_udp_server(configuration: &Arc) -> UdpServer { - let mut udp_server = UdpServer::new(); - udp_server.start(configuration); - udp_server - } - - struct UdpClient { - socket: Arc, - } - - impl UdpClient { - async fn bind(local_address: &str) -> Self { - let socket = UdpSocket::bind(local_address).await.unwrap(); - Self { - socket: Arc::new(socket), - } - } - - async fn connect(&self, remote_address: &str) { - self.socket.connect(remote_address).await.unwrap(); - } - - async fn send(&self, bytes: &[u8]) -> usize { - self.socket.writable().await.unwrap(); - self.socket.send(bytes).await.unwrap() - } - - async fn receive(&self, bytes: &mut [u8]) -> usize { - self.socket.readable().await.unwrap(); - self.socket.recv(bytes).await.unwrap() - } - } - - /// Creates a new `UdpClient` connected to a Udp server - async fn new_connected_udp_client(remote_address: &str) -> UdpClient { - let client = UdpClient::bind(&source_address(ephemeral_random_client_port())).await; - client.connect(remote_address).await; - client - } - - struct UdpTrackerClient { - pub udp_client: UdpClient, - } - - impl UdpTrackerClient { - async fn send(&self, request: Request) -> usize { - // Write request into a buffer - let request_buffer = vec![0u8; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(request_buffer); - - let request_data = match request.write(&mut cursor) { - Ok(_) => { - #[allow(clippy::cast_possible_truncation)] - let position = cursor.position() as usize; - let inner_request_buffer = cursor.get_ref(); - // Return slice which contains written request data - &inner_request_buffer[..position] - } - Err(e) => panic!("could not write request to bytes: {e}."), - }; - - self.udp_client.send(request_data).await - } - - async fn receive(&self) -> Response { - let mut response_buffer = [0u8; MAX_PACKET_SIZE]; - - let payload_size = self.udp_client.receive(&mut response_buffer).await; - - Response::from_bytes(&response_buffer[..payload_size], true).unwrap() - } - } - - /// Creates a new `UdpTrackerClient` connected to a Udp Tracker server - async fn new_connected_udp_tracker_client(remote_address: &str) -> UdpTrackerClient { - let udp_client = new_connected_udp_client(remote_address).await; - UdpTrackerClient { udp_client } - } - - fn empty_udp_request() -> [u8; MAX_PACKET_SIZE] { - [0; MAX_PACKET_SIZE] - } - - fn empty_buffer() -> [u8; MAX_PACKET_SIZE] { - [0; MAX_PACKET_SIZE] - } - - /// Generates the source address for the UDP client - fn source_address(port: u16) -> String { - format!("127.0.0.1:{port}") - } - - fn is_error_response(response: &Response, error_message: &str) -> bool { - match response { - Response::Error(error_response) => error_response.message.starts_with(error_message), - _ => false, - } - } - - fn is_connect_response(response: &Response, transaction_id: TransactionId) -> bool { - match response { - Response::Connect(connect_response) => connect_response.transaction_id == transaction_id, - _ => false, - } - } - - fn is_ipv4_announce_response(response: &Response) -> bool { - matches!(response, Response::AnnounceIpv4(_)) - } - - fn is_scrape_response(response: &Response) -> bool { - matches!(response, Response::Scrape(_)) - } - - #[tokio::test] - async fn should_return_a_bad_request_response_when_the_client_sends_an_empty_request() { - let configuration = tracker_configuration(); - - let udp_server = new_running_udp_server(&configuration); - - let client = new_connected_udp_client(&udp_server.bind_address.unwrap()).await; - - client.send(&empty_udp_request()).await; - - let mut buffer = empty_buffer(); - client.receive(&mut buffer).await; - let response = Response::from_bytes(&buffer, true).unwrap(); - - assert!(is_error_response(&response, "bad request")); - } - - #[tokio::test] - async fn should_return_a_connect_response_when_the_client_sends_a_connection_request() { - let configuration = tracker_configuration(); - - let udp_server = new_running_udp_server(&configuration); - - let client = new_connected_udp_tracker_client(&udp_server.bind_address.unwrap()).await; - - let connect_request = ConnectRequest { - transaction_id: TransactionId(123), - }; - - client.send(connect_request.into()).await; - - let response = client.receive().await; - - assert!(is_connect_response(&response, TransactionId(123))); - } - - async fn send_connection_request(transaction_id: TransactionId, client: &UdpTrackerClient) -> ConnectionId { - let connect_request = ConnectRequest { transaction_id }; - - client.send(connect_request.into()).await; - - let response = client.receive().await; - - match response { - Response::Connect(connect_response) => connect_response.connection_id, - _ => panic!("error connecting to udp server {:?}", response), - } - } - - #[tokio::test] - async fn should_return_an_announce_response_when_the_client_sends_an_announce_request() { - let configuration = tracker_configuration(); - - let udp_server = new_running_udp_server(&configuration); - - let client = new_connected_udp_tracker_client(&udp_server.bind_address.unwrap()).await; - - let connection_id = send_connection_request(TransactionId(123), &client).await; - - // Send announce request - - let announce_request = AnnounceRequest { - connection_id: ConnectionId(connection_id.0), - transaction_id: TransactionId(123i32), - info_hash: InfoHash([0u8; 20]), - peer_id: PeerId([255u8; 20]), - bytes_downloaded: NumberOfBytes(0i64), - bytes_uploaded: NumberOfBytes(0i64), - bytes_left: NumberOfBytes(0i64), - event: AnnounceEvent::Started, - ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)), - key: PeerKey(0u32), - peers_wanted: NumberOfPeers(1i32), - port: Port(client.udp_client.socket.local_addr().unwrap().port()), - }; - - client.send(announce_request.into()).await; - - let response = client.receive().await; - - assert!(is_ipv4_announce_response(&response)); - } - - #[tokio::test] - async fn should_return_a_scrape_response_when_the_client_sends_a_scrape_request() { - let configuration = tracker_configuration(); - - let udp_server = new_running_udp_server(&configuration); - - let client = new_connected_udp_tracker_client(&udp_server.bind_address.unwrap()).await; - - let connection_id = send_connection_request(TransactionId(123), &client).await; - - // Send scrape request - - // Full scrapes are not allowed so it will return "bad request" error with empty vector - let info_hashes = vec![InfoHash([0u8; 20])]; - - let scrape_request = ScrapeRequest { - connection_id: ConnectionId(connection_id.0), - transaction_id: TransactionId(123i32), - info_hashes, - }; - - client.send(scrape_request.into()).await; - - let response = client.receive().await; - - assert!(is_scrape_response(&response)); - } -} diff --git a/tests/udp/asserts.rs b/tests/udp/asserts.rs new file mode 100644 index 000000000..bf8fb6728 --- /dev/null +++ b/tests/udp/asserts.rs @@ -0,0 +1,23 @@ +use aquatic_udp_protocol::{Response, TransactionId}; + +pub fn is_error_response(response: &Response, error_message: &str) -> bool { + match response { + Response::Error(error_response) => error_response.message.starts_with(error_message), + _ => false, + } +} + +pub fn is_connect_response(response: &Response, transaction_id: TransactionId) -> bool { + match response { + Response::Connect(connect_response) => connect_response.transaction_id == transaction_id, + _ => false, + } +} + +pub fn is_ipv4_announce_response(response: &Response) -> bool { + matches!(response, Response::AnnounceIpv4(_)) +} + +pub fn is_scrape_response(response: &Response) -> bool { + matches!(response, Response::Scrape(_)) +} diff --git a/tests/udp/client.rs b/tests/udp/client.rs new file mode 100644 index 000000000..3cb4d6134 --- /dev/null +++ b/tests/udp/client.rs @@ -0,0 +1,65 @@ +use std::io::Cursor; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +use aquatic_udp_protocol::{Request, Response}; +use rand::{thread_rng, Rng}; +use torrust_tracker::udp::MAX_PACKET_SIZE; + +use crate::common::udp::Client as UdpClient; + +/// Creates a new generic UDP client connected to a generic UDP server +pub async fn new_udp_client_connected(remote_address: &SocketAddr) -> UdpClient { + let local_address = loopback_socket_address(ephemeral_random_client_port()); + UdpClient::connected(remote_address, &local_address).await +} + +/// Creates a new UDP tracker client connected to a UDP tracker server +pub async fn new_udp_tracker_client_connected(remote_address: &SocketAddr) -> Client { + let udp_client = new_udp_client_connected(remote_address).await; + Client { udp_client } +} + +pub fn ephemeral_random_client_port() -> u16 { + // todo: this may produce random test failures because two tests can try to bind the same port. + // We could create a pool of available ports (with read/write lock) + let mut rng = thread_rng(); + rng.gen_range(49152..65535) +} + +fn loopback_socket_address(port: u16) -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port) +} + +/// A UDP tracker client +pub struct Client { + pub udp_client: UdpClient, // A generic UDP client +} + +impl Client { + pub async fn send(&self, request: Request) -> usize { + // Write request into a buffer + let request_buffer = vec![0u8; MAX_PACKET_SIZE]; + let mut cursor = Cursor::new(request_buffer); + + let request_data = match request.write(&mut cursor) { + Ok(_) => { + #[allow(clippy::cast_possible_truncation)] + let position = cursor.position() as usize; + let inner_request_buffer = cursor.get_ref(); + // Return slice which contains written request data + &inner_request_buffer[..position] + } + Err(e) => panic!("could not write request to bytes: {e}."), + }; + + self.udp_client.send(request_data).await + } + + pub async fn receive(&self) -> Response { + let mut response_buffer = [0u8; MAX_PACKET_SIZE]; + + let payload_size = self.udp_client.receive(&mut response_buffer).await; + + Response::from_bytes(&response_buffer[..payload_size], true).unwrap() + } +} diff --git a/tests/udp/mod.rs b/tests/udp/mod.rs new file mode 100644 index 000000000..16a77bb99 --- /dev/null +++ b/tests/udp/mod.rs @@ -0,0 +1,3 @@ +pub mod asserts; +pub mod client; +pub mod server; diff --git a/tests/udp/server.rs b/tests/udp/server.rs new file mode 100644 index 000000000..401d4cf92 --- /dev/null +++ b/tests/udp/server.rs @@ -0,0 +1,67 @@ +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use tokio::task::JoinHandle; +use torrust_tracker::config::{ephemeral_configuration, Configuration}; +use torrust_tracker::jobs::udp_tracker; +use torrust_tracker::tracker::statistics::Keeper; +use torrust_tracker::{ephemeral_instance_keys, logging, static_time, tracker}; + +pub fn start_udp_tracker(configuration: &Arc) -> Server { + let mut udp_server = Server::new(); + udp_server.start(configuration); + udp_server +} + +pub fn tracker_configuration() -> Arc { + Arc::new(ephemeral_configuration()) +} +pub struct Server { + pub started: AtomicBool, + pub job: Option>, + pub bind_address: Option, +} + +impl Server { + pub fn new() -> Self { + Self { + started: AtomicBool::new(false), + job: None, + bind_address: None, + } + } + + pub fn start(&mut self, configuration: &Arc) { + if !self.started.load(Ordering::Relaxed) { + // Set the time of Torrust app starting + lazy_static::initialize(&static_time::TIME_AT_APP_START); + + // Initialize the Ephemeral Instance Random Seed + lazy_static::initialize(&ephemeral_instance_keys::RANDOM_SEED); + + // Initialize stats tracker + let (stats_event_sender, stats_repository) = Keeper::new_active_instance(); + + // Initialize Torrust tracker + let tracker = match tracker::Tracker::new(&configuration.clone(), Some(stats_event_sender), stats_repository) { + Ok(tracker) => Arc::new(tracker), + Err(error) => { + panic!("{}", error) + } + }; + + // Initialize logging + logging::setup(configuration); + + let udp_tracker_config = &configuration.udp_trackers[0]; + + // Start the UDP tracker job + self.job = Some(udp_tracker::start_job(udp_tracker_config, tracker)); + + self.bind_address = Some(udp_tracker_config.bind_address.parse().unwrap()); + + self.started.store(true, Ordering::Relaxed); + } + } +} diff --git a/tests/udp_tracker.rs b/tests/udp_tracker.rs new file mode 100644 index 000000000..0287d01b7 --- /dev/null +++ b/tests/udp_tracker.rs @@ -0,0 +1,175 @@ +/// Integration tests for UDP tracker server +/// +/// cargo test `udp_tracker_server` -- --nocapture +extern crate rand; + +mod common; +mod udp; + +mod udp_tracker_server { + + // UDP tracker documentation: + // + // BEP 15. UDP Tracker Protocol for BitTorrent + // https://www.bittorrent.org/beps/bep_0015.html + + use core::panic; + + use aquatic_udp_protocol::{ConnectRequest, ConnectionId, Response, TransactionId}; + use torrust_tracker::udp::MAX_PACKET_SIZE; + + use crate::udp::asserts::is_error_response; + use crate::udp::client::{new_udp_client_connected, Client}; + use crate::udp::server::{start_udp_tracker, tracker_configuration}; + + fn empty_udp_request() -> [u8; MAX_PACKET_SIZE] { + [0; MAX_PACKET_SIZE] + } + + fn empty_buffer() -> [u8; MAX_PACKET_SIZE] { + [0; MAX_PACKET_SIZE] + } + + async fn send_connection_request(transaction_id: TransactionId, client: &Client) -> ConnectionId { + let connect_request = ConnectRequest { transaction_id }; + + client.send(connect_request.into()).await; + + let response = client.receive().await; + + match response { + Response::Connect(connect_response) => connect_response.connection_id, + _ => panic!("error connecting to udp server {:?}", response), + } + } + + #[tokio::test] + async fn should_return_a_bad_request_response_when_the_client_sends_an_empty_request() { + let configuration = tracker_configuration(); + + let udp_server = start_udp_tracker(&configuration); + + let client = new_udp_client_connected(&udp_server.bind_address.unwrap()).await; + + client.send(&empty_udp_request()).await; + + let mut buffer = empty_buffer(); + client.receive(&mut buffer).await; + let response = Response::from_bytes(&buffer, true).unwrap(); + + assert!(is_error_response(&response, "bad request")); + } + + mod receiving_a_connection_request { + use aquatic_udp_protocol::{ConnectRequest, TransactionId}; + + use crate::udp::asserts::is_connect_response; + use crate::udp::client::new_udp_tracker_client_connected; + use crate::udp::server::{start_udp_tracker, tracker_configuration}; + + #[tokio::test] + async fn should_return_a_connect_response() { + let configuration = tracker_configuration(); + + let udp_server = start_udp_tracker(&configuration); + + let client = new_udp_tracker_client_connected(&udp_server.bind_address.unwrap()).await; + + let connect_request = ConnectRequest { + transaction_id: TransactionId(123), + }; + + client.send(connect_request.into()).await; + + let response = client.receive().await; + + assert!(is_connect_response(&response, TransactionId(123))); + } + } + + mod receiving_an_announce_request { + use std::net::Ipv4Addr; + + use aquatic_udp_protocol::{ + AnnounceEvent, AnnounceRequest, ConnectionId, InfoHash, NumberOfBytes, NumberOfPeers, PeerId, PeerKey, Port, + TransactionId, + }; + + use crate::udp::asserts::is_ipv4_announce_response; + use crate::udp::client::new_udp_tracker_client_connected; + use crate::udp::server::{start_udp_tracker, tracker_configuration}; + use crate::udp_tracker_server::send_connection_request; + + #[tokio::test] + async fn should_return_an_announce_response() { + let configuration = tracker_configuration(); + + let udp_server = start_udp_tracker(&configuration); + + let client = new_udp_tracker_client_connected(&udp_server.bind_address.unwrap()).await; + + let connection_id = send_connection_request(TransactionId(123), &client).await; + + // Send announce request + + let announce_request = AnnounceRequest { + connection_id: ConnectionId(connection_id.0), + transaction_id: TransactionId(123i32), + info_hash: InfoHash([0u8; 20]), + peer_id: PeerId([255u8; 20]), + bytes_downloaded: NumberOfBytes(0i64), + bytes_uploaded: NumberOfBytes(0i64), + bytes_left: NumberOfBytes(0i64), + event: AnnounceEvent::Started, + ip_address: Some(Ipv4Addr::new(0, 0, 0, 0)), + key: PeerKey(0u32), + peers_wanted: NumberOfPeers(1i32), + port: Port(client.udp_client.socket.local_addr().unwrap().port()), + }; + + client.send(announce_request.into()).await; + + let response = client.receive().await; + + assert!(is_ipv4_announce_response(&response)); + } + } + + mod receiving_an_scrape_request { + use aquatic_udp_protocol::{ConnectionId, InfoHash, ScrapeRequest, TransactionId}; + + use crate::udp::asserts::is_scrape_response; + use crate::udp::client::new_udp_tracker_client_connected; + use crate::udp::server::{start_udp_tracker, tracker_configuration}; + use crate::udp_tracker_server::send_connection_request; + + #[tokio::test] + async fn should_return_a_scrape_response() { + let configuration = tracker_configuration(); + + let udp_server = start_udp_tracker(&configuration); + + let client = new_udp_tracker_client_connected(&udp_server.bind_address.unwrap()).await; + + let connection_id = send_connection_request(TransactionId(123), &client).await; + + // Send scrape request + + // Full scrapes are not allowed you need to pass an array of info hashes otherwise + // it will return "bad request" error with empty vector + let info_hashes = vec![InfoHash([0u8; 20])]; + + let scrape_request = ScrapeRequest { + connection_id: ConnectionId(connection_id.0), + transaction_id: TransactionId(123i32), + info_hashes, + }; + + client.send(scrape_request.into()).await; + + let response = client.receive().await; + + assert!(is_scrape_response(&response)); + } + } +}