diff --git a/Cargo.lock b/Cargo.lock index 05b439353..bd572c4fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2962,10 +2962,55 @@ dependencies = [ "thiserror", "tokio", "toml", + "torrust-tracker-configuration", + "torrust-tracker-located-error", + "torrust-tracker-primitives", + "torrust-tracker-test-helpers", "uuid 1.2.1", "warp", ] +[[package]] +name = "torrust-tracker-configuration" +version = "2.3.0" +dependencies = [ + "config", + "log", + "serde", + "serde_with", + "thiserror", + "toml", + "torrust-tracker-located-error", + "torrust-tracker-primitives", + "uuid 1.2.1", +] + +[[package]] +name = "torrust-tracker-located-error" +version = "2.3.0" +dependencies = [ + "log", + "thiserror", +] + +[[package]] +name = "torrust-tracker-primitives" +version = "2.3.0" +dependencies = [ + "derive_more", + "serde", +] + +[[package]] +name = "torrust-tracker-test-helpers" +version = "2.3.0" +dependencies = [ + "lazy_static", + "rand", + "tokio", + "torrust-tracker-configuration", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index 917bc9e31..ebff271c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,31 +1,19 @@ [package] -edition = "2021" name = "torrust-tracker" -version = "2.3.0" -license = "AGPL-3.0" -authors = ["Mick van Dijke "] description = "A feature rich BitTorrent tracker." -repository = "https://github.com/torrust/torrust-tracker" - -[profile.dev] -debug = 1 -opt-level = 1 -lto = "thin" +license = "AGPL-3.0" +authors.workspace = true +edition.workspace = true +version.workspace = true -[profile.release] -debug = 1 -opt-level = 3 -lto = "fat" +[workspace.package] +authors = ["Nautilus Cyberneering , Mick van Dijke "] +edition = "2021" +repository = "https://github.com/torrust/torrust-tracker" +version = "2.3.0" [dependencies] -tokio = { version = "1", features = [ - "rt-multi-thread", - "net", - "sync", - "macros", - "signal", -] } - +tokio = { version = "1", features = ["rt-multi-thread", "net", "sync", "macros", "signal"] } serde = { version = "1.0", features = ["derive"] } serde_bencode = "^0.2.3" serde_json = "1.0" @@ -34,35 +22,30 @@ hex = "0.4.3" percent-encoding = "2" binascii = "0.1" lazy_static = "1.4" - openssl = { version = "0.10", features = ["vendored"] } - warp = { version = "0.3", features = ["tls"] } - config = "0.13" toml = "0.5" - log = { version = "0.4", features = ["release_max_level_info"] } fern = "0.6" chrono = "0.4" - r2d2 = "0.8" r2d2_mysql = "21" r2d2_sqlite = { version = "0.21", features = ["bundled"] } - rand = "0.8" derive_more = "0.99" thiserror = "1.0" futures = "0.3" async-trait = "0.1" - aquatic_udp_protocol = "0.2" uuid = { version = "1", features = ["v4"] } axum = "0.6.1" axum-server = { version = "0.4.4", features = ["tls-rustls"] } axum-client-ip = "0.4.0" bip_bencode = "0.4.4" - +torrust-tracker-primitives = { path = "packages/primitives" } +torrust-tracker-configuration = { path = "packages/configuration" } +torrust-tracker-located-error = { path = "packages/located-error" } [dev-dependencies] mockall = "0.11" @@ -71,3 +54,22 @@ serde_urlencoded = "0.7.1" serde_repr = "0.1.10" serde_bytes = "0.11.8" local-ip-address = "0.5.1" +torrust-tracker-test-helpers = { path = "packages/test-helpers" } + +[workspace] +members = [ + "packages/configuration", + "packages/primitives", + "packages/test-helpers", + "packages/located-error", +] + +[profile.dev] +debug = 1 +opt-level = 1 +lto = "thin" + +[profile.release] +debug = 1 +opt-level = 3 +lto = "fat" diff --git a/packages/configuration/Cargo.toml b/packages/configuration/Cargo.toml new file mode 100644 index 000000000..a6f1740a0 --- /dev/null +++ b/packages/configuration/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "torrust-tracker-configuration" +version.workspace = true +authors.workspace = true +edition.workspace = true + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_with = "2.0" +config = "0.13" +toml = "0.5" +log = { version = "0.4", features = ["release_max_level_info"] } +thiserror = "1.0" +torrust-tracker-primitives = { path = "../primitives" } +torrust-tracker-located-error = { path = "../located-error" } +uuid = { version = "1", features = ["v4"] } diff --git a/src/config.rs b/packages/configuration/src/lib.rs similarity index 93% rename from src/config.rs rename to packages/configuration/src/lib.rs index 7ed0f9fa7..2121752c5 100644 --- a/src/config.rs +++ b/packages/configuration/src/lib.rs @@ -8,17 +8,13 @@ use std::{env, fs}; use config::{Config, ConfigError, File, FileFormat}; use log::warn; -use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, NoneAsEmptyString}; use thiserror::Error; -use {std, toml}; +use torrust_tracker_located_error::{Located, LocatedError}; +use torrust_tracker_primitives::{DatabaseDriver, TrackerMode}; -use crate::databases::driver::Driver; -use crate::located_error::{Located, LocatedError}; -use crate::tracker::mode; - -#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)] pub struct UdpTracker { pub enabled: bool, pub bind_address: String, @@ -62,8 +58,8 @@ impl HttpApi { #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] pub struct Configuration { pub log_level: Option, - pub mode: mode::Mode, - pub db_driver: Driver, + pub mode: TrackerMode, + pub db_driver: DatabaseDriver, pub db_path: String, pub announce_interval: u32, pub min_announce_interval: u32, @@ -122,41 +118,34 @@ pub fn ephemeral_configuration() -> Configuration { }; // Ephemeral socket address for API - let api_port = random_port(); + let api_port = 0u16; config.http_api.enabled = true; config.http_api.bind_address = format!("127.0.0.1:{}", &api_port); // Ephemeral socket address for UDP tracker - let upd_port = random_port(); + let udp_port = 0u16; config.udp_trackers[0].enabled = true; - config.udp_trackers[0].bind_address = format!("127.0.0.1:{}", &upd_port); + config.udp_trackers[0].bind_address = format!("127.0.0.1:{}", &udp_port); // Ephemeral socket address for HTTP tracker - let http_port = random_port(); + let http_port = 0u16; config.http_trackers[0].enabled = true; config.http_trackers[0].bind_address = format!("127.0.0.1:{}", &http_port); // Ephemeral sqlite database let temp_directory = env::temp_dir(); - let temp_file = temp_directory.join(format!("data_{}_{}_{}.db", &api_port, &upd_port, &http_port)); + let temp_file = temp_directory.join(format!("data_{}_{}_{}.db", &api_port, &udp_port, &http_port)); config.db_path = temp_file.to_str().unwrap().to_owned(); config } -fn random_port() -> u16 { - // todo: this may produce random test failures because two tests can try to bind the same port. - // We could create a pool of available ports (with read/write lock) - let mut rng = thread_rng(); - rng.gen_range(49152..65535) -} - impl Default for Configuration { fn default() -> Self { let mut configuration = Configuration { log_level: Option::from(String::from("info")), - mode: mode::Mode::Public, - db_driver: Driver::Sqlite3, + mode: TrackerMode::Public, + db_driver: DatabaseDriver::Sqlite3, db_path: String::from("./storage/database/data.db"), announce_interval: 120, min_announce_interval: 120, @@ -266,7 +255,7 @@ impl Configuration { #[cfg(test)] mod tests { - use crate::config::Configuration; + use crate::Configuration; #[cfg(test)] fn default_config_toml() -> String { @@ -325,7 +314,7 @@ mod tests { fn configuration_should_contain_the_external_ip() { let configuration = Configuration::default(); - assert_eq!(configuration.external_ip, Option::Some(String::from("0.0.0.0"))); + assert_eq!(configuration.external_ip, Some(String::from("0.0.0.0"))); } #[test] diff --git a/packages/located-error/Cargo.toml b/packages/located-error/Cargo.toml new file mode 100644 index 000000000..c4b2ef726 --- /dev/null +++ b/packages/located-error/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "torrust-tracker-located-error" +version.workspace = true +authors.workspace = true +edition.workspace = true + +[dependencies] +log = { version = "0.4", features = ["release_max_level_info"] } +thiserror = "1.0" diff --git a/src/located_error.rs b/packages/located-error/src/lib.rs similarity index 100% rename from src/located_error.rs rename to packages/located-error/src/lib.rs diff --git a/packages/primitives/Cargo.toml b/packages/primitives/Cargo.toml new file mode 100644 index 000000000..9aec28384 --- /dev/null +++ b/packages/primitives/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "torrust-tracker-primitives" +version.workspace = true +authors.workspace = true +edition.workspace = true + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +derive_more = "0.99" diff --git a/src/tracker/mode.rs b/packages/primitives/src/lib.rs similarity index 70% rename from src/tracker/mode.rs rename to packages/primitives/src/lib.rs index a0dba6e67..bcd48145f 100644 --- a/src/tracker/mode.rs +++ b/packages/primitives/src/lib.rs @@ -1,8 +1,14 @@ -use serde; use serde::{Deserialize, Serialize}; +// TODO: Move to the database crate once that gets its own crate. +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, derive_more::Display, Clone)] +pub enum DatabaseDriver { + Sqlite3, + MySQL, +} + #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Eq, Debug)] -pub enum Mode { +pub enum TrackerMode { // Will track every new info hash and serve every peer. #[serde(rename = "public")] Public, diff --git a/packages/test-helpers/Cargo.toml b/packages/test-helpers/Cargo.toml new file mode 100644 index 000000000..5be0e8aba --- /dev/null +++ b/packages/test-helpers/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "torrust-tracker-test-helpers" +version.workspace = true +authors.workspace = true +edition.workspace = true + +[dependencies] +tokio = { version = "1", features = ["rt-multi-thread", "net", "sync", "macros", "signal"] } +lazy_static = "1.4" +rand = "0.8.5" +torrust-tracker-configuration = { path = "../configuration"} diff --git a/packages/test-helpers/src/configuration.rs b/packages/test-helpers/src/configuration.rs new file mode 100644 index 000000000..f7c584d55 --- /dev/null +++ b/packages/test-helpers/src/configuration.rs @@ -0,0 +1,34 @@ +use std::env; + +use torrust_tracker_configuration::Configuration; + +use crate::random; + +/// This configuration is used for testing. It generates random config values so they do not collide +/// if you run more than one tracker at the same time. +/// +/// # Panics +/// +/// Will panic if it can't convert the temp file path to string +#[must_use] +pub fn ephemeral() -> Configuration { + let mut config = Configuration { + log_level: Some("off".to_owned()), + ..Default::default() + }; + + // Ephemeral socket addresses + let bind_addr = "127.0.0.1:0".to_string(); + + config.http_api.bind_address = bind_addr.to_string(); + config.udp_trackers[0].bind_address = bind_addr; + + // Ephemeral sqlite database + let temp_directory = env::temp_dir(); + let random_db_id = random::string(16); + let temp_file = temp_directory.join(format!("data_{random_db_id}.db")); + + config.db_path = temp_file.to_str().unwrap().to_owned(); + + config +} diff --git a/packages/test-helpers/src/lib.rs b/packages/test-helpers/src/lib.rs new file mode 100644 index 000000000..e0f350131 --- /dev/null +++ b/packages/test-helpers/src/lib.rs @@ -0,0 +1,2 @@ +pub mod configuration; +pub mod random; diff --git a/packages/test-helpers/src/random.rs b/packages/test-helpers/src/random.rs new file mode 100644 index 000000000..ffb2ccd6f --- /dev/null +++ b/packages/test-helpers/src/random.rs @@ -0,0 +1,7 @@ +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; + +/// Returns a random alphanumeric string of a certain size. +pub fn string(size: usize) -> String { + thread_rng().sample_iter(&Alphanumeric).take(size).map(char::from).collect() +} diff --git a/src/apis/middlewares/auth.rs b/src/apis/middlewares/auth.rs index 758ba1cda..f2745d42e 100644 --- a/src/apis/middlewares/auth.rs +++ b/src/apis/middlewares/auth.rs @@ -5,9 +5,9 @@ use axum::http::Request; use axum::middleware::Next; use axum::response::{IntoResponse, Response}; use serde::Deserialize; +use torrust_tracker_configuration::{Configuration, HttpApi}; use crate::apis::responses::unhandled_rejection_response; -use crate::config::{Configuration, HttpApi}; #[derive(Deserialize, Debug)] pub struct QueryParams { diff --git a/src/apis/server.rs b/src/apis/server.rs index bbb3e5852..5ec22f253 100644 --- a/src/apis/server.rs +++ b/src/apis/server.rs @@ -1,15 +1,156 @@ -use std::net::SocketAddr; +use std::net::{SocketAddr, TcpListener}; use std::sync::Arc; use axum_server::tls_rustls::RustlsConfig; use axum_server::Handle; use futures::Future; use log::info; +use tokio::task::JoinHandle; use warp::hyper; use super::routes::router; +use crate::signals::shutdown_signal_with_message; use crate::tracker::Tracker; +#[derive(Debug)] +pub enum Error { + Error(String), +} + +#[allow(clippy::module_name_repetitions)] +pub type StoppedApiServer = ApiServer; +#[allow(clippy::module_name_repetitions)] +pub type RunningApiServer = ApiServer; + +#[allow(clippy::module_name_repetitions)] +pub struct ApiServer { + pub cfg: torrust_tracker_configuration::HttpApi, + pub tracker: Arc, + pub state: S, +} + +pub struct Stopped; + +pub struct Running { + pub bind_address: SocketAddr, + stop_job_sender: tokio::sync::oneshot::Sender, + job: JoinHandle<()>, +} + +impl ApiServer { + pub fn new(cfg: torrust_tracker_configuration::HttpApi, tracker: Arc) -> Self { + Self { + cfg, + tracker, + state: Stopped {}, + } + } + + /// # Errors + /// + /// Will return `Err` if `TcpListener` can not bind to `bind_address`. + pub fn start(self) -> Result, Error> { + let listener = TcpListener::bind(&self.cfg.bind_address).map_err(|e| Error::Error(e.to_string()))?; + + let bind_address = listener.local_addr().map_err(|e| Error::Error(e.to_string()))?; + + let cfg = self.cfg.clone(); + let tracker = self.tracker.clone(); + + let (sender, receiver) = tokio::sync::oneshot::channel::(); + + let job = tokio::spawn(async move { + if let (true, Some(ssl_cert_path), Some(ssl_key_path)) = (cfg.ssl_enabled, cfg.ssl_cert_path, cfg.ssl_key_path) { + let tls_config = RustlsConfig::from_pem_file(ssl_cert_path, ssl_key_path) + .await + .expect("Could not read ssl cert and/or key."); + + start_tls_from_tcp_listener_with_graceful_shutdown(listener, tls_config, &tracker, receiver) + .await + .expect("Could not start from tcp listener with tls."); + } else { + start_from_tcp_listener_with_graceful_shutdown(listener, &tracker, receiver) + .await + .expect("Could not start from tcp listener."); + } + }); + + let running_api_server: ApiServer = ApiServer { + cfg: self.cfg, + tracker: self.tracker, + state: Running { + bind_address, + stop_job_sender: sender, + job, + }, + }; + + Ok(running_api_server) + } +} + +impl ApiServer { + /// # Errors + /// + /// Will return `Err` if the oneshot channel to send the stop signal + /// has already been called once. + pub async fn stop(self) -> Result, Error> { + self.state.stop_job_sender.send(1).map_err(|e| Error::Error(e.to_string()))?; + + let _ = self.state.job.await; + + let stopped_api_server: ApiServer = ApiServer { + cfg: self.cfg, + tracker: self.tracker, + state: Stopped {}, + }; + + Ok(stopped_api_server) + } +} + +pub fn start_from_tcp_listener_with_graceful_shutdown( + tcp_listener: TcpListener, + tracker: &Arc, + shutdown_signal: tokio::sync::oneshot::Receiver, +) -> impl Future> { + let app = router(tracker); + + let context = tcp_listener.local_addr().expect("Could not get context."); + + axum::Server::from_tcp(tcp_listener) + .expect("Could not bind to tcp listener.") + .serve(app.into_make_service()) + .with_graceful_shutdown(shutdown_signal_with_message( + shutdown_signal, + format!("Shutting down {context}.."), + )) +} + +pub fn start_tls_from_tcp_listener_with_graceful_shutdown( + tcp_listener: TcpListener, + tls_config: RustlsConfig, + tracker: &Arc, + shutdown_signal: tokio::sync::oneshot::Receiver, +) -> impl Future> { + let app = router(tracker); + + let context = tcp_listener.local_addr().expect("Could not get context."); + + let handle = Handle::new(); + + let cloned_handle = handle.clone(); + + tokio::spawn(async move { + shutdown_signal_with_message(shutdown_signal, format!("Shutting down {context}..")).await; + cloned_handle.shutdown(); + }); + + axum_server::from_tcp_rustls(tcp_listener, tls_config) + .handle(handle) + .serve(app.into_make_service()) +} + pub fn start(socket_addr: SocketAddr, tracker: &Arc) -> impl Future> { let app = router(tracker); @@ -41,3 +182,36 @@ pub fn start_tls( .handle(handle) .serve(app.into_make_service()) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use torrust_tracker_configuration::Configuration; + use torrust_tracker_test_helpers::configuration::ephemeral; + + use crate::apis::server::ApiServer; + use crate::tracker; + use crate::tracker::statistics; + + fn tracker_configuration() -> Arc { + Arc::new(ephemeral()) + } + + #[tokio::test] + async fn it_should_be_able_to_start_from_stopped_state_and_then_stop_again() { + let cfg = tracker_configuration(); + + let tracker = Arc::new(tracker::Tracker::new(&cfg, None, statistics::Repo::new()).unwrap()); + + let stopped_api_server = ApiServer::new(cfg.http_api.clone(), tracker); + + let running_api_server_result = stopped_api_server.start(); + + assert!(running_api_server_result.is_ok()); + + let running_api_server = running_api_server_result.unwrap(); + + assert!(running_api_server.stop().await.is_ok()); + } +} diff --git a/src/databases/driver.rs b/src/databases/driver.rs index c601f1866..4ce6ea515 100644 --- a/src/databases/driver.rs +++ b/src/databases/driver.rs @@ -1,30 +1,22 @@ -use serde::{Deserialize, Serialize}; +use torrust_tracker_primitives::DatabaseDriver; use super::error::Error; use super::mysql::Mysql; use super::sqlite::Sqlite; use super::{Builder, Database}; -#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, derive_more::Display, Clone)] -pub enum Driver { - Sqlite3, - MySQL, -} - -impl Driver { - /// . - /// - /// # Errors - /// - /// This function will return an error if unable to connect to the database. - pub fn build(&self, db_path: &str) -> Result, Error> { - let database = match self { - Driver::Sqlite3 => Builder::::build(db_path), - Driver::MySQL => Builder::::build(db_path), - }?; +/// . +/// +/// # Errors +/// +/// This function will return an error if unable to connect to the database. +pub fn build(driver: &DatabaseDriver, db_path: &str) -> Result, Error> { + let database = match driver { + DatabaseDriver::Sqlite3 => Builder::::build(db_path), + DatabaseDriver::MySQL => Builder::::build(db_path), + }?; - database.create_database_tables().expect("Could not create database tables."); + database.create_database_tables().expect("Could not create database tables."); - Ok(database) - } + Ok(database) } diff --git a/src/databases/error.rs b/src/databases/error.rs index 4bee82f19..68b732190 100644 --- a/src/databases/error.rs +++ b/src/databases/error.rs @@ -2,47 +2,46 @@ use std::panic::Location; use std::sync::Arc; use r2d2_mysql::mysql::UrlError; - -use super::driver::Driver; -use crate::located_error::{Located, LocatedError}; +use torrust_tracker_located_error::{Located, LocatedError}; +use torrust_tracker_primitives::DatabaseDriver; #[derive(thiserror::Error, Debug, Clone)] pub enum Error { #[error("The {driver} query unexpectedly returned nothing: {source}")] QueryReturnedNoRows { source: LocatedError<'static, dyn std::error::Error + Send + Sync>, - driver: Driver, + driver: DatabaseDriver, }, #[error("The {driver} query was malformed: {source}")] InvalidQuery { source: LocatedError<'static, dyn std::error::Error + Send + Sync>, - driver: Driver, + driver: DatabaseDriver, }, #[error("Unable to insert record into {driver} database, {location}")] InsertFailed { location: &'static Location<'static>, - driver: Driver, + driver: DatabaseDriver, }, #[error("Failed to remove record from {driver} database, error-code: {error_code}, {location}")] DeleteFailed { location: &'static Location<'static>, error_code: usize, - driver: Driver, + driver: DatabaseDriver, }, #[error("Failed to connect to {driver} database: {source}")] ConnectionError { source: LocatedError<'static, UrlError>, - driver: Driver, + driver: DatabaseDriver, }, #[error("Failed to create r2d2 {driver} connection pool: {source}")] ConnectionPool { source: LocatedError<'static, r2d2::Error>, - driver: Driver, + driver: DatabaseDriver, }, } @@ -52,11 +51,11 @@ impl From for Error { match err { r2d2_sqlite::rusqlite::Error::QueryReturnedNoRows => Error::QueryReturnedNoRows { source: (Arc::new(err) as Arc).into(), - driver: Driver::Sqlite3, + driver: DatabaseDriver::Sqlite3, }, _ => Error::InvalidQuery { source: (Arc::new(err) as Arc).into(), - driver: Driver::Sqlite3, + driver: DatabaseDriver::Sqlite3, }, } } @@ -68,7 +67,7 @@ impl From for Error { let e: Arc = Arc::new(err); Error::InvalidQuery { source: e.into(), - driver: Driver::MySQL, + driver: DatabaseDriver::MySQL, } } } @@ -78,14 +77,14 @@ impl From for Error { fn from(err: UrlError) -> Self { Self::ConnectionError { source: Located(err).into(), - driver: Driver::MySQL, + driver: DatabaseDriver::MySQL, } } } -impl From<(r2d2::Error, Driver)> for Error { +impl From<(r2d2::Error, DatabaseDriver)> for Error { #[track_caller] - fn from(e: (r2d2::Error, Driver)) -> Self { + fn from(e: (r2d2::Error, DatabaseDriver)) -> Self { let (err, driver) = e; Self::ConnectionPool { source: Located(err).into(), diff --git a/src/databases/mysql.rs b/src/databases/mysql.rs index ac54ebb82..503215071 100644 --- a/src/databases/mysql.rs +++ b/src/databases/mysql.rs @@ -7,14 +7,14 @@ use r2d2::Pool; use r2d2_mysql::mysql::prelude::Queryable; use r2d2_mysql::mysql::{params, Opts, OptsBuilder}; use r2d2_mysql::MysqlConnectionManager; +use torrust_tracker_primitives::DatabaseDriver; -use super::driver::Driver; use crate::databases::{Database, Error}; use crate::protocol::common::AUTH_KEY_LENGTH; use crate::protocol::info_hash::InfoHash; use crate::tracker::auth; -const DRIVER: Driver = Driver::MySQL; +const DRIVER: DatabaseDriver = DatabaseDriver::MySQL; pub struct Mysql { pool: Pool, diff --git a/src/databases/sqlite.rs b/src/databases/sqlite.rs index 3425b15c8..4dda91658 100644 --- a/src/databases/sqlite.rs +++ b/src/databases/sqlite.rs @@ -4,14 +4,14 @@ use std::str::FromStr; use async_trait::async_trait; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; +use torrust_tracker_primitives::DatabaseDriver; -use super::driver::Driver; use crate::databases::{Database, Error}; use crate::protocol::clock::DurationSinceUnixEpoch; use crate::protocol::info_hash::InfoHash; use crate::tracker::auth; -const DRIVER: Driver = Driver::Sqlite3; +const DRIVER: DatabaseDriver = DatabaseDriver::Sqlite3; pub struct Sqlite { pool: Pool, @@ -24,7 +24,7 @@ impl Database for Sqlite { /// Will return `r2d2::Error` if `db_path` is not able to create `SqLite` database. fn new(db_path: &str) -> Result { let cm = SqliteConnectionManager::file(db_path); - Pool::new(cm).map_or_else(|err| Err((err, Driver::Sqlite3).into()), |pool| Ok(Sqlite { pool })) + Pool::new(cm).map_or_else(|err| Err((err, DatabaseDriver::Sqlite3).into()), |pool| Ok(Sqlite { pool })) } fn create_database_tables(&self) -> Result<(), Error> { diff --git a/src/http/axum_implementation/requests/announce.rs b/src/http/axum_implementation/requests/announce.rs index 0f9a6fbfe..6e357ea6d 100644 --- a/src/http/axum_implementation/requests/announce.rs +++ b/src/http/axum_implementation/requests/announce.rs @@ -3,11 +3,11 @@ use std::panic::Location; use std::str::FromStr; use thiserror::Error; +use torrust_tracker_located_error::{Located, LocatedError}; use crate::http::axum_implementation::query::{ParseQueryError, Query}; use crate::http::axum_implementation::responses; use crate::http::percent_encoding::{percent_decode_info_hash, percent_decode_peer_id}; -use crate::located_error::{Located, LocatedError}; use crate::protocol::info_hash::{ConversionError, InfoHash}; use crate::tracker::peer::{self, IdConversionError}; diff --git a/src/http/warp_implementation/error.rs b/src/http/warp_implementation/error.rs index f07c32f6d..55b22c27a 100644 --- a/src/http/warp_implementation/error.rs +++ b/src/http/warp_implementation/error.rs @@ -1,10 +1,9 @@ use std::panic::Location; use thiserror::Error; +use torrust_tracker_located_error::LocatedError; use warp::reject::Reject; -use crate::located_error::LocatedError; - #[derive(Error, Debug)] pub enum Error { #[error("tracker server error: {source}")] diff --git a/src/http/warp_implementation/filter_helpers.rs b/src/http/warp_implementation/filter_helpers.rs index 89188d868..583d38352 100644 --- a/src/http/warp_implementation/filter_helpers.rs +++ b/src/http/warp_implementation/filter_helpers.rs @@ -3,8 +3,7 @@ use std::panic::Location; use std::str::FromStr; use thiserror::Error; - -use crate::located_error::{Located, LocatedError}; +use torrust_tracker_located_error::{Located, LocatedError}; #[derive(Error, Debug)] pub enum XForwardedForParseError { diff --git a/src/jobs/http_tracker.rs b/src/jobs/http_tracker.rs index aa96af884..ce546f608 100644 --- a/src/jobs/http_tracker.rs +++ b/src/jobs/http_tracker.rs @@ -5,8 +5,8 @@ use axum_server::tls_rustls::RustlsConfig; use log::{info, warn}; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use torrust_tracker_configuration::HttpTracker; -use crate::config::HttpTracker; use crate::http::axum_implementation::server; use crate::http::warp_implementation::server::Http; use crate::http::Version; diff --git a/src/jobs/torrent_cleanup.rs b/src/jobs/torrent_cleanup.rs index 073ceda61..4c4ed1f53 100644 --- a/src/jobs/torrent_cleanup.rs +++ b/src/jobs/torrent_cleanup.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use chrono::Utc; use log::info; use tokio::task::JoinHandle; +use torrust_tracker_configuration::Configuration; -use crate::config::Configuration; use crate::tracker; #[must_use] diff --git a/src/jobs/tracker_apis.rs b/src/jobs/tracker_apis.rs index 00e39eeba..85bb1b59f 100644 --- a/src/jobs/tracker_apis.rs +++ b/src/jobs/tracker_apis.rs @@ -4,9 +4,9 @@ use axum_server::tls_rustls::RustlsConfig; use log::info; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use torrust_tracker_configuration::HttpApi; use crate::apis::server; -use crate::config::HttpApi; use crate::tracker; #[derive(Debug)] diff --git a/src/jobs/udp_tracker.rs b/src/jobs/udp_tracker.rs index d0907c976..468f6dbbd 100644 --- a/src/jobs/udp_tracker.rs +++ b/src/jobs/udp_tracker.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use log::{error, info, warn}; use tokio::task::JoinHandle; +use torrust_tracker_configuration::UdpTracker; -use crate::config::UdpTracker; use crate::tracker; use crate::udp::server::Udp; diff --git a/src/lib.rs b/src/lib.rs index cbda2854c..f80bcfb6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,7 @@ pub mod apis; -pub mod config; pub mod databases; pub mod http; pub mod jobs; -pub mod located_error; pub mod logging; pub mod protocol; pub mod setup; @@ -32,3 +30,47 @@ pub mod ephemeral_instance_keys { pub static ref RANDOM_SEED: Seed = Rng::gen(&mut ThreadRng::default()); } } + +pub mod signals { + use log::info; + + /// Resolves on `ctrl_c` or the `terminate` signal. + pub async fn global_shutdown_signal() { + let ctrl_c = async { + tokio::signal::ctrl_c().await.expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {} + } + } + + /// Resolves when the `stop_receiver` or the `global_shutdown_signal()` resolves. + pub async fn shutdown_signal(stop_receiver: tokio::sync::oneshot::Receiver) { + let stop = async { stop_receiver.await.expect("Failed to install stop signal.") }; + + tokio::select! { + _ = stop => {}, + _ = global_shutdown_signal() => {} + } + } + + /// Same as `shutdown_signal()`, but shows a message when it resolves. + pub async fn shutdown_signal_with_message(stop_receiver: tokio::sync::oneshot::Receiver, message: String) { + shutdown_signal(stop_receiver).await; + + info!("{message}"); + } +} diff --git a/src/logging.rs b/src/logging.rs index 4d16f7670..83e2c9360 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -2,8 +2,7 @@ use std::str::FromStr; use std::sync::Once; use log::{info, LevelFilter}; - -use crate::config::Configuration; +use torrust_tracker_configuration::Configuration; static INIT: Once = Once::new(); diff --git a/src/main.rs b/src/main.rs index 199e8f5c5..fcb8331a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,9 +2,9 @@ use std::env; use std::sync::Arc; use log::info; -use torrust_tracker::config::Configuration; use torrust_tracker::stats::setup_statistics; use torrust_tracker::{ephemeral_instance_keys, logging, setup, static_time, tracker}; +use torrust_tracker_configuration::Configuration; #[tokio::main] async fn main() { diff --git a/src/setup.rs b/src/setup.rs index 3461667cc..c4aff03bb 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use log::warn; use tokio::task::JoinHandle; +use torrust_tracker_configuration::Configuration; -use crate::config::Configuration; use crate::http::Version; use crate::jobs::{http_tracker, torrent_cleanup, tracker_apis, udp_tracker}; use crate::tracker; diff --git a/src/tracker/auth.rs b/src/tracker/auth.rs index 197e0dc37..90aac6354 100644 --- a/src/tracker/auth.rs +++ b/src/tracker/auth.rs @@ -10,8 +10,8 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; use thiserror::Error; +use torrust_tracker_located_error::LocatedError; -use crate::located_error::LocatedError; use crate::protocol::clock::{Current, DurationSinceUnixEpoch, Time, TimeNow}; use crate::protocol::common::AUTH_KEY_LENGTH; diff --git a/src/tracker/error.rs b/src/tracker/error.rs index 51bcbf3bb..5057e7371 100644 --- a/src/tracker/error.rs +++ b/src/tracker/error.rs @@ -1,6 +1,6 @@ use std::panic::Location; -use crate::located_error::LocatedError; +use torrust_tracker_located_error::LocatedError; #[derive(thiserror::Error, Debug, Clone)] pub enum Error { diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index e01fe6a19..4f8b12fb4 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -1,6 +1,5 @@ pub mod auth; pub mod error; -pub mod mode; pub mod peer; pub mod services; pub mod statistics; @@ -15,18 +14,18 @@ use std::time::Duration; use tokio::sync::mpsc::error::SendError; use tokio::sync::{RwLock, RwLockReadGuard}; +use torrust_tracker_configuration::Configuration; +use torrust_tracker_primitives::TrackerMode; use self::error::Error; use self::peer::Peer; use self::torrent::SwamStats; -use crate::config::Configuration; -use crate::databases::driver::Driver; use crate::databases::{self, Database}; use crate::protocol::info_hash::InfoHash; pub struct Tracker { pub config: Arc, - mode: mode::Mode, + mode: TrackerMode, keys: RwLock>, whitelist: RwLock>, torrents: RwLock>, @@ -59,7 +58,7 @@ impl Tracker { stats_event_sender: Option>, stats_repository: statistics::Repo, ) -> Result { - let database = Driver::build(&config.db_driver, &config.db_path)?; + let database = databases::driver::build(&config.db_driver, &config.db_path)?; Ok(Tracker { config: config.clone(), @@ -74,15 +73,15 @@ impl Tracker { } pub fn is_public(&self) -> bool { - self.mode == mode::Mode::Public + self.mode == TrackerMode::Public } pub fn is_private(&self) -> bool { - self.mode == mode::Mode::Private || self.mode == mode::Mode::PrivateListed + self.mode == TrackerMode::Private || self.mode == TrackerMode::PrivateListed } pub fn is_whitelisted(&self) -> bool { - self.mode == mode::Mode::Listed || self.mode == mode::Mode::PrivateListed + self.mode == TrackerMode::Listed || self.mode == TrackerMode::PrivateListed } /// It handles an announce request @@ -418,12 +417,14 @@ fn assign_ip_address_to_peer(remote_client_ip: &IpAddr, tracker_external_ip: Opt mod tests { use std::sync::Arc; + use torrust_tracker_configuration::Configuration; + use torrust_tracker_test_helpers::configuration::ephemeral; + use super::statistics::Keeper; use super::{TorrentsMetrics, Tracker}; - use crate::config::{ephemeral_configuration, Configuration}; pub fn tracker_configuration() -> Arc { - Arc::new(ephemeral_configuration()) + Arc::new(ephemeral()) } pub fn tracker_factory() -> Tracker { diff --git a/src/tracker/services/common.rs b/src/tracker/services/common.rs index 8757e6a21..39aa3cc0b 100644 --- a/src/tracker/services/common.rs +++ b/src/tracker/services/common.rs @@ -1,6 +1,7 @@ use std::sync::Arc; -use crate::config::Configuration; +use torrust_tracker_configuration::Configuration; + use crate::tracker::statistics::Keeper; use crate::tracker::Tracker; diff --git a/src/tracker/services/statistics.rs b/src/tracker/services/statistics.rs index 745f5563c..c0aaf9c64 100644 --- a/src/tracker/services/statistics.rs +++ b/src/tracker/services/statistics.rs @@ -36,13 +36,15 @@ pub async fn get_metrics(tracker: Arc) -> TrackerMetrics { mod tests { use std::sync::Arc; - use crate::config::{ephemeral_configuration, Configuration}; + use torrust_tracker_configuration::Configuration; + use torrust_tracker_test_helpers::configuration::ephemeral; + use crate::tracker; use crate::tracker::services::common::tracker_factory; use crate::tracker::services::statistics::{get_metrics, TrackerMetrics}; pub fn tracker_configuration() -> Arc { - Arc::new(ephemeral_configuration()) + Arc::new(ephemeral()) } #[tokio::test] diff --git a/src/tracker/services/torrent.rs b/src/tracker/services/torrent.rs index e2353876e..ce652a091 100644 --- a/src/tracker/services/torrent.rs +++ b/src/tracker/services/torrent.rs @@ -137,14 +137,16 @@ mod tests { use std::str::FromStr; use std::sync::Arc; - use crate::config::{ephemeral_configuration, Configuration}; + use torrust_tracker_configuration::Configuration; + use torrust_tracker_test_helpers::configuration::ephemeral; + use crate::protocol::info_hash::InfoHash; use crate::tracker::services::common::tracker_factory; use crate::tracker::services::torrent::tests::sample_peer; use crate::tracker::services::torrent::{get_torrent_info, Info}; pub fn tracker_configuration() -> Arc { - Arc::new(ephemeral_configuration()) + Arc::new(ephemeral()) } #[tokio::test] @@ -190,14 +192,16 @@ mod tests { use std::str::FromStr; use std::sync::Arc; - use crate::config::{ephemeral_configuration, Configuration}; + use torrust_tracker_configuration::Configuration; + use torrust_tracker_test_helpers::configuration::ephemeral; + use crate::protocol::info_hash::InfoHash; use crate::tracker::services::common::tracker_factory; use crate::tracker::services::torrent::tests::sample_peer; use crate::tracker::services::torrent::{get_torrents, BasicInfo, Pagination}; pub fn tracker_configuration() -> Arc { - Arc::new(ephemeral_configuration()) + Arc::new(ephemeral()) } #[tokio::test] diff --git a/src/udp/error.rs b/src/udp/error.rs index de66eb2bf..a6381cc78 100644 --- a/src/udp/error.rs +++ b/src/udp/error.rs @@ -1,8 +1,7 @@ use std::panic::Location; use thiserror::Error; - -use crate::located_error::LocatedError; +use torrust_tracker_located_error::LocatedError; #[derive(Error, Debug)] pub enum Error { diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 8978beb70..9bd88c92e 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -258,31 +258,33 @@ mod tests { use std::sync::Arc; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; + use torrust_tracker_configuration::Configuration; + use torrust_tracker_primitives::TrackerMode; + use torrust_tracker_test_helpers::configuration::ephemeral; - use crate::config::{ephemeral_configuration, Configuration}; use crate::protocol::clock::{Current, Time}; - use crate::tracker::{self, mode, peer, statistics}; + use crate::tracker::{self, peer, statistics}; fn tracker_configuration() -> Arc { Arc::new(default_testing_tracker_configuration()) } fn default_testing_tracker_configuration() -> Configuration { - ephemeral_configuration() + ephemeral() } fn initialized_public_tracker() -> Arc { - let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(mode::Mode::Public).into()); + let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Public).into()); initialized_tracker(&configuration) } fn initialized_private_tracker() -> Arc { - let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(mode::Mode::Private).into()); + let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Private).into()); initialized_tracker(&configuration) } fn initialized_whitelisted_tracker() -> Arc { - let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(mode::Mode::Listed).into()); + let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Listed).into()); initialized_tracker(&configuration) } @@ -362,7 +364,7 @@ mod tests { self } - pub fn with_mode(mut self, mode: mode::Mode) -> Self { + pub fn with_mode(mut self, mode: TrackerMode) -> Self { self.configuration.mode = mode; self } diff --git a/src/udp/server.rs b/src/udp/server.rs index e85c81e9d..f74468189 100644 --- a/src/udp/server.rs +++ b/src/udp/server.rs @@ -1,25 +1,113 @@ +use std::future::Future; use std::io::Cursor; use std::net::SocketAddr; use std::sync::Arc; use aquatic_udp_protocol::Response; +use futures::pin_mut; use log::{debug, error, info}; use tokio::net::UdpSocket; +use tokio::task::JoinHandle; -use crate::tracker; +use crate::signals::shutdown_signal; +use crate::tracker::Tracker; use crate::udp::handlers::handle_packet; use crate::udp::MAX_PACKET_SIZE; +#[derive(Debug)] +pub enum Error { + Error(String), +} + +#[allow(clippy::module_name_repetitions)] +pub type StoppedUdpServer = UdpServer; +#[allow(clippy::module_name_repetitions)] +pub type RunningUdpServer = UdpServer; + +#[allow(clippy::module_name_repetitions)] +pub struct UdpServer { + pub cfg: torrust_tracker_configuration::UdpTracker, + pub tracker: Arc, + pub state: S, +} + +pub struct Stopped; + +pub struct Running { + pub bind_address: SocketAddr, + stop_job_sender: tokio::sync::oneshot::Sender, + job: JoinHandle<()>, +} + +impl UdpServer { + pub fn new(cfg: torrust_tracker_configuration::UdpTracker, tracker: Arc) -> Self { + Self { + cfg, + tracker, + state: Stopped {}, + } + } + + /// # Errors + /// + /// Will return `Err` if UDP can't bind to given bind address. + pub async fn start(self) -> Result, Error> { + let udp = Udp::new(self.tracker.clone(), &self.cfg.bind_address) + .await + .map_err(|e| Error::Error(e.to_string()))?; + + let bind_address = udp.socket.local_addr().map_err(|e| Error::Error(e.to_string()))?; + + let (sender, receiver) = tokio::sync::oneshot::channel::(); + + let job = tokio::spawn(async move { + udp.start_with_graceful_shutdown(shutdown_signal(receiver)).await; + }); + + let running_udp_server: UdpServer = UdpServer { + cfg: self.cfg, + tracker: self.tracker, + state: Running { + bind_address, + stop_job_sender: sender, + job, + }, + }; + + Ok(running_udp_server) + } +} + +impl UdpServer { + /// # Errors + /// + /// Will return `Err` if the oneshot channel to send the stop signal + /// has already been called once. + pub async fn stop(self) -> Result, Error> { + self.state.stop_job_sender.send(1).map_err(|e| Error::Error(e.to_string()))?; + + let _ = self.state.job.await; + + let stopped_api_server: UdpServer = UdpServer { + cfg: self.cfg, + tracker: self.tracker, + state: Stopped {}, + }; + + Ok(stopped_api_server) + } +} + pub struct Udp { socket: Arc, - tracker: Arc, + tracker: Arc, } impl Udp { /// # Errors /// /// Will return `Err` unable to bind to the supplied `bind_address`. - pub async fn new(tracker: Arc, bind_address: &str) -> tokio::io::Result { + pub async fn new(tracker: Arc, bind_address: &str) -> tokio::io::Result { let socket = UdpSocket::bind(bind_address).await?; Ok(Udp { @@ -57,6 +145,41 @@ impl Udp { } } + /// # Panics + /// + /// It would panic if unable to resolve the `local_addr` from the supplied ´socket´. + async fn start_with_graceful_shutdown(&self, shutdown_signal: F) + where + F: Future, + { + // Pin the future so that it doesn't move to the first loop iteration. + pin_mut!(shutdown_signal); + + loop { + let mut data = [0; MAX_PACKET_SIZE]; + let socket = self.socket.clone(); + let tracker = self.tracker.clone(); + + tokio::select! { + _ = &mut shutdown_signal => { + info!("Stopping UDP server: {}..", self.socket.local_addr().unwrap()); + break; + } + Ok((valid_bytes, remote_addr)) = socket.recv_from(&mut data) => { + let payload = data[..valid_bytes].to_vec(); + + info!("Received {} bytes", payload.len()); + debug!("From: {}", &remote_addr); + debug!("Payload: {:?}", payload); + + let response = handle_packet(remote_addr, payload, tracker).await; + + Udp::send_response(socket, remote_addr, response).await; + } + } + } + } + async fn send_response(socket: Arc, remote_addr: SocketAddr, response: Response) { let buffer = vec![0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(buffer); diff --git a/tests/api/mod.rs b/tests/api/mod.rs index 8dd6f4c53..fcb24e491 100644 --- a/tests/api/mod.rs +++ b/tests/api/mod.rs @@ -5,7 +5,7 @@ use torrust_tracker::tracker::Tracker; pub mod asserts; pub mod client; pub mod connection_info; -pub mod server; +pub mod test_environment; /// It forces a database error by dropping all tables. /// That makes any query fail. diff --git a/tests/api/server.rs b/tests/api/server.rs deleted file mode 100644 index 0e23a4320..000000000 --- a/tests/api/server.rs +++ /dev/null @@ -1,78 +0,0 @@ -use core::panic; -use std::sync::Arc; - -use torrust_tracker::config::{ephemeral_configuration, Configuration}; -use torrust_tracker::jobs::tracker_apis; -use torrust_tracker::protocol::info_hash::InfoHash; -use torrust_tracker::tracker::peer::Peer; -use torrust_tracker::tracker::statistics::Keeper; -use torrust_tracker::{ephemeral_instance_keys, logging, static_time, tracker}; - -use super::connection_info::ConnectionInfo; - -pub fn tracker_configuration() -> Arc { - Arc::new(ephemeral_configuration()) -} - -pub async fn start_default_api() -> Server { - let configuration = tracker_configuration(); - start_custom_api(configuration.clone()).await -} - -pub async fn start_custom_api(configuration: Arc) -> Server { - let server = start(&configuration); - tracker_apis::start_job(&configuration.http_api, server.tracker.clone()).await; - server -} - -fn start(configuration: &Arc) -> Server { - let connection_info = ConnectionInfo::authenticated( - &configuration.http_api.bind_address.clone(), - &configuration.http_api.access_tokens.get_key_value("admin").unwrap().1.clone(), - ); - - // Set the time of Torrust app starting - lazy_static::initialize(&static_time::TIME_AT_APP_START); - - // Initialize the Ephemeral Instance Random Seed - lazy_static::initialize(&ephemeral_instance_keys::RANDOM_SEED); - - // Initialize stats tracker - let (stats_event_sender, stats_repository) = Keeper::new_active_instance(); - - // Initialize Torrust tracker - let tracker = match tracker::Tracker::new(configuration, Some(stats_event_sender), stats_repository) { - Ok(tracker) => Arc::new(tracker), - Err(error) => { - panic!("{}", error) - } - }; - - // Initialize logging - logging::setup(configuration); - - Server { - tracker, - connection_info, - } -} - -pub struct Server { - pub tracker: Arc, - pub connection_info: ConnectionInfo, -} - -impl Server { - pub fn get_connection_info(&self) -> ConnectionInfo { - self.connection_info.clone() - } - - pub fn get_bind_address(&self) -> String { - self.connection_info.bind_address.clone() - } - - /// Add a torrent to the tracker - pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; - } -} diff --git a/tests/api/test_environment.rs b/tests/api/test_environment.rs new file mode 100644 index 000000000..ff143ec7a --- /dev/null +++ b/tests/api/test_environment.rs @@ -0,0 +1,134 @@ +use core::panic; +use std::sync::Arc; + +use torrust_tracker::apis::server::{ApiServer, RunningApiServer, StoppedApiServer}; +use torrust_tracker::protocol::info_hash::InfoHash; +use torrust_tracker::tracker::peer::Peer; +use torrust_tracker::tracker::statistics::Keeper; +use torrust_tracker::tracker::Tracker; +use torrust_tracker::{ephemeral_instance_keys, logging, static_time}; +use torrust_tracker_configuration::Configuration; +use torrust_tracker_test_helpers::configuration::ephemeral; + +use super::connection_info::ConnectionInfo; + +#[allow(clippy::module_name_repetitions, dead_code)] +pub type StoppedTestEnvironment = TestEnvironment; +#[allow(clippy::module_name_repetitions)] +pub type RunningTestEnvironment = TestEnvironment; + +pub struct TestEnvironment { + pub tracker: Arc, + pub state: S, +} + +#[allow(dead_code)] +pub struct Stopped { + api_server: StoppedApiServer, +} + +pub struct Running { + api_server: RunningApiServer, +} + +impl TestEnvironment { + /// Add a torrent to the tracker + pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &Peer) { + self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + } +} + +impl TestEnvironment { + #[allow(dead_code)] + pub fn new_stopped() -> Self { + let api_server = api_server(); + + Self { + tracker: api_server.tracker.clone(), + state: Stopped { api_server }, + } + } + + #[allow(dead_code)] + pub fn start(self) -> TestEnvironment { + TestEnvironment { + tracker: self.tracker, + state: Running { + api_server: self.state.api_server.start().unwrap(), + }, + } + } +} + +impl TestEnvironment { + pub fn new_running() -> Self { + let api_server = running_api_server(); + + Self { + tracker: api_server.tracker.clone(), + state: Running { api_server }, + } + } + + pub async fn stop(self) -> TestEnvironment { + TestEnvironment { + tracker: self.tracker, + state: Stopped { + api_server: self.state.api_server.stop().await.unwrap(), + }, + } + } + + pub fn get_connection_info(&self) -> ConnectionInfo { + ConnectionInfo { + bind_address: self.state.api_server.state.bind_address.to_string(), + api_token: self.state.api_server.cfg.access_tokens.get("admin").cloned(), + } + } +} + +#[allow(clippy::module_name_repetitions)] +pub fn running_test_environment() -> RunningTestEnvironment { + TestEnvironment::new_running() +} + +pub fn tracker_configuration() -> Arc { + Arc::new(ephemeral()) +} + +// TODO: Move to test-helpers crate once `Tracker` is isolated. +pub fn tracker_instance(configuration: &Arc) -> Arc { + // Set the time of Torrust app starting + lazy_static::initialize(&static_time::TIME_AT_APP_START); + + // Initialize the Ephemeral Instance Random Seed + lazy_static::initialize(&ephemeral_instance_keys::RANDOM_SEED); + + // Initialize stats tracker + let (stats_event_sender, stats_repository) = Keeper::new_active_instance(); + + // Initialize Torrust tracker + let tracker = match Tracker::new(configuration, Some(stats_event_sender), stats_repository) { + Ok(tracker) => Arc::new(tracker), + Err(error) => { + panic!("{}", error) + } + }; + + // Initialize logging + logging::setup(configuration); + + tracker +} + +pub fn api_server() -> StoppedApiServer { + let config = tracker_configuration(); + + let tracker = tracker_instance(&config); + + ApiServer::new(config.http_api.clone(), tracker) +} + +pub fn running_api_server() -> RunningApiServer { + api_server().start().unwrap() +} diff --git a/tests/http/server.rs b/tests/http/server.rs index 1c8d1cb77..147ad93c1 100644 --- a/tests/http/server.rs +++ b/tests/http/server.rs @@ -2,35 +2,35 @@ use core::panic; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use torrust_tracker::config::{ephemeral_configuration, Configuration}; use torrust_tracker::http::Version; use torrust_tracker::jobs::http_tracker; use torrust_tracker::protocol::info_hash::InfoHash; -use torrust_tracker::tracker::mode::Mode; use torrust_tracker::tracker::peer::Peer; use torrust_tracker::tracker::statistics::Keeper; use torrust_tracker::{ephemeral_instance_keys, logging, static_time, tracker}; +use torrust_tracker_configuration::{ephemeral_configuration, Configuration}; +use torrust_tracker_primitives::TrackerMode; use super::connection_info::ConnectionInfo; /// Starts a HTTP tracker with mode "public" in settings pub async fn start_public_http_tracker(version: Version) -> Server { let mut configuration = ephemeral_configuration(); - configuration.mode = Mode::Public; + configuration.mode = TrackerMode::Public; start_custom_http_tracker(Arc::new(configuration), version).await } /// Starts a HTTP tracker with mode "listed" in settings pub async fn start_whitelisted_http_tracker(version: Version) -> Server { let mut configuration = ephemeral_configuration(); - configuration.mode = Mode::Listed; + configuration.mode = TrackerMode::Listed; start_custom_http_tracker(Arc::new(configuration), version).await } /// Starts a HTTP tracker with mode "private" in settings pub async fn start_private_http_tracker(version: Version) -> Server { let mut configuration = ephemeral_configuration(); - configuration.mode = Mode::Private; + configuration.mode = TrackerMode::Private; start_custom_http_tracker(Arc::new(configuration), version).await } diff --git a/tests/tracker_api.rs b/tests/tracker_api.rs index 193c6487c..ef29dfbb6 100644 --- a/tests/tracker_api.rs +++ b/tests/tracker_api.rs @@ -27,74 +27,84 @@ mod tracker_apis { mod authentication { use crate::api::asserts::{assert_token_not_valid, assert_unauthorized}; use crate::api::client::Client; - use crate::api::server::start_default_api; + use crate::api::test_environment::running_test_environment; use crate::common::http::{Query, QueryParam}; #[tokio::test] async fn should_authenticate_requests_by_using_a_token_query_param() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - let token = api_server.get_connection_info().api_token.unwrap(); + let token = test_env.get_connection_info().api_token.unwrap(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_request_with_query("stats", Query::params([QueryParam::new("token", &token)].to_vec())) .await; assert_eq!(response.status(), 200); + + test_env.stop().await; } #[tokio::test] async fn should_not_authenticate_requests_when_the_token_is_missing() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_request_with_query("stats", Query::default()) .await; assert_unauthorized(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_not_authenticate_requests_when_the_token_is_empty() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_request_with_query("stats", Query::params([QueryParam::new("token", "")].to_vec())) .await; assert_token_not_valid(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_not_authenticate_requests_when_the_token_is_invalid() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_request_with_query("stats", Query::params([QueryParam::new("token", "INVALID TOKEN")].to_vec())) .await; assert_token_not_valid(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_allow_the_token_query_param_to_be_at_any_position_in_the_url_query() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - let token = api_server.get_connection_info().api_token.unwrap(); + let token = test_env.get_connection_info().api_token.unwrap(); // At the beginning of the query component - let response = Client::new(api_server.get_connection_info()) - .get_request(&format!("torrents?token={}&limit=1", &token)) + let response = Client::new(test_env.get_connection_info()) + .get_request(&format!("torrents?token={token}&limit=1")) .await; assert_eq!(response.status(), 200); // At the end of the query component - let response = Client::new(api_server.get_connection_info()) - .get_request(&format!("torrents?limit=1&token={}", &token)) + let response = Client::new(test_env.get_connection_info()) + .get_request(&format!("torrents?limit=1&token={token}")) .await; assert_eq!(response.status(), 200); + + test_env.stop().await; } } @@ -107,21 +117,21 @@ mod tracker_apis { use crate::api::asserts::{assert_stats, assert_token_not_valid, assert_unauthorized}; use crate::api::client::Client; use crate::api::connection_info::{connection_with_invalid_token, connection_with_no_token}; - use crate::api::server::start_default_api; + use crate::api::test_environment::running_test_environment; use crate::common::fixtures::PeerBuilder; #[tokio::test] async fn should_allow_getting_tracker_statistics() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - api_server + test_env .add_torrent_peer( &InfoHash::from_str("9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d").unwrap(), &PeerBuilder::default().into(), ) .await; - let response = Client::new(api_server.get_connection_info()).get_tracker_statistics().await; + let response = Client::new(test_env.get_connection_info()).get_tracker_statistics().await; assert_stats( response, @@ -145,23 +155,29 @@ mod tracker_apis { }, ) .await; + + test_env.stop().await; } #[tokio::test] async fn should_not_allow_getting_tracker_statistics_for_unauthenticated_users() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - let response = Client::new(connection_with_invalid_token(&api_server.get_bind_address())) - .get_tracker_statistics() - .await; + let response = Client::new(connection_with_invalid_token( + test_env.get_connection_info().bind_address.as_str(), + )) + .get_tracker_statistics() + .await; assert_token_not_valid(response).await; - let response = Client::new(connection_with_no_token(&api_server.get_bind_address())) + let response = Client::new(connection_with_no_token(test_env.get_connection_info().bind_address.as_str())) .get_tracker_statistics() .await; assert_unauthorized(response).await; + + test_env.stop().await; } } @@ -179,21 +195,19 @@ mod tracker_apis { }; use crate::api::client::Client; use crate::api::connection_info::{connection_with_invalid_token, connection_with_no_token}; - use crate::api::server::start_default_api; + use crate::api::test_environment::running_test_environment; use crate::common::fixtures::PeerBuilder; use crate::common::http::{Query, QueryParam}; #[tokio::test] async fn should_allow_getting_torrents() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let info_hash = InfoHash::from_str("9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d").unwrap(); - api_server.add_torrent_peer(&info_hash, &PeerBuilder::default().into()).await; + test_env.add_torrent_peer(&info_hash, &PeerBuilder::default().into()).await; - let response = Client::new(api_server.get_connection_info()) - .get_torrents(Query::empty()) - .await; + let response = Client::new(test_env.get_connection_info()).get_torrents(Query::empty()).await; assert_torrent_list( response, @@ -206,24 +220,22 @@ mod tracker_apis { }], ) .await; + + test_env.stop().await; } #[tokio::test] async fn should_allow_limiting_the_torrents_in_the_result() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); // torrents are ordered alphabetically by infohashes let info_hash_1 = InfoHash::from_str("9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d").unwrap(); let info_hash_2 = InfoHash::from_str("0b3aea4adc213ce32295be85d3883a63bca25446").unwrap(); - api_server - .add_torrent_peer(&info_hash_1, &PeerBuilder::default().into()) - .await; - api_server - .add_torrent_peer(&info_hash_2, &PeerBuilder::default().into()) - .await; + test_env.add_torrent_peer(&info_hash_1, &PeerBuilder::default().into()).await; + test_env.add_torrent_peer(&info_hash_2, &PeerBuilder::default().into()).await; - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_torrents(Query::params([QueryParam::new("limit", "1")].to_vec())) .await; @@ -238,24 +250,22 @@ mod tracker_apis { }], ) .await; + + test_env.stop().await; } #[tokio::test] async fn should_allow_the_torrents_result_pagination() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); // torrents are ordered alphabetically by infohashes let info_hash_1 = InfoHash::from_str("9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d").unwrap(); let info_hash_2 = InfoHash::from_str("0b3aea4adc213ce32295be85d3883a63bca25446").unwrap(); - api_server - .add_torrent_peer(&info_hash_1, &PeerBuilder::default().into()) - .await; - api_server - .add_torrent_peer(&info_hash_2, &PeerBuilder::default().into()) - .await; + test_env.add_torrent_peer(&info_hash_1, &PeerBuilder::default().into()).await; + test_env.add_torrent_peer(&info_hash_2, &PeerBuilder::default().into()).await; - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_torrents(Query::params([QueryParam::new("offset", "1")].to_vec())) .await; @@ -270,66 +280,76 @@ mod tracker_apis { }], ) .await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_getting_torrents_when_the_offset_query_parameter_cannot_be_parsed() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let invalid_offsets = [" ", "-1", "1.1", "INVALID OFFSET"]; for invalid_offset in &invalid_offsets { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_torrents(Query::params([QueryParam::new("offset", invalid_offset)].to_vec())) .await; assert_bad_request(response, "Failed to deserialize query string: invalid digit found in string").await; } + + test_env.stop().await; } #[tokio::test] async fn should_fail_getting_torrents_when_the_limit_query_parameter_cannot_be_parsed() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let invalid_limits = [" ", "-1", "1.1", "INVALID LIMIT"]; for invalid_limit in &invalid_limits { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_torrents(Query::params([QueryParam::new("limit", invalid_limit)].to_vec())) .await; assert_bad_request(response, "Failed to deserialize query string: invalid digit found in string").await; } + + test_env.stop().await; } #[tokio::test] async fn should_not_allow_getting_torrents_for_unauthenticated_users() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - let response = Client::new(connection_with_invalid_token(&api_server.get_bind_address())) - .get_torrents(Query::empty()) - .await; + let response = Client::new(connection_with_invalid_token( + test_env.get_connection_info().bind_address.as_str(), + )) + .get_torrents(Query::empty()) + .await; assert_token_not_valid(response).await; - let response = Client::new(connection_with_no_token(&api_server.get_bind_address())) + let response = Client::new(connection_with_no_token(test_env.get_connection_info().bind_address.as_str())) .get_torrents(Query::default()) .await; assert_unauthorized(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_allow_getting_a_torrent_info() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let info_hash = InfoHash::from_str("9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d").unwrap(); let peer = PeerBuilder::default().into(); - api_server.add_torrent_peer(&info_hash, &peer).await; + test_env.add_torrent_peer(&info_hash, &peer).await; - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_torrent(&info_hash.to_string()) .await; @@ -344,27 +364,31 @@ mod tracker_apis { }, ) .await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_while_getting_a_torrent_info_when_the_torrent_does_not_exist() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let info_hash = InfoHash::from_str("9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d").unwrap(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_torrent(&info_hash.to_string()) .await; assert_torrent_not_known(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_getting_a_torrent_info_when_the_provided_infohash_is_invalid() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); for invalid_infohash in &invalid_infohashes_returning_bad_request() { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_torrent(invalid_infohash) .await; @@ -372,33 +396,39 @@ mod tracker_apis { } for invalid_infohash in &invalid_infohashes_returning_not_found() { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .get_torrent(invalid_infohash) .await; assert_not_found(response).await; } + + test_env.stop().await; } #[tokio::test] async fn should_not_allow_getting_a_torrent_info_for_unauthenticated_users() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let info_hash = InfoHash::from_str("9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d").unwrap(); - api_server.add_torrent_peer(&info_hash, &PeerBuilder::default().into()).await; + test_env.add_torrent_peer(&info_hash, &PeerBuilder::default().into()).await; - let response = Client::new(connection_with_invalid_token(&api_server.get_bind_address())) - .get_torrent(&info_hash.to_string()) - .await; + let response = Client::new(connection_with_invalid_token( + test_env.get_connection_info().bind_address.as_str(), + )) + .get_torrent(&info_hash.to_string()) + .await; assert_token_not_valid(response).await; - let response = Client::new(connection_with_no_token(&api_server.get_bind_address())) + let response = Client::new(connection_with_no_token(test_env.get_connection_info().bind_address.as_str())) .get_torrent(&info_hash.to_string()) .await; assert_unauthorized(response).await; + + test_env.stop().await; } } @@ -416,82 +446,92 @@ mod tracker_apis { use crate::api::client::Client; use crate::api::connection_info::{connection_with_invalid_token, connection_with_no_token}; use crate::api::force_database_error; - use crate::api::server::start_default_api; + use crate::api::test_environment::running_test_environment; #[tokio::test] async fn should_allow_whitelisting_a_torrent() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let info_hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .whitelist_a_torrent(&info_hash) .await; assert_ok(response).await; assert!( - api_server + test_env .tracker .is_info_hash_whitelisted(&InfoHash::from_str(&info_hash).unwrap()) .await ); + + test_env.stop().await; } #[tokio::test] async fn should_allow_whitelisting_a_torrent_that_has_been_already_whitelisted() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let info_hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); - let api_client = Client::new(api_server.get_connection_info()); + let api_client = Client::new(test_env.get_connection_info()); let response = api_client.whitelist_a_torrent(&info_hash).await; assert_ok(response).await; let response = api_client.whitelist_a_torrent(&info_hash).await; assert_ok(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_not_allow_whitelisting_a_torrent_for_unauthenticated_users() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let info_hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); - let response = Client::new(connection_with_invalid_token(&api_server.get_bind_address())) - .whitelist_a_torrent(&info_hash) - .await; + let response = Client::new(connection_with_invalid_token( + test_env.get_connection_info().bind_address.as_str(), + )) + .whitelist_a_torrent(&info_hash) + .await; assert_token_not_valid(response).await; - let response = Client::new(connection_with_no_token(&api_server.get_bind_address())) + let response = Client::new(connection_with_no_token(test_env.get_connection_info().bind_address.as_str())) .whitelist_a_torrent(&info_hash) .await; assert_unauthorized(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_when_the_torrent_cannot_be_whitelisted() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let info_hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); - force_database_error(&api_server.tracker); + force_database_error(&test_env.tracker); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .whitelist_a_torrent(&info_hash) .await; assert_failed_to_whitelist_torrent(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_whitelisting_a_torrent_when_the_provided_infohash_is_invalid() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); for invalid_infohash in &invalid_infohashes_returning_bad_request() { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .whitelist_a_torrent(invalid_infohash) .await; @@ -499,49 +539,55 @@ mod tracker_apis { } for invalid_infohash in &invalid_infohashes_returning_not_found() { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .whitelist_a_torrent(invalid_infohash) .await; assert_not_found(response).await; } + + test_env.stop().await; } #[tokio::test] async fn should_allow_removing_a_torrent_from_the_whitelist() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - api_server.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); + test_env.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .remove_torrent_from_whitelist(&hash) .await; assert_ok(response).await; - assert!(!api_server.tracker.is_info_hash_whitelisted(&info_hash).await); + assert!(!test_env.tracker.is_info_hash_whitelisted(&info_hash).await); + + test_env.stop().await; } #[tokio::test] async fn should_not_fail_trying_to_remove_a_non_whitelisted_torrent_from_the_whitelist() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let non_whitelisted_torrent_hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .remove_torrent_from_whitelist(&non_whitelisted_torrent_hash) .await; assert_ok(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_removing_a_torrent_from_the_whitelist_when_the_provided_infohash_is_invalid() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); for invalid_infohash in &invalid_infohashes_returning_bad_request() { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .remove_torrent_from_whitelist(invalid_infohash) .await; @@ -549,89 +595,101 @@ mod tracker_apis { } for invalid_infohash in &invalid_infohashes_returning_not_found() { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .remove_torrent_from_whitelist(invalid_infohash) .await; assert_not_found(response).await; } + + test_env.stop().await; } #[tokio::test] async fn should_fail_when_the_torrent_cannot_be_removed_from_the_whitelist() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - api_server.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); + test_env.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); - force_database_error(&api_server.tracker); + force_database_error(&test_env.tracker); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .remove_torrent_from_whitelist(&hash) .await; assert_failed_to_remove_torrent_from_whitelist(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_not_allow_removing_a_torrent_from_the_whitelist_for_unauthenticated_users() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - api_server.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); - let response = Client::new(connection_with_invalid_token(&api_server.get_bind_address())) - .remove_torrent_from_whitelist(&hash) - .await; + test_env.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); + let response = Client::new(connection_with_invalid_token( + test_env.get_connection_info().bind_address.as_str(), + )) + .remove_torrent_from_whitelist(&hash) + .await; assert_token_not_valid(response).await; - api_server.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); - let response = Client::new(connection_with_no_token(&api_server.get_bind_address())) + test_env.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); + let response = Client::new(connection_with_no_token(test_env.get_connection_info().bind_address.as_str())) .remove_torrent_from_whitelist(&hash) .await; assert_unauthorized(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_allow_reload_the_whitelist_from_the_database() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - api_server.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); + test_env.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); - let response = Client::new(api_server.get_connection_info()).reload_whitelist().await; + let response = Client::new(test_env.get_connection_info()).reload_whitelist().await; assert_ok(response).await; /* todo: this assert fails because the whitelist has not been reloaded yet. We could add a new endpoint GET /api/whitelist/:info_hash to check if a torrent is whitelisted and use that endpoint to check if the torrent is still there after reloading. assert!( - !(api_server + !(test_env .tracker .is_info_hash_whitelisted(&InfoHash::from_str(&info_hash).unwrap()) .await) ); */ + + test_env.stop().await; } #[tokio::test] async fn should_fail_when_the_whitelist_cannot_be_reloaded_from_the_database() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - api_server.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); + test_env.tracker.add_torrent_to_whitelist(&info_hash).await.unwrap(); - force_database_error(&api_server.tracker); + force_database_error(&test_env.tracker); - let response = Client::new(api_server.get_connection_info()).reload_whitelist().await; + let response = Client::new(test_env.get_connection_info()).reload_whitelist().await; assert_failed_to_reload_whitelist(response).await; + + test_env.stop().await; } } @@ -648,50 +706,52 @@ mod tracker_apis { use crate::api::client::Client; use crate::api::connection_info::{connection_with_invalid_token, connection_with_no_token}; use crate::api::force_database_error; - use crate::api::server::start_default_api; + use crate::api::test_environment::running_test_environment; #[tokio::test] async fn should_allow_generating_a_new_auth_key() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let seconds_valid = 60; - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .generate_auth_key(seconds_valid) .await; let auth_key_resource = assert_auth_key_utf8(response).await; // Verify the key with the tracker - assert!(api_server - .tracker - .verify_auth_key(&Key::from(auth_key_resource)) - .await - .is_ok()); + assert!(test_env.tracker.verify_auth_key(&Key::from(auth_key_resource)).await.is_ok()); + + test_env.stop().await; } #[tokio::test] async fn should_not_allow_generating_a_new_auth_key_for_unauthenticated_users() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let seconds_valid = 60; - let response = Client::new(connection_with_invalid_token(&api_server.get_bind_address())) - .generate_auth_key(seconds_valid) - .await; + let response = Client::new(connection_with_invalid_token( + test_env.get_connection_info().bind_address.as_str(), + )) + .generate_auth_key(seconds_valid) + .await; assert_token_not_valid(response).await; - let response = Client::new(connection_with_no_token(&api_server.get_bind_address())) + let response = Client::new(connection_with_no_token(test_env.get_connection_info().bind_address.as_str())) .generate_auth_key(seconds_valid) .await; assert_unauthorized(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_generating_a_new_auth_key_when_the_key_duration_is_invalid() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let invalid_key_durations = [ // "", it returns 404 @@ -700,49 +760,55 @@ mod tracker_apis { ]; for invalid_key_duration in &invalid_key_durations { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .post(&format!("key/{}", &invalid_key_duration)) .await; assert_invalid_key_duration_param(response, invalid_key_duration).await; } + + test_env.stop().await; } #[tokio::test] async fn should_fail_when_the_auth_key_cannot_be_generated() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); - force_database_error(&api_server.tracker); + force_database_error(&test_env.tracker); let seconds_valid = 60; - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .generate_auth_key(seconds_valid) .await; assert_failed_to_generate_key(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_allow_deleting_an_auth_key() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let seconds_valid = 60; - let auth_key = api_server + let auth_key = test_env .tracker .generate_auth_key(Duration::from_secs(seconds_valid)) .await .unwrap(); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .delete_auth_key(&auth_key.key) .await; assert_ok(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_deleting_an_auth_key_when_the_key_id_is_invalid() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let invalid_auth_key_ids = [ // "", it returns a 404 @@ -755,123 +821,139 @@ mod tracker_apis { ]; for invalid_auth_key_id in &invalid_auth_key_ids { - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .delete_auth_key(invalid_auth_key_id) .await; assert_invalid_auth_key_param(response, invalid_auth_key_id).await; } + + test_env.stop().await; } #[tokio::test] async fn should_fail_when_the_auth_key_cannot_be_deleted() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let seconds_valid = 60; - let auth_key = api_server + let auth_key = test_env .tracker .generate_auth_key(Duration::from_secs(seconds_valid)) .await .unwrap(); - force_database_error(&api_server.tracker); + force_database_error(&test_env.tracker); - let response = Client::new(api_server.get_connection_info()) + let response = Client::new(test_env.get_connection_info()) .delete_auth_key(&auth_key.key) .await; assert_failed_to_delete_key(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_not_allow_deleting_an_auth_key_for_unauthenticated_users() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let seconds_valid = 60; // Generate new auth key - let auth_key = api_server + let auth_key = test_env .tracker .generate_auth_key(Duration::from_secs(seconds_valid)) .await .unwrap(); - let response = Client::new(connection_with_invalid_token(&api_server.get_bind_address())) - .delete_auth_key(&auth_key.key) - .await; + let response = Client::new(connection_with_invalid_token( + test_env.get_connection_info().bind_address.as_str(), + )) + .delete_auth_key(&auth_key.key) + .await; assert_token_not_valid(response).await; // Generate new auth key - let auth_key = api_server + let auth_key = test_env .tracker .generate_auth_key(Duration::from_secs(seconds_valid)) .await .unwrap(); - let response = Client::new(connection_with_no_token(&api_server.get_bind_address())) + let response = Client::new(connection_with_no_token(test_env.get_connection_info().bind_address.as_str())) .delete_auth_key(&auth_key.key) .await; assert_unauthorized(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_allow_reloading_keys() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let seconds_valid = 60; - api_server + test_env .tracker .generate_auth_key(Duration::from_secs(seconds_valid)) .await .unwrap(); - let response = Client::new(api_server.get_connection_info()).reload_keys().await; + let response = Client::new(test_env.get_connection_info()).reload_keys().await; assert_ok(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_fail_when_keys_cannot_be_reloaded() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let seconds_valid = 60; - api_server + test_env .tracker .generate_auth_key(Duration::from_secs(seconds_valid)) .await .unwrap(); - force_database_error(&api_server.tracker); + force_database_error(&test_env.tracker); - let response = Client::new(api_server.get_connection_info()).reload_keys().await; + let response = Client::new(test_env.get_connection_info()).reload_keys().await; assert_failed_to_reload_keys(response).await; + + test_env.stop().await; } #[tokio::test] async fn should_not_allow_reloading_keys_for_unauthenticated_users() { - let api_server = start_default_api().await; + let test_env = running_test_environment(); let seconds_valid = 60; - api_server + test_env .tracker .generate_auth_key(Duration::from_secs(seconds_valid)) .await .unwrap(); - let response = Client::new(connection_with_invalid_token(&api_server.get_bind_address())) - .reload_keys() - .await; + let response = Client::new(connection_with_invalid_token( + test_env.get_connection_info().bind_address.as_str(), + )) + .reload_keys() + .await; assert_token_not_valid(response).await; - let response = Client::new(connection_with_no_token(&api_server.get_bind_address())) + let response = Client::new(connection_with_no_token(test_env.get_connection_info().bind_address.as_str())) .reload_keys() .await; assert_unauthorized(response).await; + + test_env.stop().await; } } } diff --git a/tests/udp/client.rs b/tests/udp/client.rs index 3cb4d6134..0bec03d03 100644 --- a/tests/udp/client.rs +++ b/tests/udp/client.rs @@ -1,41 +1,54 @@ use std::io::Cursor; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; use aquatic_udp_protocol::{Request, Response}; -use rand::{thread_rng, Rng}; +use tokio::net::UdpSocket; use torrust_tracker::udp::MAX_PACKET_SIZE; -use crate::common::udp::Client as UdpClient; +use crate::udp::source_address; -/// Creates a new generic UDP client connected to a generic UDP server -pub async fn new_udp_client_connected(remote_address: &SocketAddr) -> UdpClient { - let local_address = loopback_socket_address(ephemeral_random_client_port()); - UdpClient::connected(remote_address, &local_address).await +#[allow(clippy::module_name_repetitions)] +pub struct UdpClient { + pub socket: Arc, } -/// Creates a new UDP tracker client connected to a UDP tracker server -pub async fn new_udp_tracker_client_connected(remote_address: &SocketAddr) -> Client { - let udp_client = new_udp_client_connected(remote_address).await; - Client { udp_client } -} +impl UdpClient { + pub async fn bind(local_address: &str) -> Self { + let socket = UdpSocket::bind(local_address).await.unwrap(); + Self { + socket: Arc::new(socket), + } + } -pub fn ephemeral_random_client_port() -> u16 { - // todo: this may produce random test failures because two tests can try to bind the same port. - // We could create a pool of available ports (with read/write lock) - let mut rng = thread_rng(); - rng.gen_range(49152..65535) + pub async fn connect(&self, remote_address: &str) { + self.socket.connect(remote_address).await.unwrap(); + } + + pub async fn send(&self, bytes: &[u8]) -> usize { + self.socket.writable().await.unwrap(); + self.socket.send(bytes).await.unwrap() + } + + pub async fn receive(&self, bytes: &mut [u8]) -> usize { + self.socket.readable().await.unwrap(); + self.socket.recv(bytes).await.unwrap() + } } -fn loopback_socket_address(port: u16) -> SocketAddr { - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port) +/// Creates a new `UdpClient` connected to a Udp server +pub async fn new_udp_client_connected(remote_address: &str) -> UdpClient { + let port = 0; // Let OS choose an unused port. + let client = UdpClient::bind(&source_address(port)).await; + client.connect(remote_address).await; + client } -/// A UDP tracker client -pub struct Client { - pub udp_client: UdpClient, // A generic UDP client +#[allow(clippy::module_name_repetitions)] +pub struct UdpTrackerClient { + pub udp_client: UdpClient, } -impl Client { +impl UdpTrackerClient { pub async fn send(&self, request: Request) -> usize { // Write request into a buffer let request_buffer = vec![0u8; MAX_PACKET_SIZE]; @@ -63,3 +76,9 @@ impl Client { Response::from_bytes(&response_buffer[..payload_size], true).unwrap() } } + +/// Creates a new `UdpTrackerClient` connected to a Udp Tracker server +pub async fn new_udp_tracker_client_connected(remote_address: &str) -> UdpTrackerClient { + let udp_client = new_udp_client_connected(remote_address).await; + UdpTrackerClient { udp_client } +} diff --git a/tests/udp/mod.rs b/tests/udp/mod.rs index 16a77bb99..f45a4a4f9 100644 --- a/tests/udp/mod.rs +++ b/tests/udp/mod.rs @@ -1,3 +1,8 @@ pub mod asserts; pub mod client; -pub mod server; +pub mod test_environment; + +/// Generates the source address for the UDP client +fn source_address(port: u16) -> String { + format!("127.0.0.1:{port}") +} diff --git a/tests/udp/server.rs b/tests/udp/server.rs deleted file mode 100644 index 401d4cf92..000000000 --- a/tests/udp/server.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -use tokio::task::JoinHandle; -use torrust_tracker::config::{ephemeral_configuration, Configuration}; -use torrust_tracker::jobs::udp_tracker; -use torrust_tracker::tracker::statistics::Keeper; -use torrust_tracker::{ephemeral_instance_keys, logging, static_time, tracker}; - -pub fn start_udp_tracker(configuration: &Arc) -> Server { - let mut udp_server = Server::new(); - udp_server.start(configuration); - udp_server -} - -pub fn tracker_configuration() -> Arc { - Arc::new(ephemeral_configuration()) -} -pub struct Server { - pub started: AtomicBool, - pub job: Option>, - pub bind_address: Option, -} - -impl Server { - pub fn new() -> Self { - Self { - started: AtomicBool::new(false), - job: None, - bind_address: None, - } - } - - pub fn start(&mut self, configuration: &Arc) { - if !self.started.load(Ordering::Relaxed) { - // Set the time of Torrust app starting - lazy_static::initialize(&static_time::TIME_AT_APP_START); - - // Initialize the Ephemeral Instance Random Seed - lazy_static::initialize(&ephemeral_instance_keys::RANDOM_SEED); - - // Initialize stats tracker - let (stats_event_sender, stats_repository) = Keeper::new_active_instance(); - - // Initialize Torrust tracker - let tracker = match tracker::Tracker::new(&configuration.clone(), Some(stats_event_sender), stats_repository) { - Ok(tracker) => Arc::new(tracker), - Err(error) => { - panic!("{}", error) - } - }; - - // Initialize logging - logging::setup(configuration); - - let udp_tracker_config = &configuration.udp_trackers[0]; - - // Start the UDP tracker job - self.job = Some(udp_tracker::start_job(udp_tracker_config, tracker)); - - self.bind_address = Some(udp_tracker_config.bind_address.parse().unwrap()); - - self.started.store(true, Ordering::Relaxed); - } - } -} diff --git a/tests/udp/test_environment.rs b/tests/udp/test_environment.rs new file mode 100644 index 000000000..e53a7a580 --- /dev/null +++ b/tests/udp/test_environment.rs @@ -0,0 +1,131 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use torrust_tracker::protocol::info_hash::InfoHash; +use torrust_tracker::tracker::peer::Peer; +use torrust_tracker::tracker::statistics::Keeper; +use torrust_tracker::tracker::Tracker; +use torrust_tracker::udp::server::{RunningUdpServer, StoppedUdpServer, UdpServer}; +use torrust_tracker::{ephemeral_instance_keys, logging, static_time}; +use torrust_tracker_configuration::Configuration; +use torrust_tracker_test_helpers::configuration::ephemeral; + +fn tracker_configuration() -> Arc { + Arc::new(ephemeral()) +} + +#[allow(clippy::module_name_repetitions, dead_code)] +pub type StoppedTestEnvironment = TestEnvironment; +#[allow(clippy::module_name_repetitions)] +pub type RunningTestEnvironment = TestEnvironment; + +pub struct TestEnvironment { + pub tracker: Arc, + pub state: S, +} + +#[allow(dead_code)] +pub struct Stopped { + api_server: StoppedUdpServer, +} + +pub struct Running { + api_server: RunningUdpServer, +} + +impl TestEnvironment { + /// Add a torrent to the tracker + #[allow(dead_code)] + pub async fn add_torrent(&self, info_hash: &InfoHash, peer: &Peer) { + self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + } +} + +impl TestEnvironment { + #[allow(dead_code)] + pub fn new_stopped() -> Self { + let udp_server = udp_server(); + + Self { + tracker: udp_server.tracker.clone(), + state: Stopped { api_server: udp_server }, + } + } + + #[allow(dead_code)] + pub async fn start(self) -> TestEnvironment { + TestEnvironment { + tracker: self.tracker, + state: Running { + api_server: self.state.api_server.start().await.unwrap(), + }, + } + } +} + +impl TestEnvironment { + pub async fn new_running() -> Self { + let udp_server = running_udp_server().await; + + Self { + tracker: udp_server.tracker.clone(), + state: Running { api_server: udp_server }, + } + } + + #[allow(dead_code)] + pub async fn stop(self) -> TestEnvironment { + TestEnvironment { + tracker: self.tracker, + state: Stopped { + api_server: self.state.api_server.stop().await.unwrap(), + }, + } + } + + pub fn bind_address(&self) -> SocketAddr { + self.state.api_server.state.bind_address + } +} + +#[allow(clippy::module_name_repetitions)] +pub async fn running_test_environment() -> RunningTestEnvironment { + TestEnvironment::new_running().await +} + +// TODO: Move to test-helpers crate once `Tracker` is isolated. +pub fn tracker_instance(configuration: &Arc) -> Arc { + // Set the time of Torrust app starting + lazy_static::initialize(&static_time::TIME_AT_APP_START); + + // Initialize the Ephemeral Instance Random Seed + lazy_static::initialize(&ephemeral_instance_keys::RANDOM_SEED); + + // Initialize stats tracker + let (stats_event_sender, stats_repository) = Keeper::new_active_instance(); + + // Initialize Torrust tracker + let tracker = match Tracker::new(configuration, Some(stats_event_sender), stats_repository) { + Ok(tracker) => Arc::new(tracker), + Err(error) => { + panic!("{}", error) + } + }; + + // Initialize logging + logging::setup(configuration); + + tracker +} + +pub fn udp_server() -> StoppedUdpServer { + let config = tracker_configuration(); + + let tracker = tracker_instance(&config); + + UdpServer::new(config.udp_trackers[0].clone(), tracker) +} + +pub async fn running_udp_server() -> RunningUdpServer { + udp_server().start().await.unwrap() +} diff --git a/tests/udp_tracker.rs b/tests/udp_tracker.rs index 0287d01b7..b7cc3bd6f 100644 --- a/tests/udp_tracker.rs +++ b/tests/udp_tracker.rs @@ -19,8 +19,8 @@ mod udp_tracker_server { use torrust_tracker::udp::MAX_PACKET_SIZE; use crate::udp::asserts::is_error_response; - use crate::udp::client::{new_udp_client_connected, Client}; - use crate::udp::server::{start_udp_tracker, tracker_configuration}; + use crate::udp::client::{new_udp_client_connected, UdpTrackerClient}; + use crate::udp::test_environment::running_test_environment; fn empty_udp_request() -> [u8; MAX_PACKET_SIZE] { [0; MAX_PACKET_SIZE] @@ -30,7 +30,7 @@ mod udp_tracker_server { [0; MAX_PACKET_SIZE] } - async fn send_connection_request(transaction_id: TransactionId, client: &Client) -> ConnectionId { + async fn send_connection_request(transaction_id: TransactionId, client: &UdpTrackerClient) -> ConnectionId { let connect_request = ConnectRequest { transaction_id }; client.send(connect_request.into()).await; @@ -45,11 +45,9 @@ mod udp_tracker_server { #[tokio::test] async fn should_return_a_bad_request_response_when_the_client_sends_an_empty_request() { - let configuration = tracker_configuration(); + let test_env = running_test_environment().await; - let udp_server = start_udp_tracker(&configuration); - - let client = new_udp_client_connected(&udp_server.bind_address.unwrap()).await; + let client = new_udp_client_connected(&test_env.bind_address().to_string()).await; client.send(&empty_udp_request()).await; @@ -65,15 +63,13 @@ mod udp_tracker_server { use crate::udp::asserts::is_connect_response; use crate::udp::client::new_udp_tracker_client_connected; - use crate::udp::server::{start_udp_tracker, tracker_configuration}; + use crate::udp::test_environment::running_test_environment; #[tokio::test] async fn should_return_a_connect_response() { - let configuration = tracker_configuration(); - - let udp_server = start_udp_tracker(&configuration); + let test_env = running_test_environment().await; - let client = new_udp_tracker_client_connected(&udp_server.bind_address.unwrap()).await; + let client = new_udp_tracker_client_connected(&test_env.bind_address().to_string()).await; let connect_request = ConnectRequest { transaction_id: TransactionId(123), @@ -97,16 +93,14 @@ mod udp_tracker_server { use crate::udp::asserts::is_ipv4_announce_response; use crate::udp::client::new_udp_tracker_client_connected; - use crate::udp::server::{start_udp_tracker, tracker_configuration}; + use crate::udp::test_environment::running_test_environment; use crate::udp_tracker_server::send_connection_request; #[tokio::test] async fn should_return_an_announce_response() { - let configuration = tracker_configuration(); + let test_env = running_test_environment().await; - let udp_server = start_udp_tracker(&configuration); - - let client = new_udp_tracker_client_connected(&udp_server.bind_address.unwrap()).await; + let client = new_udp_tracker_client_connected(&test_env.bind_address().to_string()).await; let connection_id = send_connection_request(TransactionId(123), &client).await; @@ -140,16 +134,14 @@ mod udp_tracker_server { use crate::udp::asserts::is_scrape_response; use crate::udp::client::new_udp_tracker_client_connected; - use crate::udp::server::{start_udp_tracker, tracker_configuration}; + use crate::udp::test_environment::running_test_environment; use crate::udp_tracker_server::send_connection_request; #[tokio::test] async fn should_return_a_scrape_response() { - let configuration = tracker_configuration(); - - let udp_server = start_udp_tracker(&configuration); + let test_env = running_test_environment().await; - let client = new_udp_tracker_client_connected(&udp_server.bind_address.unwrap()).await; + let client = new_udp_tracker_client_connected(&test_env.bind_address().to_string()).await; let connection_id = send_connection_request(TransactionId(123), &client).await;