diff --git a/.git-blame-ignore b/.git-blame-ignore new file mode 100644 index 000000000..06c439a36 --- /dev/null +++ b/.git-blame-ignore @@ -0,0 +1,4 @@ +# https://git-scm.com/docs/git-blame#Documentation/git-blame.txt---ignore-revs-fileltfilegt + +# Format the world! +57bf2000e39dccfc2f8b6e41d6c6f3eac38a3886 diff --git a/.github/workflows/test_build_release.yml b/.github/workflows/test_build_release.yml index d848ed653..87f6a9488 100644 --- a/.github/workflows/test_build_release.yml +++ b/.github/workflows/test_build_release.yml @@ -4,7 +4,23 @@ name: CI on: [push, pull_request] jobs: + format: + runs-on: ubuntu-latest + env: + CARGO_TERM_COLOR: always + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + components: rustfmt + - uses: Swatinem/rust-cache@v1 + - name: Check Rust Formatting + run: cargo fmt --check + test: + needs: format runs-on: ubuntu-latest env: CARGO_TERM_COLOR: always @@ -18,7 +34,7 @@ jobs: - uses: Swatinem/rust-cache@v1 - uses: taiki-e/install-action@cargo-llvm-cov - uses: taiki-e/install-action@nextest - - name: Run tests + - name: Run Tests run: cargo llvm-cov nextest build: @@ -37,9 +53,9 @@ jobs: profile: minimal toolchain: stable - uses: Swatinem/rust-cache@v1 - - name: Build torrust tracker + - name: Build Torrust Tracker run: cargo build --release - - name: Upload build artifact + - name: Upload Build Artifact uses: actions/upload-artifact@v2 with: name: torrust-tracker @@ -49,7 +65,7 @@ jobs: needs: build runs-on: ubuntu-latest steps: - - name: Download build artifact + - name: Download Build Artifact uses: actions/download-artifact@v2 with: name: torrust-tracker diff --git a/.gitignore b/.gitignore index 99a07430b..e2956b2d6 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ /.idea/ /config.toml /data.db +/.vscode/launch.json diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 000000000..3e878b271 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,4 @@ +max_width = 130 +imports_granularity = "Module" +group_imports = "StdExternalCrate" + diff --git a/src/api/server.rs b/src/api/server.rs index 19ceac92a..cc6c905e4 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -4,10 +4,10 @@ use std::net::SocketAddr; use std::sync::Arc; use serde::{Deserialize, Serialize}; -use warp::{Filter, filters, reply, serve}; +use warp::{filters, reply, serve, Filter}; -use crate::protocol::common::*; use crate::peer::TorrentPeer; +use crate::protocol::common::*; use crate::tracker::tracker::TorrentTracker; #[derive(Deserialize, Debug)] @@ -55,7 +55,7 @@ enum ActionStatus<'a> { impl warp::reject::Reject for ActionStatus<'static> {} -fn authenticate(tokens: HashMap) -> impl Filter + Clone { +fn authenticate(tokens: HashMap) -> impl Filter + Clone { #[derive(Deserialize)] struct AuthToken { token: Option, @@ -67,18 +67,20 @@ fn authenticate(tokens: HashMap) -> impl Filter()) - .and_then(|tokens: Arc>, token: AuthToken| { - async move { - match token.token { - Some(token) => { - if !tokens.contains(&token) { - return Err(warp::reject::custom(ActionStatus::Err { reason: "token not valid".into() })); - } - - Ok(()) + .and_then(|tokens: Arc>, token: AuthToken| async move { + match token.token { + Some(token) => { + if !tokens.contains(&token) { + return Err(warp::reject::custom(ActionStatus::Err { + reason: "token not valid".into(), + })); } - None => Err(warp::reject::custom(ActionStatus::Err { reason: "unauthorized".into() })) + + Ok(()) } + None => Err(warp::reject::custom(ActionStatus::Err { + reason: "unauthorized".into(), + })), } }) .untuple_one() @@ -96,30 +98,28 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = api_torrents.clone(); (limits, tracker) }) - .and_then(|(limits, tracker): (TorrentInfoQuery, Arc)| { - async move { - let offset = limits.offset.unwrap_or(0); - let limit = min(limits.limit.unwrap_or(1000), 4000); - - let db = tracker.get_torrents().await; - let results: Vec<_> = db - .iter() - .map(|(info_hash, torrent_entry)| { - let (seeders, completed, leechers) = torrent_entry.get_stats(); - Torrent { - info_hash, - seeders, - completed, - leechers, - peers: None, - } - }) - .skip(offset as usize) - .take(limit as usize) - .collect(); - - Result::<_, warp::reject::Rejection>::Ok(reply::json(&results)) - } + .and_then(|(limits, tracker): (TorrentInfoQuery, Arc)| async move { + let offset = limits.offset.unwrap_or(0); + let limit = min(limits.limit.unwrap_or(1000), 4000); + + let db = tracker.get_torrents().await; + let results: Vec<_> = db + .iter() + .map(|(info_hash, torrent_entry)| { + let (seeders, completed, leechers) = torrent_entry.get_stats(); + Torrent { + info_hash, + seeders, + completed, + leechers, + peers: None, + } + }) + .skip(offset as usize) + .take(limit as usize) + .collect(); + + Result::<_, warp::reject::Rejection>::Ok(reply::json(&results)) }); // GET /api/stats @@ -132,57 +132,55 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = api_stats.clone(); tracker }) - .and_then(|tracker: Arc| { - async move { - let mut results = Stats { - torrents: 0, - seeders: 0, - completed: 0, - leechers: 0, - tcp4_connections_handled: 0, - tcp4_announces_handled: 0, - tcp4_scrapes_handled: 0, - tcp6_connections_handled: 0, - tcp6_announces_handled: 0, - tcp6_scrapes_handled: 0, - udp4_connections_handled: 0, - udp4_announces_handled: 0, - udp4_scrapes_handled: 0, - udp6_connections_handled: 0, - udp6_announces_handled: 0, - udp6_scrapes_handled: 0, - }; - - let db = tracker.get_torrents().await; - - let _: Vec<_> = db - .iter() - .map(|(_info_hash, torrent_entry)| { - let (seeders, completed, leechers) = torrent_entry.get_stats(); - results.seeders += seeders; - results.completed += completed; - results.leechers += leechers; - results.torrents += 1; - }) - .collect(); - - let stats = tracker.get_stats().await; - - results.tcp4_connections_handled = stats.tcp4_connections_handled as u32; - results.tcp4_announces_handled = stats.tcp4_announces_handled as u32; - results.tcp4_scrapes_handled = stats.tcp4_scrapes_handled as u32; - results.tcp6_connections_handled = stats.tcp6_connections_handled as u32; - results.tcp6_announces_handled = stats.tcp6_announces_handled as u32; - results.tcp6_scrapes_handled = stats.tcp6_scrapes_handled as u32; - results.udp4_connections_handled = stats.udp4_connections_handled as u32; - results.udp4_announces_handled = stats.udp4_announces_handled as u32; - results.udp4_scrapes_handled = stats.udp4_scrapes_handled as u32; - results.udp6_connections_handled = stats.udp6_connections_handled as u32; - results.udp6_announces_handled = stats.udp6_announces_handled as u32; - results.udp6_scrapes_handled = stats.udp6_scrapes_handled as u32; - - Result::<_, warp::reject::Rejection>::Ok(reply::json(&results)) - } + .and_then(|tracker: Arc| async move { + let mut results = Stats { + torrents: 0, + seeders: 0, + completed: 0, + leechers: 0, + tcp4_connections_handled: 0, + tcp4_announces_handled: 0, + tcp4_scrapes_handled: 0, + tcp6_connections_handled: 0, + tcp6_announces_handled: 0, + tcp6_scrapes_handled: 0, + udp4_connections_handled: 0, + udp4_announces_handled: 0, + udp4_scrapes_handled: 0, + udp6_connections_handled: 0, + udp6_announces_handled: 0, + udp6_scrapes_handled: 0, + }; + + let db = tracker.get_torrents().await; + + let _: Vec<_> = db + .iter() + .map(|(_info_hash, torrent_entry)| { + let (seeders, completed, leechers) = torrent_entry.get_stats(); + results.seeders += seeders; + results.completed += completed; + results.leechers += leechers; + results.torrents += 1; + }) + .collect(); + + let stats = tracker.get_stats().await; + + results.tcp4_connections_handled = stats.tcp4_connections_handled as u32; + results.tcp4_announces_handled = stats.tcp4_announces_handled as u32; + results.tcp4_scrapes_handled = stats.tcp4_scrapes_handled as u32; + results.tcp6_connections_handled = stats.tcp6_connections_handled as u32; + results.tcp6_announces_handled = stats.tcp6_announces_handled as u32; + results.tcp6_scrapes_handled = stats.tcp6_scrapes_handled as u32; + results.udp4_connections_handled = stats.udp4_connections_handled as u32; + results.udp4_announces_handled = stats.udp4_announces_handled as u32; + results.udp4_scrapes_handled = stats.udp4_scrapes_handled as u32; + results.udp6_connections_handled = stats.udp6_connections_handled as u32; + results.udp6_announces_handled = stats.udp6_announces_handled as u32; + results.udp6_scrapes_handled = stats.udp6_scrapes_handled as u32; + + Result::<_, warp::reject::Rejection>::Ok(reply::json(&results)) }); // GET /api/torrent/:info_hash @@ -196,28 +194,26 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = t2.clone(); (info_hash, tracker) }) - .and_then(|(info_hash, tracker): (InfoHash, Arc)| { - async move { - let db = tracker.get_torrents().await; - let torrent_entry_option = db.get(&info_hash); + .and_then(|(info_hash, tracker): (InfoHash, Arc)| async move { + let db = tracker.get_torrents().await; + let torrent_entry_option = db.get(&info_hash); - if torrent_entry_option.is_none() { - return Result::<_, warp::reject::Rejection>::Ok(reply::json(&"torrent not known")) - } + if torrent_entry_option.is_none() { + return Result::<_, warp::reject::Rejection>::Ok(reply::json(&"torrent not known")); + } - let torrent_entry = torrent_entry_option.unwrap(); - let (seeders, completed, leechers) = torrent_entry.get_stats(); + let torrent_entry = torrent_entry_option.unwrap(); + let (seeders, completed, leechers) = torrent_entry.get_stats(); - let peers = torrent_entry.get_peers(None); + let peers = torrent_entry.get_peers(None); - Ok(reply::json(&Torrent { - info_hash: &info_hash, - seeders, - completed, - leechers, - peers: Some(peers), - })) - } + Ok(reply::json(&Torrent { + info_hash: &info_hash, + seeders, + completed, + leechers, + peers: Some(peers), + })) }); // DELETE /api/whitelist/:info_hash @@ -231,12 +227,12 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = t3.clone(); (info_hash, tracker) }) - .and_then(|(info_hash, tracker): (InfoHash, Arc)| { - async move { - match tracker.remove_torrent_from_whitelist(&info_hash).await { - Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), - Err(_) => Err(warp::reject::custom(ActionStatus::Err { reason: "failed to remove torrent from whitelist".into() })) - } + .and_then(|(info_hash, tracker): (InfoHash, Arc)| async move { + match tracker.remove_torrent_from_whitelist(&info_hash).await { + Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), + Err(_) => Err(warp::reject::custom(ActionStatus::Err { + reason: "failed to remove torrent from whitelist".into(), + })), } }); @@ -251,12 +247,12 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = t4.clone(); (info_hash, tracker) }) - .and_then(|(info_hash, tracker): (InfoHash, Arc)| { - async move { - match tracker.add_torrent_to_whitelist(&info_hash).await { - Ok(..) => Ok(warp::reply::json(&ActionStatus::Ok)), - Err(..) => Err(warp::reject::custom(ActionStatus::Err { reason: "failed to whitelist torrent".into() })) - } + .and_then(|(info_hash, tracker): (InfoHash, Arc)| async move { + match tracker.add_torrent_to_whitelist(&info_hash).await { + Ok(..) => Ok(warp::reply::json(&ActionStatus::Ok)), + Err(..) => Err(warp::reject::custom(ActionStatus::Err { + reason: "failed to whitelist torrent".into(), + })), } }); @@ -271,12 +267,12 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = t5.clone(); (seconds_valid, tracker) }) - .and_then(|(seconds_valid, tracker): (u64, Arc)| { - async move { - match tracker.generate_auth_key(seconds_valid).await { - Ok(auth_key) => Ok(warp::reply::json(&auth_key)), - Err(..) => Err(warp::reject::custom(ActionStatus::Err { reason: "failed to generate key".into() })) - } + .and_then(|(seconds_valid, tracker): (u64, Arc)| async move { + match tracker.generate_auth_key(seconds_valid).await { + Ok(auth_key) => Ok(warp::reply::json(&auth_key)), + Err(..) => Err(warp::reject::custom(ActionStatus::Err { + reason: "failed to generate key".into(), + })), } }); @@ -291,12 +287,12 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = t6.clone(); (key, tracker) }) - .and_then(|(key, tracker): (String, Arc)| { - async move { - match tracker.remove_auth_key(&key).await { - Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), - Err(_) => Err(warp::reject::custom(ActionStatus::Err { reason: "failed to delete key".into() })) - } + .and_then(|(key, tracker): (String, Arc)| async move { + match tracker.remove_auth_key(&key).await { + Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), + Err(_) => Err(warp::reject::custom(ActionStatus::Err { + reason: "failed to delete key".into(), + })), } }); @@ -311,12 +307,12 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = t7.clone(); tracker }) - .and_then(|tracker: Arc| { - async move { - match tracker.load_whitelist().await { - Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), - Err(_) => Err(warp::reject::custom(ActionStatus::Err { reason: "failed to reload whitelist".into() })) - } + .and_then(|tracker: Arc| async move { + match tracker.load_whitelist().await { + Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), + Err(_) => Err(warp::reject::custom(ActionStatus::Err { + reason: "failed to reload whitelist".into(), + })), } }); @@ -331,34 +327,31 @@ pub fn start(socket_addr: SocketAddr, tracker: Arc) -> impl warp let tracker = t8.clone(); tracker }) - .and_then(|tracker: Arc| { - async move { - match tracker.load_keys().await { - Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), - Err(_) => Err(warp::reject::custom(ActionStatus::Err { reason: "failed to reload keys".into() })) - } + .and_then(|tracker: Arc| async move { + match tracker.load_keys().await { + Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), + Err(_) => Err(warp::reject::custom(ActionStatus::Err { + reason: "failed to reload keys".into(), + })), } }); - let api_routes = - filters::path::path("api") - .and(view_torrent_list - .or(delete_torrent) - .or(view_torrent_info) - .or(view_stats_list) - .or(add_torrent) - .or(create_key) - .or(delete_key) - .or(reload_whitelist) - .or(reload_keys) - ); + let api_routes = filters::path::path("api").and( + view_torrent_list + .or(delete_torrent) + .or(view_torrent_info) + .or(view_stats_list) + .or(add_torrent) + .or(create_key) + .or(delete_key) + .or(reload_whitelist) + .or(reload_keys), + ); let server = api_routes.and(authenticate(tracker.config.http_api.access_tokens.clone())); let (_addr, api_server) = serve(server).bind_with_graceful_shutdown(socket_addr, async move { - tokio::signal::ctrl_c() - .await - .expect("Failed to listen to shutdown signal."); + tokio::signal::ctrl_c().await.expect("Failed to listen to shutdown signal."); }); api_server diff --git a/src/config.rs b/src/config.rs index 005705f78..c094eb2f9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,3 @@ -use std; use std::collections::HashMap; use std::fs; use std::net::IpAddr; @@ -8,7 +7,7 @@ use std::str::FromStr; use config::{Config, ConfigError, File}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, NoneAsEmptyString}; -use toml; +use {std, toml}; use crate::databases::database::DatabaseDrivers; use crate::mode::TrackerMode; @@ -70,7 +69,7 @@ impl std::fmt::Display for ConfigurationError { match self { ConfigurationError::IOError(e) => e.fmt(f), ConfigurationError::ParseError(e) => e.fmt(f), - _ => write!(f, "{:?}", self) + _ => write!(f, "{:?}", self), } } } @@ -78,16 +77,13 @@ impl std::fmt::Display for ConfigurationError { impl std::error::Error for ConfigurationError {} impl Configuration { - pub fn get_ext_ip(&self) -> Option { match &self.external_ip { None => None, - Some(external_ip) => { - match IpAddr::from_str(external_ip) { - Ok(external_ip) => Some(external_ip), - Err(_) => None - } - } + Some(external_ip) => match IpAddr::from_str(external_ip) { + Ok(external_ip) => Some(external_ip), + Err(_) => None, + }, } } @@ -111,24 +107,23 @@ impl Configuration { http_api: HttpApiConfig { enabled: true, bind_address: String::from("127.0.0.1:1212"), - access_tokens: [(String::from("admin"), String::from("MyAccessToken"))].iter().cloned().collect(), + access_tokens: [(String::from("admin"), String::from("MyAccessToken"))] + .iter() + .cloned() + .collect(), }, }; - configuration.udp_trackers.push( - UdpTrackerConfig { - enabled: false, - bind_address: String::from("0.0.0.0:6969"), - } - ); - configuration.http_trackers.push( - HttpTrackerConfig { - enabled: false, - bind_address: String::from("0.0.0.0:6969"), - ssl_enabled: false, - ssl_cert_path: None, - ssl_key_path: None, - } - ); + configuration.udp_trackers.push(UdpTrackerConfig { + enabled: false, + bind_address: String::from("0.0.0.0:6969"), + }); + configuration.http_trackers.push(HttpTrackerConfig { + enabled: false, + bind_address: String::from("0.0.0.0:6969"), + ssl_enabled: false, + ssl_cert_path: None, + ssl_key_path: None, + }); configuration } @@ -142,10 +137,14 @@ impl Configuration { eprintln!("Creating config file.."); let config = Configuration::default(); let _ = config.save_to_file(path); - return Err(ConfigError::Message(format!("Please edit the config.TOML in the root folder and restart the tracker."))); + return Err(ConfigError::Message(format!( + "Please edit the config.TOML in the root folder and restart the tracker." + ))); } - let torrust_config: Configuration = config.try_into().map_err(|e| ConfigError::Message(format!("Errors while processing config: {}.", e)))?; + let torrust_config: Configuration = config + .try_into() + .map_err(|e| ConfigError::Message(format!("Errors while processing config: {}.", e)))?; Ok(torrust_config) } @@ -193,7 +192,11 @@ mod tests { [http_api.access_tokens] admin = "MyAccessToken" - "#.lines().map(|line| line.trim_start()).collect::>().join("\n"); + "# + .lines() + .map(|line| line.trim_start()) + .collect::>() + .join("\n"); config } @@ -219,11 +222,12 @@ mod tests { #[test] fn configuration_should_be_saved_in_a_toml_config_file() { - use std::env; - use crate::Configuration; - use std::fs; + use std::{env, fs}; + use uuid::Uuid; + use crate::Configuration; + // Build temp config file path let temp_directory = env::temp_dir(); let temp_file = temp_directory.join(format!("test_config_{}.toml", Uuid::new_v4())); @@ -234,7 +238,9 @@ mod tests { let default_configuration = Configuration::default(); - default_configuration.save_to_file(&path).expect("Could not save configuration to file"); + default_configuration + .save_to_file(&path) + .expect("Could not save configuration to file"); let contents = fs::read_to_string(&path).expect("Something went wrong reading the file"); @@ -242,16 +248,17 @@ mod tests { } #[cfg(test)] - fn create_temp_config_file_with_default_config()-> String { + fn create_temp_config_file_with_default_config() -> String { use std::env; use std::fs::File; use std::io::Write; + use uuid::Uuid; // Build temp config file path let temp_directory = env::temp_dir(); let temp_file = temp_directory.join(format!("test_config_{}.toml", Uuid::new_v4())); - + // Convert to argument type for Configuration::load_from_file let config_file_path = temp_file.clone(); let path = config_file_path.to_string_lossy().to_string(); @@ -282,4 +289,4 @@ mod tests { assert_eq!(format!("{}", error), "TrackerModeIncompatible"); } -} \ No newline at end of file +} diff --git a/src/databases/database.rs b/src/databases/database.rs index 915c5381e..adc735fd2 100644 --- a/src/databases/database.rs +++ b/src/databases/database.rs @@ -2,10 +2,10 @@ use async_trait::async_trait; use derive_more::{Display, Error}; use serde::{Deserialize, Serialize}; -use crate::InfoHash; -use crate::tracker::key::AuthKey; use crate::databases::mysql::MysqlDatabase; use crate::databases::sqlite::SqliteDatabase; +use crate::tracker::key::AuthKey; +use crate::InfoHash; #[derive(Serialize, Deserialize, PartialEq, Debug)] pub enum DatabaseDrivers { @@ -70,7 +70,7 @@ impl From for Error { fn from(e: r2d2_sqlite::rusqlite::Error) -> Self { match e { r2d2_sqlite::rusqlite::Error::QueryReturnedNoRows => Error::QueryReturnedNoRows, - _ => Error::InvalidQuery + _ => Error::InvalidQuery, } } } diff --git a/src/databases/mod.rs b/src/databases/mod.rs index 119e34816..169d99f4d 100644 --- a/src/databases/mod.rs +++ b/src/databases/mod.rs @@ -1,3 +1,3 @@ +pub mod database; pub mod mysql; pub mod sqlite; -pub mod database; diff --git a/src/databases/mysql.rs b/src/databases/mysql.rs index 5b6e34eb1..882fb7bf4 100644 --- a/src/databases/mysql.rs +++ b/src/databases/mysql.rs @@ -1,16 +1,16 @@ use std::str::FromStr; use async_trait::async_trait; -use log::{debug}; +use log::debug; use r2d2::Pool; -use r2d2_mysql::mysql::{Opts, OptsBuilder, params}; use r2d2_mysql::mysql::prelude::Queryable; +use r2d2_mysql::mysql::{params, Opts, OptsBuilder}; use r2d2_mysql::MysqlConnectionManager; -use crate::{AUTH_KEY_LENGTH, InfoHash}; -use crate::databases::database::{Database, Error}; use crate::databases::database; +use crate::databases::database::{Database, Error}; use crate::tracker::key::AuthKey; +use crate::{InfoHash, AUTH_KEY_LENGTH}; pub struct MysqlDatabase { pool: Pool, @@ -21,11 +21,11 @@ impl MysqlDatabase { let opts = Opts::from_url(&db_path).expect("Failed to connect to MySQL database."); let builder = OptsBuilder::from_opts(opts); let manager = MysqlConnectionManager::new(builder); - let pool = r2d2::Pool::builder().build(manager).expect("Failed to create r2d2 MySQL connection pool."); + let pool = r2d2::Pool::builder() + .build(manager) + .expect("Failed to create r2d2 MySQL connection pool."); - Ok(Self { - pool - }) + Ok(Self { pool }) } } @@ -36,29 +36,36 @@ impl Database for MysqlDatabase { CREATE TABLE IF NOT EXISTS whitelist ( id integer PRIMARY KEY AUTO_INCREMENT, info_hash VARCHAR(40) NOT NULL UNIQUE - );".to_string(); + );" + .to_string(); let create_torrents_table = " CREATE TABLE IF NOT EXISTS torrents ( id integer PRIMARY KEY AUTO_INCREMENT, info_hash VARCHAR(40) NOT NULL UNIQUE, completed INTEGER DEFAULT 0 NOT NULL - );".to_string(); + );" + .to_string(); - let create_keys_table = format!(" + let create_keys_table = format!( + " CREATE TABLE IF NOT EXISTS `keys` ( `id` INT NOT NULL AUTO_INCREMENT, `key` VARCHAR({}) NOT NULL, `valid_until` INT(10) NOT NULL, PRIMARY KEY (`id`), UNIQUE (`key`) - );", AUTH_KEY_LENGTH as i8); + );", + AUTH_KEY_LENGTH as i8 + ); let mut conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; - conn.query_drop(&create_torrents_table).expect("Could not create torrents table."); + conn.query_drop(&create_torrents_table) + .expect("Could not create torrents table."); conn.query_drop(&create_keys_table).expect("Could not create keys table."); - conn.query_drop(&create_whitelist_table).expect("Could not create whitelist table."); + conn.query_drop(&create_whitelist_table) + .expect("Could not create whitelist table."); Ok(()) } @@ -66,10 +73,15 @@ impl Database for MysqlDatabase { async fn load_persistent_torrents(&self) -> Result, database::Error> { let mut conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; - let torrents: Vec<(InfoHash, u32)> = conn.query_map("SELECT info_hash, completed FROM torrents", |(info_hash_string, completed): (String, u32)| { - let info_hash = InfoHash::from_str(&info_hash_string).unwrap(); - (info_hash, completed) - }).map_err(|_| database::Error::QueryReturnedNoRows)?; + let torrents: Vec<(InfoHash, u32)> = conn + .query_map( + "SELECT info_hash, completed FROM torrents", + |(info_hash_string, completed): (String, u32)| { + let info_hash = InfoHash::from_str(&info_hash_string).unwrap(); + (info_hash, completed) + }, + ) + .map_err(|_| database::Error::QueryReturnedNoRows)?; Ok(torrents) } @@ -77,12 +89,15 @@ impl Database for MysqlDatabase { async fn load_keys(&self) -> Result, Error> { let mut conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; - let keys: Vec = conn.query_map("SELECT `key`, valid_until FROM `keys`", |(key, valid_until): (String, i64)| { - AuthKey { - key, - valid_until: Some(valid_until as u64) - } - }).map_err(|_| database::Error::QueryReturnedNoRows)?; + let keys: Vec = conn + .query_map( + "SELECT `key`, valid_until FROM `keys`", + |(key, valid_until): (String, i64)| AuthKey { + key, + valid_until: Some(valid_until as u64), + }, + ) + .map_err(|_| database::Error::QueryReturnedNoRows)?; Ok(keys) } @@ -90,9 +105,11 @@ impl Database for MysqlDatabase { async fn load_whitelist(&self) -> Result, Error> { let mut conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; - let info_hashes: Vec = conn.query_map("SELECT info_hash FROM whitelist", |info_hash: String| { - InfoHash::from_str(&info_hash).unwrap() - }).map_err(|_| database::Error::QueryReturnedNoRows)?; + let info_hashes: Vec = conn + .query_map("SELECT info_hash FROM whitelist", |info_hash: String| { + InfoHash::from_str(&info_hash).unwrap() + }) + .map_err(|_| database::Error::QueryReturnedNoRows)?; Ok(info_hashes) } @@ -118,14 +135,15 @@ impl Database for MysqlDatabase { async fn get_info_hash_from_whitelist(&self, info_hash: &str) -> Result { let mut conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; - match conn.exec_first::("SELECT info_hash FROM whitelist WHERE info_hash = :info_hash", params! { info_hash }) - .map_err(|_| database::Error::QueryReturnedNoRows)? { - Some(info_hash) => { - Ok(InfoHash::from_str(&info_hash).unwrap()) - } - None => { - Err(database::Error::InvalidQuery) - } + match conn + .exec_first::( + "SELECT info_hash FROM whitelist WHERE info_hash = :info_hash", + params! { info_hash }, + ) + .map_err(|_| database::Error::QueryReturnedNoRows)? + { + Some(info_hash) => Ok(InfoHash::from_str(&info_hash).unwrap()), + None => Err(database::Error::InvalidQuery), } } @@ -134,10 +152,11 @@ impl Database for MysqlDatabase { let info_hash_str = info_hash.to_string(); - match conn.exec_drop("INSERT INTO whitelist (info_hash) VALUES (:info_hash_str)", params! { info_hash_str }) { - Ok(_) => { - Ok(1) - } + match conn.exec_drop( + "INSERT INTO whitelist (info_hash) VALUES (:info_hash_str)", + params! { info_hash_str }, + ) { + Ok(_) => Ok(1), Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -151,9 +170,7 @@ impl Database for MysqlDatabase { let info_hash = info_hash.to_string(); match conn.exec_drop("DELETE FROM whitelist WHERE info_hash = :info_hash", params! { info_hash }) { - Ok(_) => { - Ok(1) - } + Ok(_) => Ok(1), Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -164,17 +181,15 @@ impl Database for MysqlDatabase { async fn get_key_from_keys(&self, key: &str) -> Result { let mut conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; - match conn.exec_first::<(String, i64), _, _>("SELECT `key`, valid_until FROM `keys` WHERE `key` = :key", params! { key }) - .map_err(|_| database::Error::QueryReturnedNoRows)? { - Some((key, valid_until)) => { - Ok(AuthKey { - key, - valid_until: Some(valid_until as u64), - }) - } - None => { - Err(database::Error::InvalidQuery) - } + match conn + .exec_first::<(String, i64), _, _>("SELECT `key`, valid_until FROM `keys` WHERE `key` = :key", params! { key }) + .map_err(|_| database::Error::QueryReturnedNoRows)? + { + Some((key, valid_until)) => Ok(AuthKey { + key, + valid_until: Some(valid_until as u64), + }), + None => Err(database::Error::InvalidQuery), } } @@ -184,10 +199,11 @@ impl Database for MysqlDatabase { let key = auth_key.key.to_string(); let valid_until = auth_key.valid_until.unwrap_or(0).to_string(); - match conn.exec_drop("INSERT INTO `keys` (`key`, valid_until) VALUES (:key, :valid_until)", params! { key, valid_until }) { - Ok(_) => { - Ok(1) - } + match conn.exec_drop( + "INSERT INTO `keys` (`key`, valid_until) VALUES (:key, :valid_until)", + params! { key, valid_until }, + ) { + Ok(_) => Ok(1), Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -199,9 +215,7 @@ impl Database for MysqlDatabase { let mut conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; match conn.exec_drop("DELETE FROM `keys` WHERE key = :key", params! { key }) { - Ok(_) => { - Ok(1) - } + Ok(_) => Ok(1), Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) diff --git a/src/databases/sqlite.rs b/src/databases/sqlite.rs index 143029ec2..3aba39919 100644 --- a/src/databases/sqlite.rs +++ b/src/databases/sqlite.rs @@ -5,10 +5,10 @@ use log::debug; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; -use crate::{InfoHash}; -use crate::databases::database::{Database, Error}; use crate::databases::database; +use crate::databases::database::{Database, Error}; use crate::tracker::key::AuthKey; +use crate::InfoHash; pub struct SqliteDatabase { pool: Pool, @@ -18,9 +18,7 @@ impl SqliteDatabase { pub fn new(db_path: &str) -> Result { let cm = SqliteConnectionManager::file(db_path); let pool = Pool::new(cm).expect("Failed to create r2d2 SQLite connection pool."); - Ok(SqliteDatabase { - pool - }) + Ok(SqliteDatabase { pool }) } } @@ -31,21 +29,24 @@ impl Database for SqliteDatabase { CREATE TABLE IF NOT EXISTS whitelist ( id INTEGER PRIMARY KEY AUTOINCREMENT, info_hash TEXT NOT NULL UNIQUE - );".to_string(); + );" + .to_string(); let create_torrents_table = " CREATE TABLE IF NOT EXISTS torrents ( id INTEGER PRIMARY KEY AUTOINCREMENT, info_hash TEXT NOT NULL UNIQUE, completed INTEGER DEFAULT 0 NOT NULL - );".to_string(); + );" + .to_string(); let create_keys_table = " CREATE TABLE IF NOT EXISTS keys ( id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL UNIQUE, valid_until INTEGER NOT NULL - );".to_string(); + );" + .to_string(); let conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; @@ -84,7 +85,7 @@ impl Database for SqliteDatabase { Ok(AuthKey { key, - valid_until: Some(valid_until as u64) + valid_until: Some(valid_until as u64), }) })?; @@ -112,9 +113,14 @@ impl Database for SqliteDatabase { async fn save_persistent_torrent(&self, info_hash: &InfoHash, completed: u32) -> Result<(), database::Error> { let conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; - match conn.execute("INSERT INTO torrents (info_hash, completed) VALUES (?1, ?2) ON CONFLICT(info_hash) DO UPDATE SET completed = ?2", [info_hash.to_string(), completed.to_string()]) { + match conn.execute( + "INSERT INTO torrents (info_hash, completed) VALUES (?1, ?2) ON CONFLICT(info_hash) DO UPDATE SET completed = ?2", + [info_hash.to_string(), completed.to_string()], + ) { Ok(updated) => { - if updated > 0 { return Ok(()); } + if updated > 0 { + return Ok(()); + } Err(database::Error::QueryReturnedNoRows) } Err(e) => { @@ -145,7 +151,9 @@ impl Database for SqliteDatabase { match conn.execute("INSERT INTO whitelist (info_hash) VALUES (?)", [info_hash.to_string()]) { Ok(updated) => { - if updated > 0 { return Ok(updated); } + if updated > 0 { + return Ok(updated); + } Err(database::Error::QueryReturnedNoRows) } Err(e) => { @@ -160,7 +168,9 @@ impl Database for SqliteDatabase { match conn.execute("DELETE FROM whitelist WHERE info_hash = ?", [info_hash.to_string()]) { Ok(updated) => { - if updated > 0 { return Ok(updated); } + if updated > 0 { + return Ok(updated); + } Err(database::Error::QueryReturnedNoRows) } Err(e) => { @@ -192,11 +202,14 @@ impl Database for SqliteDatabase { async fn add_key_to_keys(&self, auth_key: &AuthKey) -> Result { let conn = self.pool.get().map_err(|_| database::Error::DatabaseError)?; - match conn.execute("INSERT INTO keys (key, valid_until) VALUES (?1, ?2)", - [auth_key.key.to_string(), auth_key.valid_until.unwrap().to_string()], + match conn.execute( + "INSERT INTO keys (key, valid_until) VALUES (?1, ?2)", + [auth_key.key.to_string(), auth_key.valid_until.unwrap().to_string()], ) { Ok(updated) => { - if updated > 0 { return Ok(updated); } + if updated > 0 { + return Ok(updated); + } Err(database::Error::QueryReturnedNoRows) } Err(e) => { @@ -211,7 +224,9 @@ impl Database for SqliteDatabase { match conn.execute("DELETE FROM keys WHERE key = ?", &[key]) { Ok(updated) => { - if updated > 0 { return Ok(updated); } + if updated > 0 { + return Ok(updated); + } Err(database::Error::QueryReturnedNoRows) } Err(e) => { diff --git a/src/http/filters.rs b/src/http/filters.rs index a288f8d97..514cb804c 100644 --- a/src/http/filters.rs +++ b/src/http/filters.rs @@ -3,44 +3,37 @@ use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use std::sync::Arc; -use warp::{Filter, reject, Rejection}; +use warp::{reject, Filter, Rejection}; -use crate::{InfoHash, MAX_SCRAPE_TORRENTS, PeerId}; -use crate::tracker::key::AuthKey; use crate::http::{AnnounceRequest, AnnounceRequestQuery, ScrapeRequest, ServerError, WebResult}; +use crate::tracker::key::AuthKey; use crate::tracker::tracker::TorrentTracker; +use crate::{InfoHash, PeerId, MAX_SCRAPE_TORRENTS}; /// Pass Arc along -pub fn with_tracker(tracker: Arc) -> impl Filter, ), Error=Infallible> + Clone { - warp::any() - .map(move || tracker.clone()) +pub fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { + warp::any().map(move || tracker.clone()) } /// Check for infoHash -pub fn with_info_hash() -> impl Filter, ), Error=Rejection> + Clone { - warp::filters::query::raw() - .and_then(info_hashes) +pub fn with_info_hash() -> impl Filter,), Error = Rejection> + Clone { + warp::filters::query::raw().and_then(info_hashes) } /// Check for PeerId -pub fn with_peer_id() -> impl Filter + Clone { - warp::filters::query::raw() - .and_then(peer_id) +pub fn with_peer_id() -> impl Filter + Clone { + warp::filters::query::raw().and_then(peer_id) } /// Pass Arc along -pub fn with_auth_key() -> impl Filter, ), Error=Infallible> + Clone { +pub fn with_auth_key() -> impl Filter,), Error = Infallible> + Clone { warp::path::param::() - .map(|key: String| { - AuthKey::from_string(&key) - }) - .or_else(|_| async { - Ok::<(Option, ), Infallible>((None, )) - }) + .map(|key: String| AuthKey::from_string(&key)) + .or_else(|_| async { Ok::<(Option,), Infallible>((None,)) }) } /// Check for PeerAddress -pub fn with_peer_addr(on_reverse_proxy: bool) -> impl Filter + Clone { +pub fn with_peer_addr(on_reverse_proxy: bool) -> impl Filter + Clone { warp::addr::remote() .and(warp::header::optional::("X-Forwarded-For")) .map(move |remote_addr: Option, x_forwarded_for: Option| { @@ -50,7 +43,7 @@ pub fn with_peer_addr(on_reverse_proxy: bool) -> impl Filter impl Filter + Clone { +pub fn with_announce_request(on_reverse_proxy: bool) -> impl Filter + Clone { warp::filters::query::query::() .and(with_info_hash()) .and(with_peer_id()) @@ -59,7 +52,7 @@ pub fn with_announce_request(on_reverse_proxy: bool) -> impl Filter impl Filter + Clone { +pub fn with_scrape_request(on_reverse_proxy: bool) -> impl Filter + Clone { warp::any() .and(with_info_hash()) .and(with_peer_addr(on_reverse_proxy)) @@ -129,7 +122,9 @@ async fn peer_id(raw_query: String) -> WebResult { } /// Get PeerAddress from RemoteAddress or Forwarded -async fn peer_addr((on_reverse_proxy, remote_addr, x_forwarded_for): (bool, Option, Option)) -> WebResult { +async fn peer_addr( + (on_reverse_proxy, remote_addr, x_forwarded_for): (bool, Option, Option), +) -> WebResult { if !on_reverse_proxy && remote_addr.is_none() { return Err(reject::custom(ServerError::AddressNotFound)); } @@ -148,16 +143,19 @@ async fn peer_addr((on_reverse_proxy, remote_addr, x_forwarded_for): (bool, Opti // set client ip to last forwarded ip let x_forwarded_ip = *x_forwarded_ips.last().unwrap(); - IpAddr::from_str(x_forwarded_ip).or_else(|_| { - Err(reject::custom(ServerError::AddressNotFound)) - }) + IpAddr::from_str(x_forwarded_ip).or_else(|_| Err(reject::custom(ServerError::AddressNotFound))) } - false => Ok(remote_addr.unwrap().ip()) + false => Ok(remote_addr.unwrap().ip()), } } /// Parse AnnounceRequest from raw AnnounceRequestQuery, InfoHash and Option -async fn announce_request(announce_request_query: AnnounceRequestQuery, info_hashes: Vec, peer_id: PeerId, peer_addr: IpAddr) -> WebResult { +async fn announce_request( + announce_request_query: AnnounceRequestQuery, + info_hashes: Vec, + peer_id: PeerId, + peer_addr: IpAddr, +) -> WebResult { Ok(AnnounceRequest { info_hash: info_hashes[0], peer_addr, @@ -173,8 +171,5 @@ async fn announce_request(announce_request_query: AnnounceRequestQuery, info_has /// Parse ScrapeRequest from InfoHash async fn scrape_request(info_hashes: Vec, peer_addr: IpAddr) -> WebResult { - Ok(ScrapeRequest { - info_hashes, - peer_addr, - }) + Ok(ScrapeRequest { info_hashes, peer_addr }) } diff --git a/src/http/handlers.rs b/src/http/handlers.rs index 0dc737641..5214bbe6e 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -4,19 +4,26 @@ use std::net::IpAddr; use std::sync::Arc; use log::debug; -use warp::{reject, Rejection, Reply}; use warp::http::Response; +use warp::{reject, Rejection, Reply}; -use crate::{InfoHash}; -use crate::tracker::key::AuthKey; -use crate::tracker::torrent::{TorrentError, TorrentStats}; -use crate::http::{AnnounceRequest, AnnounceResponse, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry, ServerError, WebResult}; +use crate::http::{ + AnnounceRequest, AnnounceResponse, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry, ServerError, + WebResult, +}; use crate::peer::TorrentPeer; +use crate::tracker::key::AuthKey; use crate::tracker::statistics::TrackerStatisticsEvent; +use crate::tracker::torrent::{TorrentError, TorrentStats}; use crate::tracker::tracker::TorrentTracker; +use crate::InfoHash; /// Authenticate InfoHash using optional AuthKey -pub async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { +pub async fn authenticate( + info_hash: &InfoHash, + auth_key: &Option, + tracker: Arc, +) -> Result<(), ServerError> { match tracker.authenticate_request(info_hash, auth_key).await { Ok(_) => Ok(()), Err(e) => { @@ -35,15 +42,22 @@ pub async fn authenticate(info_hash: &InfoHash, auth_key: &Option, trac } /// Handle announce request -pub async fn handle_announce(announce_request: AnnounceRequest, auth_key: Option, tracker: Arc) -> WebResult { +pub async fn handle_announce( + announce_request: AnnounceRequest, + auth_key: Option, + tracker: Arc, +) -> WebResult { if let Err(e) = authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await { return Err(reject::custom(e)); } debug!("{:?}", announce_request); - let peer = TorrentPeer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip()); - let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer).await; + let peer = + TorrentPeer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip()); + let torrent_stats = tracker + .update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer) + .await; // get all torrent peers excluding the peer_addr let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; @@ -52,15 +66,29 @@ pub async fn handle_announce(announce_request: AnnounceRequest, auth_key: Option // send stats event match announce_request.peer_addr { - IpAddr::V4(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Tcp4Announce).await; } - IpAddr::V6(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Tcp6Announce).await; } + IpAddr::V4(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Tcp4Announce).await; + } + IpAddr::V6(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Tcp6Announce).await; + } } - send_announce_response(&announce_request, torrent_stats, peers, announce_interval, tracker.config.min_announce_interval) + send_announce_response( + &announce_request, + torrent_stats, + peers, + announce_interval, + tracker.config.min_announce_interval, + ) } /// Handle scrape request -pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option, tracker: Arc) -> WebResult { +pub async fn handle_scrape( + scrape_request: ScrapeRequest, + auth_key: Option, + tracker: Arc, +) -> WebResult { let mut files: HashMap = HashMap::new(); let db = tracker.get_torrents().await; @@ -69,14 +97,24 @@ pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option { if authenticate(info_hash, &auth_key, tracker.clone()).await.is_ok() { let (seeders, completed, leechers) = torrent_info.get_stats(); - ScrapeResponseEntry { complete: seeders, downloaded: completed, incomplete: leechers } + ScrapeResponseEntry { + complete: seeders, + downloaded: completed, + incomplete: leechers, + } } else { - ScrapeResponseEntry { complete: 0, downloaded: 0, incomplete: 0 } + ScrapeResponseEntry { + complete: 0, + downloaded: 0, + incomplete: 0, + } } } - None => { - ScrapeResponseEntry { complete: 0, downloaded: 0, incomplete: 0 } - } + None => ScrapeResponseEntry { + complete: 0, + downloaded: 0, + incomplete: 0, + }, }; files.insert(info_hash.clone(), scrape_entry); @@ -84,20 +122,33 @@ pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option { tracker.send_stats_event(TrackerStatisticsEvent::Tcp4Scrape).await; } - IpAddr::V6(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Tcp6Scrape).await; } + IpAddr::V4(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Tcp4Scrape).await; + } + IpAddr::V6(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Tcp6Scrape).await; + } } send_scrape_response(files) } /// Send announce response -fn send_announce_response(announce_request: &AnnounceRequest, torrent_stats: TorrentStats, peers: Vec, interval: u32, interval_min: u32) -> WebResult { - let http_peers: Vec = peers.iter().map(|peer| Peer { - peer_id: peer.peer_id.to_string(), - ip: peer.peer_addr.ip(), - port: peer.peer_addr.port(), - }).collect(); +fn send_announce_response( + announce_request: &AnnounceRequest, + torrent_stats: TorrentStats, + peers: Vec, + interval: u32, + interval_min: u32, +) -> WebResult { + let http_peers: Vec = peers + .iter() + .map(|peer| Peer { + peer_id: peer.peer_id.to_string(), + ip: peer.peer_addr.ip(), + port: peer.peer_addr.port(), + }) + .collect(); let res = AnnounceResponse { interval, @@ -111,7 +162,7 @@ fn send_announce_response(announce_request: &AnnounceRequest, torrent_stats: Tor if let Some(1) = announce_request.compact { match res.write_compact() { Ok(body) => Ok(Response::new(body)), - Err(_) => Err(reject::custom(ServerError::InternalServerError)) + Err(_) => Err(reject::custom(ServerError::InternalServerError)), } } else { Ok(Response::new(res.write().into())) @@ -124,7 +175,7 @@ fn send_scrape_response(files: HashMap) -> WebRes match res.write() { Ok(body) => Ok(Response::new(body)), - Err(_) => Err(reject::custom(ServerError::InternalServerError)) + Err(_) => Err(reject::custom(ServerError::InternalServerError)), } } @@ -132,9 +183,15 @@ fn send_scrape_response(files: HashMap) -> WebRes pub async fn send_error(r: Rejection) -> std::result::Result { let body = if let Some(server_error) = r.find::() { debug!("{:?}", server_error); - ErrorResponse { failure_reason: server_error.to_string() }.write() + ErrorResponse { + failure_reason: server_error.to_string(), + } + .write() } else { - ErrorResponse { failure_reason: ServerError::InternalServerError.to_string() }.write() + ErrorResponse { + failure_reason: ServerError::InternalServerError.to_string(), + } + .write() }; Ok(Response::new(body)) diff --git a/src/http/mod.rs b/src/http/mod.rs index 07d077577..4842c0a25 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -6,13 +6,13 @@ pub use self::response::*; pub use self::routes::*; pub use self::server::*; -pub mod server; +pub mod errors; +pub mod filters; +pub mod handlers; pub mod request; pub mod response; -pub mod errors; pub mod routes; -pub mod handlers; -pub mod filters; +pub mod server; pub type Bytes = u64; pub type WebResult = std::result::Result; diff --git a/src/http/request.rs b/src/http/request.rs index 28cd4750e..6dd025e8c 100644 --- a/src/http/request.rs +++ b/src/http/request.rs @@ -2,8 +2,8 @@ use std::net::IpAddr; use serde::Deserialize; -use crate::{InfoHash, PeerId}; use crate::http::Bytes; +use crate::{InfoHash, PeerId}; #[derive(Deserialize)] pub struct AnnounceRequestQuery { diff --git a/src/http/response.rs b/src/http/response.rs index 2bdd4c1e7..4db12f995 100644 --- a/src/http/response.rs +++ b/src/http/response.rs @@ -5,6 +5,7 @@ use std::net::IpAddr; use serde; use serde::Serialize; + use crate::InfoHash; #[derive(Serialize)] diff --git a/src/http/routes.rs b/src/http/routes.rs index 53b2b0ce5..a9ca3027f 100644 --- a/src/http/routes.rs +++ b/src/http/routes.rs @@ -3,24 +3,18 @@ use std::sync::Arc; use warp::{Filter, Rejection}; -use crate::http::handle_announce; -use crate::http::handle_scrape; -use crate::http::send_error; -use crate::http::with_announce_request; -use crate::http::with_auth_key; -use crate::http::with_scrape_request; -use crate::http::with_tracker; +use crate::http::{ + handle_announce, handle_scrape, send_error, with_announce_request, with_auth_key, with_scrape_request, with_tracker, +}; use crate::tracker::tracker::TorrentTracker; /// All routes -pub fn routes(tracker: Arc) -> impl Filter + Clone { - announce(tracker.clone()) - .or(scrape(tracker)) - .recover(send_error) +pub fn routes(tracker: Arc) -> impl Filter + Clone { + announce(tracker.clone()).or(scrape(tracker)).recover(send_error) } /// GET /announce or /announce/ -fn announce(tracker: Arc) -> impl Filter + Clone { +fn announce(tracker: Arc) -> impl Filter + Clone { warp::path::path("announce") .and(warp::filters::method::get()) .and(with_announce_request(tracker.config.on_reverse_proxy)) @@ -30,7 +24,7 @@ fn announce(tracker: Arc) -> impl Filter -fn scrape(tracker: Arc) -> impl Filter + Clone { +fn scrape(tracker: Arc) -> impl Filter + Clone { warp::path::path("scrape") .and(warp::filters::method::get()) .and(with_scrape_request(tracker.config.on_reverse_proxy)) diff --git a/src/http/server.rs b/src/http/server.rs index 5a5b5f735..8b92d8792 100644 --- a/src/http/server.rs +++ b/src/http/server.rs @@ -12,33 +12,31 @@ pub struct HttpServer { impl HttpServer { pub fn new(tracker: Arc) -> HttpServer { - HttpServer { - tracker - } + HttpServer { tracker } } /// Start the HttpServer pub fn start(&self, socket_addr: SocketAddr) -> impl warp::Future { - let (_addr, server) = warp::serve(routes(self.tracker.clone())) - .bind_with_graceful_shutdown(socket_addr, async move { - tokio::signal::ctrl_c() - .await - .expect("Failed to listen to shutdown signal."); - }); + let (_addr, server) = warp::serve(routes(self.tracker.clone())).bind_with_graceful_shutdown(socket_addr, async move { + tokio::signal::ctrl_c().await.expect("Failed to listen to shutdown signal."); + }); server } /// Start the HttpServer in TLS mode - pub fn start_tls(&self, socket_addr: SocketAddr, ssl_cert_path: String, ssl_key_path: String) -> impl warp::Future { + pub fn start_tls( + &self, + socket_addr: SocketAddr, + ssl_cert_path: String, + ssl_key_path: String, + ) -> impl warp::Future { let (_addr, server) = warp::serve(routes(self.tracker.clone())) .tls() .cert_path(ssl_cert_path) .key_path(ssl_key_path) .bind_with_graceful_shutdown(socket_addr, async move { - tokio::signal::ctrl_c() - .await - .expect("Failed to listen to shutdown signal."); + tokio::signal::ctrl_c().await.expect("Failed to listen to shutdown signal."); }); server diff --git a/src/jobs/http_tracker.rs b/src/jobs/http_tracker.rs index 85f64200f..ef67f0a7e 100644 --- a/src/jobs/http_tracker.rs +++ b/src/jobs/http_tracker.rs @@ -1,9 +1,11 @@ use std::net::SocketAddr; use std::sync::Arc; + use log::{info, warn}; use tokio::task::JoinHandle; -use crate::{HttpServer, HttpTrackerConfig}; + use crate::tracker::tracker::TorrentTracker; +use crate::{HttpServer, HttpTrackerConfig}; pub fn start_job(config: &HttpTrackerConfig, tracker: Arc) -> JoinHandle<()> { let bind_addr = config.bind_address.parse::().unwrap(); @@ -19,7 +21,9 @@ pub fn start_job(config: &HttpTrackerConfig, tracker: Arc) -> Jo http_tracker.start(bind_addr).await; } else if ssl_enabled && ssl_cert_path.is_some() && ssl_key_path.is_some() { info!("Starting HTTPS server on: {} (TLS)", bind_addr); - http_tracker.start_tls(bind_addr, ssl_cert_path.unwrap(), ssl_key_path.unwrap()).await; + http_tracker + .start_tls(bind_addr, ssl_cert_path.unwrap(), ssl_key_path.unwrap()) + .await; } else { warn!("Could not start HTTP tracker on: {}, missing SSL Cert or Key!", bind_addr); } diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index c3e58e56e..8b8f0662b 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -1,4 +1,4 @@ +pub mod http_tracker; pub mod torrent_cleanup; pub mod tracker_api; -pub mod http_tracker; pub mod udp_tracker; diff --git a/src/jobs/torrent_cleanup.rs b/src/jobs/torrent_cleanup.rs index 7d9967352..6e4b0c77e 100644 --- a/src/jobs/torrent_cleanup.rs +++ b/src/jobs/torrent_cleanup.rs @@ -1,9 +1,11 @@ use std::sync::Arc; + use chrono::Utc; use log::info; use tokio::task::JoinHandle; -use crate::{Configuration}; + use crate::tracker::tracker::TorrentTracker; +use crate::Configuration; pub fn start_job(config: &Configuration, tracker: Arc) -> JoinHandle<()> { let weak_tracker = std::sync::Arc::downgrade(&tracker); diff --git a/src/jobs/tracker_api.rs b/src/jobs/tracker_api.rs index 476a87a6a..f3c9ae788 100644 --- a/src/jobs/tracker_api.rs +++ b/src/jobs/tracker_api.rs @@ -1,12 +1,18 @@ use std::sync::Arc; + use log::info; use tokio::task::JoinHandle; -use crate::{Configuration}; + use crate::api::server; use crate::tracker::tracker::TorrentTracker; +use crate::Configuration; pub fn start_job(config: &Configuration, tracker: Arc) -> JoinHandle<()> { - let bind_addr = config.http_api.bind_address.parse::().expect("Tracker API bind_address invalid."); + let bind_addr = config + .http_api + .bind_address + .parse::() + .expect("Tracker API bind_address invalid."); info!("Starting Torrust API server on: {}", bind_addr); tokio::spawn(async move { diff --git a/src/jobs/udp_tracker.rs b/src/jobs/udp_tracker.rs index 32ef76ef4..f93979c9f 100644 --- a/src/jobs/udp_tracker.rs +++ b/src/jobs/udp_tracker.rs @@ -1,8 +1,10 @@ use std::sync::Arc; + use log::{error, info, warn}; use tokio::task::JoinHandle; -use crate::{UdpServer, UdpTrackerConfig}; + use crate::tracker::tracker::TorrentTracker; +use crate::{UdpServer, UdpTrackerConfig}; pub fn start_job(config: &UdpTrackerConfig, tracker: Arc) -> JoinHandle<()> { let bind_addr = config.bind_address.clone(); diff --git a/src/lib.rs b/src/lib.rs index 245f4686c..6dcc7e6da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,18 +1,18 @@ +pub use api::server::*; pub use http::server::*; +pub use protocol::common::*; pub use udp::server::*; -pub use protocol::common::*; pub use self::config::*; -pub use api::server::*; pub use self::tracker::*; +pub mod api; pub mod config; -pub mod tracker; -pub mod logging; -pub mod udp; -pub mod http; -pub mod setup; pub mod databases; +pub mod http; pub mod jobs; -pub mod api; +pub mod logging; pub mod protocol; +pub mod setup; +pub mod tracker; +pub mod udp; diff --git a/src/logging.rs b/src/logging.rs index c2e77551f..209c9f848 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -5,19 +5,17 @@ use crate::Configuration; pub fn setup_logging(cfg: &Configuration) { let log_level = match &cfg.log_level { None => log::LevelFilter::Info, - Some(level) => { - match level.as_str() { - "off" => log::LevelFilter::Off, - "trace" => log::LevelFilter::Trace, - "debug" => log::LevelFilter::Debug, - "info" => log::LevelFilter::Info, - "warn" => log::LevelFilter::Warn, - "error" => log::LevelFilter::Error, - _ => { - panic!("Unknown log level encountered: '{}'", level.as_str()); - } + Some(level) => match level.as_str() { + "off" => log::LevelFilter::Off, + "trace" => log::LevelFilter::Trace, + "debug" => log::LevelFilter::Debug, + "info" => log::LevelFilter::Info, + "warn" => log::LevelFilter::Warn, + "error" => log::LevelFilter::Error, + _ => { + panic!("Unknown log level encountered: '{}'", level.as_str()); } - } + }, }; if let Err(_err) = fern::Dispatch::new() diff --git a/src/main.rs b/src/main.rs index 963419f03..0b406c85a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,8 @@ use std::sync::Arc; + use log::info; -use torrust_tracker::Configuration; -use torrust_tracker::logging; -use torrust_tracker::setup; use torrust_tracker::tracker::tracker::TorrentTracker; +use torrust_tracker::{logging, setup, Configuration}; #[tokio::main] async fn main() { diff --git a/src/protocol/common.rs b/src/protocol/common.rs index 5d69ed0e1..92a3ed51c 100644 --- a/src/protocol/common.rs +++ b/src/protocol/common.rs @@ -221,8 +221,9 @@ impl PeerId { impl Serialize for PeerId { fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, { + where + S: serde::Serializer, + { let buff_size = self.0.len() * 2; let mut tmp: Vec = vec![0; buff_size]; binascii::bin2hex(&self.0, &mut tmp).unwrap(); diff --git a/src/protocol/utils.rs b/src/protocol/utils.rs index 30b87b99b..e50c8b036 100644 --- a/src/protocol/utils.rs +++ b/src/protocol/utils.rs @@ -11,9 +11,7 @@ pub fn get_connection_id(remote_address: &SocketAddr) -> ConnectionId { } pub fn current_time() -> u64 { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH).unwrap() - .as_secs() + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() } pub fn ser_instant(inst: &std::time::Instant, ser: S) -> Result { diff --git a/src/setup.rs b/src/setup.rs index ed9b6d8ff..0c5ed9004 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -1,11 +1,13 @@ use std::sync::Arc; -use log::{warn}; + +use log::warn; use tokio::task::JoinHandle; -use crate::{Configuration}; + use crate::jobs::{http_tracker, torrent_cleanup, tracker_api, udp_tracker}; use crate::tracker::tracker::TorrentTracker; +use crate::Configuration; -pub async fn setup(config: &Configuration, tracker: Arc) -> Vec>{ +pub async fn setup(config: &Configuration, tracker: Arc) -> Vec> { let mut jobs: Vec> = Vec::new(); // Load peer keys @@ -15,15 +17,23 @@ pub async fn setup(config: &Configuration, tracker: Arc) -> Vec< // Load whitelisted torrents if tracker.is_whitelisted() { - tracker.load_whitelist().await.expect("Could not load whitelist from database."); + tracker + .load_whitelist() + .await + .expect("Could not load whitelist from database."); } // Start the UDP blocks for udp_tracker_config in &config.udp_trackers { - if !udp_tracker_config.enabled { continue; } + if !udp_tracker_config.enabled { + continue; + } if tracker.is_private() { - warn!("Could not start UDP tracker on: {} while in {:?}. UDP is not safe for private trackers!", udp_tracker_config.bind_address, config.mode); + warn!( + "Could not start UDP tracker on: {} while in {:?}. UDP is not safe for private trackers!", + udp_tracker_config.bind_address, config.mode + ); } else { jobs.push(udp_tracker::start_job(&udp_tracker_config, tracker.clone())) } @@ -31,7 +41,9 @@ pub async fn setup(config: &Configuration, tracker: Arc) -> Vec< // Start the HTTP blocks for http_tracker_config in &config.http_trackers { - if !http_tracker_config.enabled { continue; } + if !http_tracker_config.enabled { + continue; + } jobs.push(http_tracker::start_job(&http_tracker_config, tracker.clone())); } diff --git a/src/tracker/key.rs b/src/tracker/key.rs index 2e2ca81f7..f935dac07 100644 --- a/src/tracker/key.rs +++ b/src/tracker/key.rs @@ -1,11 +1,10 @@ use derive_more::{Display, Error}; use log::debug; -use rand::{Rng, thread_rng}; use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; use serde::Serialize; use crate::protocol::utils::current_time; - use crate::AUTH_KEY_LENGTH; pub fn generate_auth_key(seconds_valid: u64) -> AuthKey { @@ -25,8 +24,12 @@ pub fn generate_auth_key(seconds_valid: u64) -> AuthKey { pub fn verify_auth_key(auth_key: &AuthKey) -> Result<(), Error> { let current_time = current_time(); - if auth_key.valid_until.is_none() { return Err(Error::KeyInvalid); } - if auth_key.valid_until.unwrap() < current_time { return Err(Error::KeyExpired); } + if auth_key.valid_until.is_none() { + return Err(Error::KeyInvalid); + } + if auth_key.valid_until.unwrap() < current_time { + return Err(Error::KeyExpired); + } Ok(()) } @@ -40,10 +43,7 @@ pub struct AuthKey { impl AuthKey { pub fn from_buffer(key_buffer: [u8; AUTH_KEY_LENGTH]) -> Option { if let Ok(key) = String::from_utf8(Vec::from(key_buffer)) { - Some(AuthKey { - key, - valid_until: None, - }) + Some(AuthKey { key, valid_until: None }) } else { None } @@ -85,17 +85,10 @@ mod tests { #[test] fn auth_key_from_buffer() { - let auth_key = key::AuthKey::from_buffer( - [ - 89, 90, 83, 108, - 52, 108, 77, 90, - 117, 112, 82, 117, - 79, 112, 83, 82, - 67, 51, 107, 114, - 73, 75, 82, 53, - 66, 80, 66, 49, - 52, 110, 114, 74] - ); + let auth_key = key::AuthKey::from_buffer([ + 89, 90, 83, 108, 52, 108, 77, 90, 117, 112, 82, 117, 79, 112, 83, 82, 67, 51, 107, 114, 73, 75, 82, 53, 66, 80, 66, + 49, 52, 110, 114, 74, + ]); assert!(auth_key.is_some()); assert_eq!(auth_key.unwrap().key, "YZSl4lMZupRuOpSRC3krIKR5BPB14nrJ"); diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index 791e2e7d2..bbb027a35 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -1,6 +1,6 @@ -pub mod tracker; -pub mod statistics; -pub mod peer; -pub mod torrent; pub mod key; pub mod mode; +pub mod peer; +pub mod statistics; +pub mod torrent; +pub mod tracker; diff --git a/src/tracker/mode.rs b/src/tracker/mode.rs index edcb27f1c..9110b7f4f 100644 --- a/src/tracker/mode.rs +++ b/src/tracker/mode.rs @@ -1,5 +1,5 @@ use serde; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug)] pub enum TrackerMode { diff --git a/src/tracker/peer.rs b/src/tracker/peer.rs index ce4e52022..0514f41ed 100644 --- a/src/tracker/peer.rs +++ b/src/tracker/peer.rs @@ -2,11 +2,11 @@ use std::net::{IpAddr, SocketAddr}; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; use serde; -use serde::{Serialize}; +use serde::Serialize; -use crate::protocol::common::{NumberOfBytesDef, AnnounceEventDef}; -use crate::protocol::utils::ser_instant; use crate::http::AnnounceRequest; +use crate::protocol::common::{AnnounceEventDef, NumberOfBytesDef}; +use crate::protocol::utils::ser_instant; use crate::PeerId; #[derive(PartialEq, Eq, Debug, Clone, Serialize)] @@ -26,7 +26,11 @@ pub struct TorrentPeer { } impl TorrentPeer { - pub fn from_udp_announce_request(announce_request: &aquatic_udp_protocol::AnnounceRequest, remote_ip: IpAddr, host_opt_ip: Option) -> Self { + pub fn from_udp_announce_request( + announce_request: &aquatic_udp_protocol::AnnounceRequest, + remote_ip: IpAddr, + host_opt_ip: Option, + ) -> Self { let peer_addr = TorrentPeer::peer_addr_from_ip_and_port_and_opt_host_ip(remote_ip, host_opt_ip, announce_request.port.0); TorrentPeer { @@ -40,7 +44,11 @@ impl TorrentPeer { } } - pub fn from_http_announce_request(announce_request: &AnnounceRequest, remote_ip: IpAddr, host_opt_ip: Option) -> Self { + pub fn from_http_announce_request( + announce_request: &AnnounceRequest, + remote_ip: IpAddr, + host_opt_ip: Option, + ) -> Self { let peer_addr = TorrentPeer::peer_addr_from_ip_and_port_and_opt_host_ip(remote_ip, host_opt_ip, announce_request.port); let event: AnnounceEvent = if let Some(event) = &announce_request.event { @@ -48,7 +56,7 @@ impl TorrentPeer { "started" => AnnounceEvent::Started, "stopped" => AnnounceEvent::Stopped, "completed" => AnnounceEvent::Completed, - _ => AnnounceEvent::None + _ => AnnounceEvent::None, } } else { AnnounceEvent::None @@ -74,5 +82,7 @@ impl TorrentPeer { } } - pub fn is_seeder(&self) -> bool { self.left.0 <= 0 && self.event != AnnounceEvent::Stopped } + pub fn is_seeder(&self) -> bool { + self.left.0 <= 0 && self.event != AnnounceEvent::Stopped + } } diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index c67df72ec..85a2dbae9 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use tokio::sync::{mpsc, RwLock, RwLockReadGuard}; -use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::Sender; +use tokio::sync::{mpsc, RwLock, RwLockReadGuard}; const CHANNEL_BUFFER_SIZE: usize = 65_535; diff --git a/src/tracker/torrent.rs b/src/tracker/torrent.rs index 0c03e3f82..7950ce9c0 100644 --- a/src/tracker/torrent.rs +++ b/src/tracker/torrent.rs @@ -1,10 +1,10 @@ use std::net::{IpAddr, SocketAddr}; -use aquatic_udp_protocol::{AnnounceEvent}; +use aquatic_udp_protocol::AnnounceEvent; use serde::{Deserialize, Serialize}; -use crate::{MAX_SCRAPE_TORRENTS, PeerId}; use crate::peer::TorrentPeer; +use crate::{PeerId, MAX_SCRAPE_TORRENTS}; #[derive(Serialize, Deserialize, Clone)] pub struct TorrentEntry { @@ -54,11 +54,13 @@ impl TorrentEntry { // Filter out different ip_version from remote_addr Some(remote_addr) => { // Skip ip address of client - if peer.peer_addr.ip() == remote_addr.ip() { return false; } + if peer.peer_addr.ip() == remote_addr.ip() { + return false; + } match peer.peer_addr.ip() { - IpAddr::V4(_) => { remote_addr.is_ipv4() } - IpAddr::V6(_) => { remote_addr.is_ipv6() } + IpAddr::V4(_) => remote_addr.is_ipv4(), + IpAddr::V6(_) => remote_addr.is_ipv6(), } } }) @@ -73,9 +75,8 @@ impl TorrentEntry { } pub fn remove_inactive_peers(&mut self, max_peer_timeout: u32) { - self.peers.retain(|_, peer| { - peer.updated.elapsed() > std::time::Duration::from_secs(max_peer_timeout as u64) - }); + self.peers + .retain(|_, peer| peer.updated.elapsed() > std::time::Duration::from_secs(max_peer_timeout as u64)); } } diff --git a/src/tracker/tracker.rs b/src/tracker/tracker.rs index 163bfe446..51d7716fb 100644 --- a/src/tracker/tracker.rs +++ b/src/tracker/tracker.rs @@ -3,19 +3,19 @@ use std::collections::BTreeMap; use std::net::SocketAddr; use std::sync::Arc; -use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::sync::mpsc::error::SendError; +use tokio::sync::{RwLock, RwLockReadGuard}; -use crate::Configuration; -use crate::protocol::common::InfoHash; -use crate::databases::database::Database; use crate::databases::database; +use crate::databases::database::Database; use crate::mode::TrackerMode; use crate::peer::TorrentPeer; -use crate::tracker::key::AuthKey; +use crate::protocol::common::InfoHash; use crate::statistics::{StatsTracker, TrackerStatistics, TrackerStatisticsEvent}; use crate::tracker::key; +use crate::tracker::key::AuthKey; use crate::tracker::torrent::{TorrentEntry, TorrentError, TorrentStats}; +use crate::Configuration; pub struct TorrentTracker { pub config: Arc, @@ -24,7 +24,7 @@ pub struct TorrentTracker { whitelist: RwLock>, torrents: RwLock>, stats_tracker: StatsTracker, - database: Box + database: Box, } impl TorrentTracker { @@ -33,7 +33,9 @@ impl TorrentTracker { let mut stats_tracker = StatsTracker::new(); // starts a thread for updating tracker stats - if config.tracker_usage_statistics { stats_tracker.run_worker(); } + if config.tracker_usage_statistics { + stats_tracker.run_worker(); + } Ok(TorrentTracker { config: config.clone(), @@ -42,7 +44,7 @@ impl TorrentTracker { whitelist: RwLock::new(std::collections::HashSet::new()), torrents: RwLock::new(std::collections::BTreeMap::new()), stats_tracker, - database + database, }) } @@ -74,7 +76,7 @@ impl TorrentTracker { pub async fn verify_auth_key(&self, auth_key: &AuthKey) -> Result<(), key::Error> { match self.keys.read().await.get(&auth_key.key) { None => Err(key::Error::KeyInvalid), - Some(key) => key::verify_auth_key(key) + Some(key) => key::verify_auth_key(key), } } @@ -124,7 +126,9 @@ impl TorrentTracker { pub async fn authenticate_request(&self, info_hash: &InfoHash, key: &Option) -> Result<(), TorrentError> { // no authentication needed in public mode - if self.is_public() { return Ok(()); } + if self.is_public() { + return Ok(()); + } // check if auth_key is set and valid if self.is_private() { @@ -157,7 +161,9 @@ impl TorrentTracker { for (info_hash, completed) in persistent_torrents { // Skip if torrent entry already exists - if torrents.contains_key(&info_hash) { continue; } + if torrents.contains_key(&info_hash) { + continue; + } let torrent_entry = TorrentEntry { peers: Default::default(), @@ -170,14 +176,12 @@ impl TorrentTracker { Ok(()) } - pub async fn get_torrent_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr, ) -> Vec { + pub async fn get_torrent_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr) -> Vec { let read_lock = self.torrents.read().await; match read_lock.get(info_hash) { None => vec![], - Some(entry) => { - entry.get_peers(Some(client_addr)).into_iter().cloned().collect() - } + Some(entry) => entry.get_peers(Some(client_addr)).into_iter().cloned().collect(), } } @@ -185,19 +189,18 @@ impl TorrentTracker { let mut torrents = self.torrents.write().await; let torrent_entry = match torrents.entry(info_hash.clone()) { - Entry::Vacant(vacant) => { - vacant.insert(TorrentEntry::new()) - } - Entry::Occupied(entry) => { - entry.into_mut() - } + Entry::Vacant(vacant) => vacant.insert(TorrentEntry::new()), + Entry::Occupied(entry) => entry.into_mut(), }; let stats_updated = torrent_entry.update_peer(peer); // todo: move this action to a separate worker if self.config.persistent_torrent_completed_stat && stats_updated { - let _ = self.database.save_persistent_torrent(&info_hash, torrent_entry.completed).await; + let _ = self + .database + .save_persistent_torrent(&info_hash, torrent_entry.completed) + .await; } let (seeders, completed, leechers) = torrent_entry.get_stats(); @@ -231,8 +234,8 @@ impl TorrentTracker { torrent_entry.remove_inactive_peers(self.config.max_peer_timeout); match self.config.persistent_torrent_completed_stat { - true => { torrent_entry.completed > 0 || torrent_entry.peers.len() > 0 } - false => { torrent_entry.peers.len() > 0 } + true => torrent_entry.completed > 0 || torrent_entry.peers.len() > 0, + false => torrent_entry.peers.len() > 0, } }); } else { diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 860a2fe4b..907dac0bc 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -1,16 +1,19 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; -use aquatic_udp_protocol::{AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId}; +use aquatic_udp_protocol::{ + AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, NumberOfDownloads, + NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId, +}; -use crate::{InfoHash, MAX_SCRAPE_TORRENTS}; use crate::peer::TorrentPeer; -use crate::tracker::torrent::{TorrentError}; -use crate::udp::errors::ServerError; -use crate::udp::request::AnnounceRequestWrapper; +use crate::protocol::utils::get_connection_id; use crate::tracker::statistics::TrackerStatisticsEvent; +use crate::tracker::torrent::TorrentError; use crate::tracker::tracker::TorrentTracker; -use crate::protocol::utils::get_connection_id; +use crate::udp::errors::ServerError; +use crate::udp::request::AnnounceRequestWrapper; +use crate::{InfoHash, MAX_SCRAPE_TORRENTS}; pub async fn authenticate(info_hash: &InfoHash, tracker: Arc) -> Result<(), ServerError> { match tracker.authenticate_request(info_hash, &None).await { @@ -34,42 +37,38 @@ pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec, tracker: A match Request::from_bytes(&payload[..payload.len()], MAX_SCRAPE_TORRENTS).map_err(|_| ServerError::InternalServerError) { Ok(request) => { let transaction_id = match &request { - Request::Connect(connect_request) => { - connect_request.transaction_id - } - Request::Announce(announce_request) => { - announce_request.transaction_id - } - Request::Scrape(scrape_request) => { - scrape_request.transaction_id - } + Request::Connect(connect_request) => connect_request.transaction_id, + Request::Announce(announce_request) => announce_request.transaction_id, + Request::Scrape(scrape_request) => scrape_request.transaction_id, }; match handle_request(request, remote_addr, tracker).await { Ok(response) => response, - Err(e) => handle_error(e, transaction_id) + Err(e) => handle_error(e, transaction_id), } } // bad request - Err(_) => handle_error(ServerError::BadRequest, TransactionId(0)) + Err(_) => handle_error(ServerError::BadRequest, TransactionId(0)), } } -pub async fn handle_request(request: Request, remote_addr: SocketAddr, tracker: Arc) -> Result { +pub async fn handle_request( + request: Request, + remote_addr: SocketAddr, + tracker: Arc, +) -> Result { match request { - Request::Connect(connect_request) => { - handle_connect(remote_addr, &connect_request, tracker).await - } - Request::Announce(announce_request) => { - handle_announce(remote_addr, &announce_request, tracker).await - } - Request::Scrape(scrape_request) => { - handle_scrape(remote_addr, &scrape_request, tracker).await - } + Request::Connect(connect_request) => handle_connect(remote_addr, &connect_request, tracker).await, + Request::Announce(announce_request) => handle_announce(remote_addr, &announce_request, tracker).await, + Request::Scrape(scrape_request) => handle_scrape(remote_addr, &scrape_request, tracker).await, } } -pub async fn handle_connect(remote_addr: SocketAddr, request: &ConnectRequest, tracker: Arc) -> Result { +pub async fn handle_connect( + remote_addr: SocketAddr, + request: &ConnectRequest, + tracker: Arc, +) -> Result { let connection_id = get_connection_id(&remote_addr); let response = Response::from(ConnectResponse { @@ -79,26 +78,42 @@ pub async fn handle_connect(remote_addr: SocketAddr, request: &ConnectRequest, t // send stats event match remote_addr { - SocketAddr::V4(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Udp4Connect).await; } - SocketAddr::V6(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Udp6Connect).await; } + SocketAddr::V4(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Udp4Connect).await; + } + SocketAddr::V6(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Udp6Connect).await; + } } Ok(response) } -pub async fn handle_announce(remote_addr: SocketAddr, announce_request: &AnnounceRequest, tracker: Arc) -> Result { +pub async fn handle_announce( + remote_addr: SocketAddr, + announce_request: &AnnounceRequest, + tracker: Arc, +) -> Result { let wrapped_announce_request = AnnounceRequestWrapper::new(announce_request.clone()); authenticate(&wrapped_announce_request.info_hash, tracker.clone()).await?; - let peer = TorrentPeer::from_udp_announce_request(&wrapped_announce_request.announce_request, remote_addr.ip(), tracker.config.get_ext_ip()); + let peer = TorrentPeer::from_udp_announce_request( + &wrapped_announce_request.announce_request, + remote_addr.ip(), + tracker.config.get_ext_ip(), + ); //let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer).await; - let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer).await; + let torrent_stats = tracker + .update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer) + .await; // get all peers excluding the client_addr - let peers = tracker.get_torrent_peers(&wrapped_announce_request.info_hash, &peer.peer_addr).await; + let peers = tracker + .get_torrent_peers(&wrapped_announce_request.info_hash, &peer.peer_addr) + .await; let announce_response = if remote_addr.is_ipv4() { Response::from(AnnounceResponse { @@ -106,16 +121,19 @@ pub async fn handle_announce(remote_addr: SocketAddr, announce_request: &Announc announce_interval: AnnounceInterval(tracker.config.announce_interval as i32), leechers: NumberOfPeers(torrent_stats.leechers as i32), seeders: NumberOfPeers(torrent_stats.seeders as i32), - peers: peers.iter() - .filter_map(|peer| if let IpAddr::V4(ip) = peer.peer_addr.ip() { - Some(ResponsePeer:: { - ip_address: ip, - port: Port(peer.peer_addr.port()), - }) - } else { - None - } - ).collect(), + peers: peers + .iter() + .filter_map(|peer| { + if let IpAddr::V4(ip) = peer.peer_addr.ip() { + Some(ResponsePeer:: { + ip_address: ip, + port: Port(peer.peer_addr.port()), + }) + } else { + None + } + }) + .collect(), }) } else { Response::from(AnnounceResponse { @@ -123,30 +141,41 @@ pub async fn handle_announce(remote_addr: SocketAddr, announce_request: &Announc announce_interval: AnnounceInterval(tracker.config.announce_interval as i32), leechers: NumberOfPeers(torrent_stats.leechers as i32), seeders: NumberOfPeers(torrent_stats.seeders as i32), - peers: peers.iter() - .filter_map(|peer| if let IpAddr::V6(ip) = peer.peer_addr.ip() { - Some(ResponsePeer:: { - ip_address: ip, - port: Port(peer.peer_addr.port()), - }) - } else { - None - } - ).collect(), + peers: peers + .iter() + .filter_map(|peer| { + if let IpAddr::V6(ip) = peer.peer_addr.ip() { + Some(ResponsePeer:: { + ip_address: ip, + port: Port(peer.peer_addr.port()), + }) + } else { + None + } + }) + .collect(), }) }; // send stats event match remote_addr { - SocketAddr::V4(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Udp4Announce).await; } - SocketAddr::V6(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Udp6Announce).await; } + SocketAddr::V4(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Udp4Announce).await; + } + SocketAddr::V6(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Udp6Announce).await; + } } Ok(announce_response) } // todo: refactor this, db lock can be a lot shorter -pub async fn handle_scrape(remote_addr: SocketAddr, request: &ScrapeRequest, tracker: Arc) -> Result { +pub async fn handle_scrape( + remote_addr: SocketAddr, + request: &ScrapeRequest, + tracker: Arc, +) -> Result { let db = tracker.get_torrents().await; let mut torrent_stats: Vec = Vec::new(); @@ -172,13 +201,11 @@ pub async fn handle_scrape(remote_addr: SocketAddr, request: &ScrapeRequest, tra } } } - None => { - TorrentScrapeStatistics { - seeders: NumberOfPeers(0), - completed: NumberOfDownloads(0), - leechers: NumberOfPeers(0), - } - } + None => TorrentScrapeStatistics { + seeders: NumberOfPeers(0), + completed: NumberOfDownloads(0), + leechers: NumberOfPeers(0), + }, }; torrent_stats.push(scrape_entry); @@ -188,8 +215,12 @@ pub async fn handle_scrape(remote_addr: SocketAddr, request: &ScrapeRequest, tra // send stats event match remote_addr { - SocketAddr::V4(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Udp4Scrape).await; } - SocketAddr::V6(_) => { tracker.send_stats_event(TrackerStatisticsEvent::Udp6Scrape).await; } + SocketAddr::V4(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Udp4Scrape).await; + } + SocketAddr::V6(_) => { + tracker.send_stats_event(TrackerStatisticsEvent::Udp6Scrape).await; + } } Ok(Response::from(ScrapeResponse { @@ -200,5 +231,8 @@ pub async fn handle_scrape(remote_addr: SocketAddr, request: &ScrapeRequest, tra fn handle_error(e: ServerError, transaction_id: TransactionId) -> Response { let message = e.to_string(); - Response::from(ErrorResponse { transaction_id, message: message.into() }) + Response::from(ErrorResponse { + transaction_id, + message: message.into(), + }) } diff --git a/src/udp/mod.rs b/src/udp/mod.rs index 25780ba93..ae87973f1 100644 --- a/src/udp/mod.rs +++ b/src/udp/mod.rs @@ -4,9 +4,9 @@ pub use self::request::*; pub use self::server::*; pub mod errors; +pub mod handlers; pub mod request; pub mod server; -pub mod handlers; pub type Bytes = u64; pub type Port = u16; diff --git a/src/udp/server.rs b/src/udp/server.rs index bcacc2642..11cb61d99 100644 --- a/src/udp/server.rs +++ b/src/udp/server.rs @@ -62,7 +62,9 @@ impl UdpServer { debug!("{:?}", &inner[..position]); UdpServer::send_packet(socket, &remote_addr, &inner[..position]).await; } - Err(_) => { debug!("could not write response to bytes."); } + Err(_) => { + debug!("could not write response to bytes."); + } } }