diff --git a/Cargo.lock b/Cargo.lock index 7bc7233c8..eb04ba651 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -86,6 +95,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e121dee8023ce33ab248d9ce1493df03c3b38a659b240096fcbd7048ff9c31f" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide 0.4.4", + "object", + "rustc-demangle", +] + [[package]] name = "base-x" version = "0.2.8" @@ -375,6 +399,21 @@ dependencies = [ "syn", ] +[[package]] +name = "dhat" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47003dc9f6368a88e85956c3b2573a7e6872746a3e5d762a8885da3a136a0381" +dependencies = [ + "backtrace", + "lazy_static", + "parking_lot 0.11.2", + "rustc-hash", + "serde 1.0.136", + "serde_json", + "thousands", +] + [[package]] name = "digest" version = "0.9.0" @@ -459,7 +498,7 @@ dependencies = [ "crc32fast", "libc", "libz-sys", - "miniz_oxide", + "miniz_oxide 0.5.1", ] [[package]] @@ -673,6 +712,12 @@ dependencies = [ "wasi 0.10.0+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" + [[package]] name = "glob" version = "0.3.0" @@ -1020,6 +1065,16 @@ dependencies = [ "unicase", ] +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "miniz_oxide" version = "0.5.1" @@ -1253,6 +1308,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.10.0" @@ -1593,6 +1657,12 @@ dependencies = [ "serde 1.0.136", ] +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -2057,6 +2127,12 @@ dependencies = [ "syn", ] +[[package]] +name = "thousands" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" + [[package]] name = "time" version = "0.1.44" @@ -2217,9 +2293,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa" +checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7" dependencies = [ "serde 1.0.136", ] @@ -2235,6 +2311,7 @@ dependencies = [ "chrono", "config", "derive_more", + "dhat", "fern", "futures", "hex", @@ -2262,9 +2339,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.33" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" dependencies = [ "cfg-if", "log", @@ -2286,9 +2363,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.25" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dfce9f3241b150f36e8e54bb561a742d5daa1a47b5dd9a5ce369fd4a4db2210" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" dependencies = [ "lazy_static", ] diff --git a/Cargo.toml b/Cargo.toml index 81f76abe9..a10d548c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,20 +7,24 @@ description = "A feature rich BitTorrent tracker." edition = "2018" [profile.release] +debug = 1 lto = "fat" +[features] +dhat-heap = [] # if you are doing heap profiling + [dependencies] -serde = {version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde_bencode = "^0.2.3" serde_bytes = "0.11" serde_json = "1.0.72" hex = "0.4.3" percent-encoding = "2.1.0" -warp = {version = "0.3", features = ["tls"]} -tokio = {version = "1.7", features = ["full"]} +warp = { version = "0.3", features = ["tls"] } +tokio = { version = "1.7", features = ["full"] } binascii = "0.1" toml = "0.5" -log = {version = "0.4", features = ["release_max_level_info"]} +log = { version = "0.4", features = ["release_max_level_info"] } fern = "0.6" chrono = "0.4" byteorder = "1" @@ -34,3 +38,4 @@ thiserror = "1.0" aquatic_udp_protocol = { git = "https://github.com/greatest-ape/aquatic" } futures = "0.3.21" async-trait = "0.1.52" +dhat = "0.3.0" diff --git a/src/common.rs b/src/common.rs index 4d2f5ec71..5d69ed0e1 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,5 +1,5 @@ -use serde::{Deserialize, Serialize}; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; +use serde::{Deserialize, Serialize}; pub const MAX_SCRAPE_TORRENTS: u8 = 74; pub const AUTH_KEY_LENGTH: usize = 32; @@ -19,7 +19,7 @@ pub enum AnnounceEventDef { Started, Stopped, Completed, - None + None, } #[derive(Serialize, Deserialize)] @@ -135,7 +135,7 @@ impl PeerId { String::from(std::str::from_utf8(bytes_out).unwrap()) } else { "".to_string() - } + }; } } @@ -218,6 +218,7 @@ impl PeerId { } } } + impl Serialize for PeerId { fn serialize(&self, serializer: S) -> Result where diff --git a/src/config.rs b/src/config.rs index b46f29d69..ce3f59760 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,19 +1,21 @@ -pub use crate::tracker::TrackerMode; -use serde::{Serialize, Deserialize, Serializer}; use std; use std::collections::HashMap; use std::fs; -use toml; -use std::net::{IpAddr}; +use std::net::IpAddr; use std::path::Path; use std::str::FromStr; -use config::{ConfigError, Config, File}; + +use config::{Config, ConfigError, File}; +use serde::{Deserialize, Serialize, Serializer}; +use toml; + use crate::database::DatabaseDrivers; +pub use crate::tracker::TrackerMode; #[derive(Serialize, Deserialize, PartialEq)] pub enum TrackerServer { UDP, - HTTP + HTTP, } #[derive(Serialize, Deserialize, Debug)] @@ -30,7 +32,7 @@ pub struct HttpTrackerConfig { #[serde(serialize_with = "none_as_empty_string")] pub ssl_cert_path: Option, #[serde(serialize_with = "none_as_empty_string")] - pub ssl_key_path: Option + pub ssl_key_path: Option, } impl HttpTrackerConfig { @@ -114,7 +116,7 @@ impl Configuration { match Self::load(data.as_slice()) { Ok(cfg) => { Ok(cfg) - }, + } Err(e) => Err(ConfigurationError::ParseError(e)), } } @@ -158,21 +160,21 @@ impl Configuration { enabled: true, bind_address: String::from("127.0.0.1:1212"), access_tokens: [(String::from("admin"), String::from("MyAccessToken"))].iter().cloned().collect(), - } + }, }; configuration.udp_trackers.push( - UdpTrackerConfig{ + UdpTrackerConfig { enabled: false, - bind_address: String::from("0.0.0.0:6969") + bind_address: String::from("0.0.0.0:6969"), } ); configuration.http_trackers.push( - HttpTrackerConfig{ + HttpTrackerConfig { enabled: false, bind_address: String::from("0.0.0.0:6969"), ssl_enabled: false, ssl_cert_path: None, - ssl_key_path: None + ssl_key_path: None, } ); configuration @@ -190,7 +192,7 @@ impl Configuration { eprintln!("Creating config file.."); let config = Configuration::default(); let _ = config.save_to_file(); - return Err(ConfigError::Message(format!("Please edit the config.TOML in the root folder and restart the tracker."))) + return Err(ConfigError::Message(format!("Please edit the config.TOML in the root folder and restart the tracker."))); } let torrust_config: Configuration = config.try_into().map_err(|e| ConfigError::Message(format!("Errors while processing config: {}.", e)))?; @@ -198,7 +200,7 @@ impl Configuration { Ok(torrust_config) } - pub fn save_to_file(&self) -> Result<(), ()>{ + pub fn save_to_file(&self) -> Result<(), ()> { let toml_string = toml::to_string(self).expect("Could not encode TOML value"); fs::write("config.toml", toml_string).expect("Could not write to file!"); Ok(()) diff --git a/src/database.rs b/src/database.rs index 18bf41994..a90161e91 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,18 +1,20 @@ use std::collections::BTreeMap; -use crate::{InfoHash}; -use crate::key_manager::AuthKey; -use crate::sqlite_database::SqliteDatabase; + use async_trait::async_trait; use derive_more::{Display, Error}; use log::debug; +use serde::{Deserialize, Serialize}; + +use crate::InfoHash; +use crate::key_manager::AuthKey; use crate::mysql_database::MysqlDatabase; -use serde::{Serialize, Deserialize}; +use crate::sqlite_database::SqliteDatabase; use crate::torrent::TorrentEntry; #[derive(Serialize, Deserialize, Debug)] pub enum DatabaseDrivers { Sqlite3, - MySQL + MySQL, } pub fn connect_database(db_driver: &DatabaseDrivers, db_path: &str) -> Result, r2d2::Error> { diff --git a/src/http_api_server.rs b/src/http_api_server.rs index eff45fc33..89505cb09 100644 --- a/src/http_api_server.rs +++ b/src/http_api_server.rs @@ -1,10 +1,13 @@ -use crate::tracker::{TorrentTracker}; -use serde::{Deserialize, Serialize}; use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use warp::{filters, reply, reply::Reply, serve, Filter, Server}; + +use serde::{Deserialize, Serialize}; +use warp::{Filter, filters, reply, reply::Reply, serve, Server}; + use crate::torrent::TorrentPeer; +use crate::tracker::TorrentTracker; + use super::common::*; #[derive(Deserialize, Debug)] @@ -52,7 +55,7 @@ enum ActionStatus<'a> { impl warp::reject::Reject for ActionStatus<'static> {} -fn authenticate(tokens: HashMap) -> impl Filter + Clone { +fn authenticate(tokens: HashMap) -> impl Filter + Clone { #[derive(Deserialize)] struct AuthToken { token: Option, @@ -69,7 +72,7 @@ fn authenticate(tokens: HashMap) -> impl Filter { if !tokens.contains(&token) { - return Err(warp::reject::custom(ActionStatus::Err { reason: "token not valid".into() })) + return Err(warp::reject::custom(ActionStatus::Err { reason: "token not valid".into() })); } Ok(()) @@ -81,7 +84,7 @@ fn authenticate(tokens: HashMap) -> impl Filter) -> Server + Clone + Send + Sync + 'static> { +pub fn build_server(tracker: Arc) -> Server + Clone + Send + Sync + 'static> { // GET /api/torrents?offset=:u32&limit=:u32 // View torrent list let api_torrents = tracker.clone(); @@ -131,7 +134,7 @@ pub fn build_server(tracker: Arc) -> Server| { async move { - let mut results = Stats{ + let mut results = Stats { torrents: 0, seeders: 0, completed: 0, @@ -147,7 +150,7 @@ pub fn build_server(tracker: Arc) -> Server = db @@ -195,7 +198,7 @@ pub fn build_server(tracker: Arc) -> Server) -> Server)| { async move { - match tracker.remove_torrent_from_whitelist(&info_hash).await { - Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), - Err(_) => Err(warp::reject::custom(ActionStatus::Err { reason: "failed to remove torrent from whitelist".into() })) - } + match tracker.remove_torrent_from_whitelist(&info_hash).await { + Ok(_) => Ok(warp::reply::json(&ActionStatus::Ok)), + Err(_) => Err(warp::reject::custom(ActionStatus::Err { reason: "failed to remove torrent from whitelist".into() })) + } } }); diff --git a/src/key_manager.rs b/src/key_manager.rs index b1f16f1dc..507402358 100644 --- a/src/key_manager.rs +++ b/src/key_manager.rs @@ -1,10 +1,12 @@ -use super::common::AUTH_KEY_LENGTH; -use crate::utils::current_time; -use rand::{thread_rng, Rng}; +use derive_more::{Display, Error}; +use log::debug; +use rand::{Rng, thread_rng}; use rand::distributions::Alphanumeric; use serde::Serialize; -use log::debug; -use derive_more::{Display, Error}; + +use crate::utils::current_time; + +use super::common::AUTH_KEY_LENGTH; pub fn generate_auth_key(seconds_valid: u64) -> AuthKey { let key: String = thread_rng() @@ -23,8 +25,8 @@ pub fn generate_auth_key(seconds_valid: u64) -> AuthKey { pub fn verify_auth_key(auth_key: &AuthKey) -> Result<(), Error> { let current_time = current_time(); - if auth_key.valid_until.is_none() { return Err(Error::KeyInvalid) } - if auth_key.valid_until.unwrap() < current_time { return Err(Error::KeyExpired) } + if auth_key.valid_until.is_none() { return Err(Error::KeyInvalid); } + if auth_key.valid_until.unwrap() < current_time { return Err(Error::KeyExpired); } Ok(()) } @@ -67,7 +69,7 @@ pub enum Error { #[display(fmt = "Key is invalid.")] KeyInvalid, #[display(fmt = "Key has expired.")] - KeyExpired + KeyExpired, } impl From for Error { diff --git a/src/lib.rs b/src/lib.rs index 3d928aff4..b6cebfc5e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,11 @@ +pub use torrust_http_tracker::server::*; +pub use torrust_udp_tracker::server::*; + +pub use self::common::*; +pub use self::config::*; +pub use self::http_api_server::*; +pub use self::tracker::*; + pub mod config; pub mod tracker; pub mod http_api_server; @@ -13,9 +21,3 @@ pub mod mysql_database; pub mod torrent; pub mod tracker_stats; -pub use self::config::*; -pub use torrust_udp_tracker::server::*; -pub use torrust_http_tracker::server::*; -pub use self::tracker::*; -pub use self::http_api_server::*; -pub use self::common::*; diff --git a/src/logging.rs b/src/logging.rs index 580e35094..c2e77551f 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -1,4 +1,5 @@ use log::info; + use crate::Configuration; pub fn setup_logging(cfg: &Configuration) { diff --git a/src/main.rs b/src/main.rs index 9ba45427e..b17ef14fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,21 @@ use std::net::SocketAddr; use std::sync::Arc; -use log::{info}; + +use log::info; use tokio::task::JoinHandle; + use torrust_tracker::{Configuration, http_api_server, HttpApiConfig, HttpTrackerConfig, logging, TorrentTracker, UdpServer, UdpTrackerConfig}; use torrust_tracker::torrust_http_tracker::server::HttpServer; +#[cfg(feature = "dhat-heap")] +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + #[tokio::main] async fn main() { + #[cfg(feature = "dhat-heap")] + let _profiler = dhat::Profiler::new_heap(); + // torrust config let config = match Configuration::load_from_file() { Ok(config) => Arc::new(config), diff --git a/src/mysql_database.rs b/src/mysql_database.rs index 0597d46aa..7ecae214a 100644 --- a/src/mysql_database.rs +++ b/src/mysql_database.rs @@ -1,18 +1,20 @@ use std::collections::BTreeMap; -use crate::{InfoHash, AUTH_KEY_LENGTH, database}; -use log::debug; -use r2d2::{Pool}; -use crate::key_manager::AuthKey; use std::str::FromStr; -use crate::database::Database; + use async_trait::async_trait; +use log::debug; +use r2d2::Pool; use r2d2_mysql::mysql::{Opts, OptsBuilder, params, TxOpts}; use r2d2_mysql::mysql::prelude::Queryable; use r2d2_mysql::MysqlConnectionManager; + +use crate::{AUTH_KEY_LENGTH, database, InfoHash}; +use crate::database::{Database, Error}; +use crate::key_manager::AuthKey; use crate::torrent::TorrentEntry; pub struct MysqlDatabase { - pool: Pool + pool: Pool, } impl MysqlDatabase { @@ -34,20 +36,20 @@ impl Database for MysqlDatabase { let create_whitelist_table = " CREATE TABLE IF NOT EXISTS whitelist ( id integer PRIMARY KEY AUTO_INCREMENT, - info_hash VARCHAR(20) NOT NULL UNIQUE + info_hash BINARY(20) NOT NULL UNIQUE );".to_string(); let create_torrents_table = " CREATE TABLE IF NOT EXISTS torrents ( id integer PRIMARY KEY AUTO_INCREMENT, - info_hash VARCHAR(20) NOT NULL UNIQUE, + info_hash BINARY(20) NOT NULL UNIQUE, completed INTEGER DEFAULT 0 NOT NULL );".to_string(); let create_keys_table = format!(" CREATE TABLE IF NOT EXISTS `keys` ( `id` INT NOT NULL AUTO_INCREMENT, - `key` VARCHAR({}) NOT NULL, + `key` BINARY({}) NOT NULL, `valid_until` INT(10) NOT NULL, PRIMARY KEY (`id`), UNIQUE (`key`) @@ -65,7 +67,7 @@ impl Database for MysqlDatabase { async fn load_persistent_torrent_data(&self) -> Result, database::Error> { let mut conn = self.pool.get().map_err(|_| database::Error::InvalidQuery)?; - let torrents: Vec<(InfoHash, u32)> = conn.query_map("SELECT info_hash, completed FROM torrents", |(info_hash_string, completed): (String, u32)| { + let torrents: Vec<(InfoHash, u32)> = conn.query_map("SELECT HEX(info_hash), completed FROM torrents", |(info_hash_string, completed): (String, u32)| { let info_hash = InfoHash::from_str(&info_hash_string).unwrap(); (info_hash, completed) }).map_err(|_| database::Error::QueryReturnedNoRows)?; @@ -79,11 +81,16 @@ impl Database for MysqlDatabase { let mut db_transaction = conn.start_transaction(TxOpts::default()).map_err(|_| database::Error::DatabaseError)?; for (info_hash, torrent_entry) in torrents { - let (_seeders, completed, _leechers) = torrent_entry.get_stats(); - let _ = db_transaction.exec_drop("INSERT OR REPLACE INTO torrents (info_hash, completed) VALUES (?, ?)", (info_hash.to_string(), completed.to_string())); + let (_seeders, completed, _leechers) = torrent_entry.get_stats(); + if db_transaction.exec_drop("INSERT INTO torrents (info_hash, completed) VALUES (UNHEX(?), ?) ON DUPLICATE KEY UPDATE completed = completed", (info_hash.to_string(), completed.to_string())).is_err() { + return Err(Error::InvalidQuery); + } + debug!("INSERT INTO torrents (info_hash, completed) VALUES (UNHEX('{}'), {}) ON DUPLICATE KEY UPDATE completed = completed", info_hash.to_string(), completed.to_string()); } - let _ = db_transaction.commit(); + if db_transaction.commit().is_err() { + return Err(Error::DatabaseError); + }; Ok(()) } @@ -91,15 +98,15 @@ impl Database for MysqlDatabase { async fn get_info_hash_from_whitelist(&self, info_hash: &str) -> Result { let mut conn = self.pool.get().map_err(|_| database::Error::InvalidQuery)?; - match conn.exec_first::("SELECT info_hash FROM whitelist WHERE info_hash = :info_hash", params! { info_hash => info_hash }) + match conn.exec_first::("SELECT HEX(info_hash) FROM whitelist WHERE info_hash = UNHEX(:info_hash)", params! { info_hash => info_hash }) .map_err(|_| database::Error::QueryReturnedNoRows)? { - Some(info_hash) => { - Ok(InfoHash::from_str(&info_hash).unwrap()) - }, - None => { - Err(database::Error::InvalidQuery) - } + Some(info_hash) => { + Ok(InfoHash::from_str(&info_hash).unwrap()) + } + None => { + Err(database::Error::InvalidQuery) } + } } async fn add_info_hash_to_whitelist(&self, info_hash: InfoHash) -> Result { @@ -107,10 +114,10 @@ impl Database for MysqlDatabase { let info_hash_str = info_hash.to_string(); - match conn.exec_drop("INSERT INTO whitelist (info_hash) VALUES (:info_hash_str)", params! { info_hash_str }) { + match conn.exec_drop("INSERT INTO whitelist (info_hash) VALUES (UNHEX(:info_hash_str))", params! { info_hash_str }) { Ok(_) => { Ok(1) - }, + } Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -123,10 +130,10 @@ impl Database for MysqlDatabase { let info_hash = info_hash.to_string(); - match conn.exec_drop("DELETE FROM whitelist WHERE info_hash = :info_hash", params! { info_hash }) { + match conn.exec_drop("DELETE FROM whitelist WHERE info_hash = UNHEX(:info_hash)", params! { info_hash }) { Ok(_) => { Ok(1) - }, + } Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -142,9 +149,9 @@ impl Database for MysqlDatabase { Some((key, valid_until)) => { Ok(AuthKey { key, - valid_until: Some(valid_until as u64) + valid_until: Some(valid_until as u64), }) - }, + } None => { Err(database::Error::InvalidQuery) } @@ -160,7 +167,7 @@ impl Database for MysqlDatabase { match conn.exec_drop("INSERT INTO `keys` (`key`, valid_until) VALUES (:key, :valid_until)", params! { key, valid_until }) { Ok(_) => { Ok(1) - }, + } Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -174,7 +181,7 @@ impl Database for MysqlDatabase { match conn.exec_drop("DELETE FROM `keys` WHERE key = :key", params! { key }) { Ok(_) => { Ok(1) - }, + } Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) diff --git a/src/sqlite_database.rs b/src/sqlite_database.rs index 5facd99d8..fa519ffd0 100644 --- a/src/sqlite_database.rs +++ b/src/sqlite_database.rs @@ -1,17 +1,19 @@ use std::collections::BTreeMap; -use crate::{InfoHash, AUTH_KEY_LENGTH, database}; +use std::str::FromStr; + +use async_trait::async_trait; use log::debug; -use r2d2_sqlite::{SqliteConnectionManager}; -use r2d2::{Pool}; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; use r2d2_sqlite::rusqlite::NO_PARAMS; -use crate::key_manager::AuthKey; -use std::str::FromStr; + +use crate::{AUTH_KEY_LENGTH, database, InfoHash}; use crate::database::Database; -use async_trait::async_trait; +use crate::key_manager::AuthKey; use crate::torrent::TorrentEntry; pub struct SqliteDatabase { - pool: Pool + pool: Pool, } impl SqliteDatabase { @@ -68,7 +70,7 @@ impl Database for SqliteDatabase { Ok((info_hash, completed)) })?; - let torrents: Vec<(InfoHash, u32)> = torrent_iter.filter_map(|x| x.ok() ).collect(); + let torrents: Vec<(InfoHash, u32)> = torrent_iter.filter_map(|x| x.ok()).collect(); Ok(torrents) } @@ -79,8 +81,8 @@ impl Database for SqliteDatabase { let db_transaction = conn.transaction()?; for (info_hash, torrent_entry) in torrents { - let (_seeders, completed, _leechers) = torrent_entry.get_stats(); - let _ = db_transaction.execute("INSERT OR REPLACE INTO torrents (info_hash, completed) VALUES (?, ?)", &[info_hash.to_string(), completed.to_string()]); + let (_seeders, completed, _leechers) = torrent_entry.get_stats(); + let _ = db_transaction.execute("INSERT OR REPLACE INTO torrents (info_hash, completed) VALUES (?, ?)", &[info_hash.to_string(), completed.to_string()]); } let _ = db_transaction.commit(); @@ -109,9 +111,9 @@ impl Database for SqliteDatabase { match conn.execute("INSERT INTO whitelist (info_hash) VALUES (?)", &[info_hash.to_string()]) { Ok(updated) => { - if updated > 0 { return Ok(updated) } + if updated > 0 { return Ok(updated); } Err(database::Error::QueryReturnedNoRows) - }, + } Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -124,9 +126,9 @@ impl Database for SqliteDatabase { match conn.execute("DELETE FROM whitelist WHERE info_hash = ?", &[info_hash.to_string()]) { Ok(updated) => { - if updated > 0 { return Ok(updated) } + if updated > 0 { return Ok(updated); } Err(database::Error::QueryReturnedNoRows) - }, + } Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -146,7 +148,7 @@ impl Database for SqliteDatabase { Ok(AuthKey { key, - valid_until: Some(valid_until_i64 as u64) + valid_until: Some(valid_until_i64 as u64), }) } else { Err(database::Error::QueryReturnedNoRows) @@ -157,12 +159,12 @@ impl Database for SqliteDatabase { let conn = self.pool.get().map_err(|_| database::Error::InvalidQuery)?; match conn.execute("INSERT INTO keys (key, valid_until) VALUES (?1, ?2)", - &[auth_key.key.to_string(), auth_key.valid_until.unwrap().to_string()] + &[auth_key.key.to_string(), auth_key.valid_until.unwrap().to_string()], ) { Ok(updated) => { - if updated > 0 { return Ok(updated) } + if updated > 0 { return Ok(updated); } Err(database::Error::QueryReturnedNoRows) - }, + } Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) @@ -175,9 +177,9 @@ impl Database for SqliteDatabase { match conn.execute("DELETE FROM keys WHERE key = ?", &[key]) { Ok(updated) => { - if updated > 0 { return Ok(updated) } + if updated > 0 { return Ok(updated); } Err(database::Error::QueryReturnedNoRows) - }, + } Err(e) => { debug!("{:?}", e); Err(database::Error::InvalidQuery) diff --git a/src/torrent.rs b/src/torrent.rs index ef933d224..e2984a490 100644 --- a/src/torrent.rs +++ b/src/torrent.rs @@ -1,10 +1,12 @@ use std::borrow::Cow; use std::net::{IpAddr, SocketAddr}; + use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; + use crate::{InfoHash, MAX_SCRAPE_TORRENTS, PeerId}; -use crate::torrust_http_tracker::AnnounceRequest; use crate::common::{AnnounceEventDef, NumberOfBytesDef}; +use crate::torrust_http_tracker::AnnounceRequest; #[derive(PartialEq, Eq, Debug, Clone, Serialize)] pub struct TorrentPeer { @@ -33,7 +35,7 @@ impl TorrentPeer { uploaded: announce_request.bytes_uploaded, downloaded: announce_request.bytes_downloaded, left: announce_request.bytes_left, - event: announce_request.event + event: announce_request.event, } } @@ -58,7 +60,7 @@ impl TorrentPeer { uploaded: NumberOfBytes(announce_request.uploaded as i64), downloaded: NumberOfBytes(announce_request.downloaded as i64), left: NumberOfBytes(announce_request.left as i64), - event + event, } } diff --git a/src/torrust_http_tracker/errors.rs b/src/torrust_http_tracker/errors.rs index d8d6c7623..fe0cf26e6 100644 --- a/src/torrust_http_tracker/errors.rs +++ b/src/torrust_http_tracker/errors.rs @@ -1,5 +1,5 @@ -use warp::reject::Reject; use thiserror::Error; +use warp::reject::Reject; #[derive(Error, Debug)] pub enum ServerError { diff --git a/src/torrust_http_tracker/filters.rs b/src/torrust_http_tracker/filters.rs index 61fa20a45..5c4fc9743 100644 --- a/src/torrust_http_tracker/filters.rs +++ b/src/torrust_http_tracker/filters.rs @@ -2,43 +2,45 @@ use std::convert::Infallible; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use std::sync::Arc; + use log::debug; use warp::{Filter, reject, Rejection}; + use crate::{InfoHash, MAX_SCRAPE_TORRENTS, PeerId, TorrentTracker}; use crate::key_manager::AuthKey; use crate::torrust_http_tracker::{AnnounceRequest, AnnounceRequestQuery, ScrapeRequest, ServerError, WebResult}; /// Pass Arc along -pub fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { +pub fn with_tracker(tracker: Arc) -> impl Filter, ), Error=Infallible> + Clone { warp::any() .map(move || tracker.clone()) } /// Check for infoHash -pub fn with_info_hash() -> impl Filter,), Error = Rejection> + Clone { +pub fn with_info_hash() -> impl Filter, ), Error=Rejection> + Clone { warp::filters::query::raw() .and_then(info_hashes) } /// Check for PeerId -pub fn with_peer_id() -> impl Filter + Clone { +pub fn with_peer_id() -> impl Filter + Clone { warp::filters::query::raw() .and_then(peer_id) } /// Pass Arc along -pub fn with_auth_key() -> impl Filter,), Error = Infallible> + Clone { +pub fn with_auth_key() -> impl Filter, ), Error=Infallible> + Clone { warp::path::param::() .map(|key: String| { AuthKey::from_string(&key) }) .or_else(|_| async { - Ok::<(Option,), Infallible>((None,)) + Ok::<(Option, ), Infallible>((None, )) }) } /// Check for PeerAddress -pub fn with_peer_addr(on_reverse_proxy: bool) -> impl Filter + Clone { +pub fn with_peer_addr(on_reverse_proxy: bool) -> impl Filter + Clone { warp::addr::remote() .and(warp::header::optional::("X-Forwarded-For")) .map(move |remote_addr: Option, x_forwarded_for: Option| { @@ -48,7 +50,7 @@ pub fn with_peer_addr(on_reverse_proxy: bool) -> impl Filter impl Filter + Clone { +pub fn with_announce_request(on_reverse_proxy: bool) -> impl Filter + Clone { warp::filters::query::query::() .and(with_info_hash()) .and(with_peer_id()) @@ -57,7 +59,7 @@ pub fn with_announce_request(on_reverse_proxy: bool) -> impl Filter impl Filter + Clone { +pub fn with_scrape_request(on_reverse_proxy: bool) -> impl Filter + Clone { warp::any() .and(with_info_hash()) .and(with_peer_addr(on_reverse_proxy)) @@ -129,11 +131,11 @@ async fn peer_id(raw_query: String) -> WebResult { /// Get PeerAddress from RemoteAddress or Forwarded async fn peer_addr((on_reverse_proxy, remote_addr, x_forwarded_for): (bool, Option, Option)) -> WebResult { if !on_reverse_proxy && remote_addr.is_none() { - return Err(reject::custom(ServerError::AddressNotFound)) + return Err(reject::custom(ServerError::AddressNotFound)); } if on_reverse_proxy && x_forwarded_for.is_none() { - return Err(reject::custom(ServerError::AddressNotFound)) + return Err(reject::custom(ServerError::AddressNotFound)); } match on_reverse_proxy { @@ -150,7 +152,7 @@ async fn peer_addr((on_reverse_proxy, remote_addr, x_forwarded_for): (bool, Opti debug!("{}", e); Err(reject::custom(ServerError::AddressNotFound)) }) - }, + } false => Ok(remote_addr.unwrap().ip()) } } @@ -166,7 +168,7 @@ async fn announce_request(announce_request_query: AnnounceRequestQuery, info_has port: announce_request_query.port, left: announce_request_query.left.unwrap_or(0), event: announce_request_query.event, - compact: announce_request_query.compact + compact: announce_request_query.compact, }) } diff --git a/src/torrust_http_tracker/handlers.rs b/src/torrust_http_tracker/handlers.rs index 8e8f2576f..9021e8858 100644 --- a/src/torrust_http_tracker/handlers.rs +++ b/src/torrust_http_tracker/handlers.rs @@ -2,9 +2,11 @@ use std::collections::HashMap; use std::convert::Infallible; use std::net::IpAddr; use std::sync::Arc; + use log::debug; use warp::{reject, Rejection, Reply}; -use warp::http::{Response}; +use warp::http::Response; + use crate::{InfoHash, TorrentTracker}; use crate::key_manager::AuthKey; use crate::torrent::{TorrentError, TorrentPeer, TorrentStats}; @@ -34,7 +36,7 @@ pub async fn authenticate(info_hash: &InfoHash, auth_key: &Option, trac /// Handle announce request pub async fn handle_announce(announce_request: AnnounceRequest, auth_key: Option, tracker: Arc) -> WebResult { if let Err(e) = authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await { - return Err(reject::custom(e)) + return Err(reject::custom(e)); } debug!("{:?}", announce_request); @@ -63,7 +65,7 @@ pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option { @@ -94,7 +96,7 @@ fn send_announce_response(announce_request: &AnnounceRequest, torrent_stats: Tor let http_peers: Vec = peers.iter().map(|peer| Peer { peer_id: peer.peer_id.to_string(), ip: peer.peer_addr.ip(), - port: peer.peer_addr.port() + port: peer.peer_addr.port(), }).collect(); let res = AnnounceResponse { @@ -102,7 +104,7 @@ fn send_announce_response(announce_request: &AnnounceRequest, torrent_stats: Tor interval_min, complete: torrent_stats.seeders, incomplete: torrent_stats.leechers, - peers: http_peers + peers: http_peers, }; // check for compact response request diff --git a/src/torrust_http_tracker/mod.rs b/src/torrust_http_tracker/mod.rs index ea6675dce..07d077577 100644 --- a/src/torrust_http_tracker/mod.rs +++ b/src/torrust_http_tracker/mod.rs @@ -1,3 +1,11 @@ +pub use self::errors::*; +pub use self::filters::*; +pub use self::handlers::*; +pub use self::request::*; +pub use self::response::*; +pub use self::routes::*; +pub use self::server::*; + pub mod server; pub mod request; pub mod response; @@ -6,13 +14,5 @@ pub mod routes; pub mod handlers; pub mod filters; -pub use self::server::*; -pub use self::request::*; -pub use self::response::*; -pub use self::errors::*; -pub use self::routes::*; -pub use self::handlers::*; -pub use self::filters::*; - pub type Bytes = u64; pub type WebResult = std::result::Result; diff --git a/src/torrust_http_tracker/request.rs b/src/torrust_http_tracker/request.rs index 0fb316671..487e53a13 100644 --- a/src/torrust_http_tracker/request.rs +++ b/src/torrust_http_tracker/request.rs @@ -1,5 +1,7 @@ -use std::net::{IpAddr}; -use serde::{Deserialize}; +use std::net::IpAddr; + +use serde::Deserialize; + use crate::{InfoHash, PeerId}; use crate::torrust_http_tracker::Bytes; diff --git a/src/torrust_http_tracker/response.rs b/src/torrust_http_tracker/response.rs index af27bc5e9..f57129cde 100644 --- a/src/torrust_http_tracker/response.rs +++ b/src/torrust_http_tracker/response.rs @@ -2,7 +2,8 @@ use std::collections::HashMap; use std::error::Error; use std::io::Write; use std::net::IpAddr; -use serde::{Serialize}; + +use serde::Serialize; #[derive(Serialize)] pub struct Peer { @@ -18,7 +19,7 @@ pub struct AnnounceResponse { //pub tracker_id: String, pub complete: u32, pub incomplete: u32, - pub peers: Vec + pub peers: Vec, } impl AnnounceResponse { @@ -75,7 +76,7 @@ pub struct ScrapeResponseEntry { #[derive(Serialize)] pub struct ScrapeResponse { - pub files: HashMap + pub files: HashMap, } impl ScrapeResponse { @@ -87,7 +88,7 @@ impl ScrapeResponse { #[derive(Serialize)] pub struct ErrorResponse { #[serde(rename = "failure reason")] - pub failure_reason: String + pub failure_reason: String, } impl ErrorResponse { diff --git a/src/torrust_http_tracker/routes.rs b/src/torrust_http_tracker/routes.rs index 4b4de722f..fb6bf5c16 100644 --- a/src/torrust_http_tracker/routes.rs +++ b/src/torrust_http_tracker/routes.rs @@ -1,11 +1,13 @@ use std::convert::Infallible; use std::sync::Arc; + use warp::{Filter, Rejection}; + use crate::TorrentTracker; -use crate::torrust_http_tracker::{handle_announce, send_error, handle_scrape, with_announce_request, with_auth_key, with_scrape_request, with_tracker}; +use crate::torrust_http_tracker::{handle_announce, handle_scrape, send_error, with_announce_request, with_auth_key, with_scrape_request, with_tracker}; /// All routes -pub fn routes(tracker: Arc,) -> impl Filter + Clone { +pub fn routes(tracker: Arc) -> impl Filter + Clone { root(tracker.clone()) .or(announce(tracker.clone())) .or(scrape(tracker.clone())) @@ -13,7 +15,7 @@ pub fn routes(tracker: Arc,) -> impl Filter -fn root(tracker: Arc,) -> impl Filter + Clone { +fn root(tracker: Arc) -> impl Filter + Clone { warp::any() .and(warp::filters::method::get()) .and(with_announce_request(tracker.config.on_reverse_proxy)) @@ -23,7 +25,7 @@ fn root(tracker: Arc,) -> impl Filter -fn announce(tracker: Arc,) -> impl Filter + Clone { +fn announce(tracker: Arc) -> impl Filter + Clone { warp::path::path("announce") .and(warp::filters::method::get()) .and(with_announce_request(tracker.config.on_reverse_proxy)) @@ -33,7 +35,7 @@ fn announce(tracker: Arc,) -> impl Filter -fn scrape(tracker: Arc,) -> impl Filter + Clone { +fn scrape(tracker: Arc) -> impl Filter + Clone { warp::path::path("scrape") .and(warp::filters::method::get()) .and(with_scrape_request(tracker.config.on_reverse_proxy)) diff --git a/src/torrust_http_tracker/server.rs b/src/torrust_http_tracker/server.rs index 69811b3d9..336670030 100644 --- a/src/torrust_http_tracker/server.rs +++ b/src/torrust_http_tracker/server.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; use std::sync::Arc; + use crate::TorrentTracker; use crate::torrust_http_tracker::routes; diff --git a/src/torrust_udp_tracker/handlers.rs b/src/torrust_udp_tracker/handlers.rs index c94e2e917..ff6e8981b 100644 --- a/src/torrust_udp_tracker/handlers.rs +++ b/src/torrust_udp_tracker/handlers.rs @@ -1,6 +1,8 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; + use aquatic_udp_protocol::{AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId}; + use crate::{InfoHash, MAX_SCRAPE_TORRENTS, TorrentTracker}; use crate::torrent::{TorrentError, TorrentPeer}; use crate::torrust_udp_tracker::errors::ServerError; @@ -103,15 +105,15 @@ pub async fn handle_announce(remote_addr: SocketAddr, announce_request: &Announc leechers: NumberOfPeers(torrent_stats.leechers as i32), seeders: NumberOfPeers(torrent_stats.seeders as i32), peers: peers.iter() - .filter_map(|peer| if let IpAddr::V4(ip) = peer.peer_addr.ip() { + .filter_map(|peer| if let IpAddr::V4(ip) = peer.peer_addr.ip() { Some(ResponsePeer:: { ip_address: ip, - port: Port(peer.peer_addr.port()) + port: Port(peer.peer_addr.port()), }) } else { None } - ).collect() + ).collect(), }) } else { Response::from(AnnounceResponse { @@ -120,15 +122,15 @@ pub async fn handle_announce(remote_addr: SocketAddr, announce_request: &Announc leechers: NumberOfPeers(torrent_stats.leechers as i32), seeders: NumberOfPeers(torrent_stats.seeders as i32), peers: peers.iter() - .filter_map(|peer| if let IpAddr::V6(ip) = peer.peer_addr.ip() { + .filter_map(|peer| if let IpAddr::V6(ip) = peer.peer_addr.ip() { Some(ResponsePeer:: { ip_address: ip, - port: Port(peer.peer_addr.port()) + port: Port(peer.peer_addr.port()), }) } else { None } - ).collect() + ).collect(), }) }; @@ -150,7 +152,7 @@ pub async fn handle_scrape(remote_addr: SocketAddr, request: &ScrapeRequest, tra for info_hash in request.info_hashes.iter() { let info_hash = InfoHash(info_hash.0); - if authenticate(&info_hash, tracker.clone()).await.is_err() { continue } + if authenticate(&info_hash, tracker.clone()).await.is_err() { continue; } let scrape_entry = match db.get(&info_hash) { Some(torrent_info) => { @@ -182,7 +184,7 @@ pub async fn handle_scrape(remote_addr: SocketAddr, request: &ScrapeRequest, tra Ok(Response::from(ScrapeResponse { transaction_id: request.transaction_id, - torrent_stats + torrent_stats, })) } diff --git a/src/torrust_udp_tracker/mod.rs b/src/torrust_udp_tracker/mod.rs index 6aa5fbce0..25780ba93 100644 --- a/src/torrust_udp_tracker/mod.rs +++ b/src/torrust_udp_tracker/mod.rs @@ -1,13 +1,13 @@ +pub use self::errors::*; +pub use self::handlers::*; +pub use self::request::*; +pub use self::server::*; + pub mod errors; pub mod request; pub mod server; pub mod handlers; -pub use self::errors::*; -pub use self::request::*; -pub use self::server::*; -pub use self::handlers::*; - pub type Bytes = u64; pub type Port = u16; pub type TransactionId = i64; diff --git a/src/torrust_udp_tracker/request.rs b/src/torrust_udp_tracker/request.rs index f3f67fdc1..6531f54b9 100644 --- a/src/torrust_udp_tracker/request.rs +++ b/src/torrust_udp_tracker/request.rs @@ -1,5 +1,6 @@ -use aquatic_udp_protocol::{AnnounceRequest}; -use crate::{InfoHash}; +use aquatic_udp_protocol::AnnounceRequest; + +use crate::InfoHash; // struct AnnounceRequest { // pub connection_id: i64, @@ -25,7 +26,7 @@ impl AnnounceRequestWrapper { pub fn new(announce_request: AnnounceRequest) -> Self { AnnounceRequestWrapper { announce_request: announce_request.clone(), - info_hash: InfoHash(announce_request.info_hash.0) + info_hash: InfoHash(announce_request.info_hash.0), } } } diff --git a/src/torrust_udp_tracker/server.rs b/src/torrust_udp_tracker/server.rs index cae1e5b94..8dc34d85d 100644 --- a/src/torrust_udp_tracker/server.rs +++ b/src/torrust_udp_tracker/server.rs @@ -1,10 +1,12 @@ use std::io::Cursor; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; -use aquatic_udp_protocol::{Response}; + +use aquatic_udp_protocol::Response; use log::{debug, info}; use tokio::net::UdpSocket; -use crate::{TorrentTracker}; + +use crate::TorrentTracker; use crate::torrust_udp_tracker::{handle_packet, MAX_PACKET_SIZE}; pub struct UdpServer { diff --git a/src/tracker.rs b/src/tracker.rs index 7a036c1af..0e42f69e1 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -1,16 +1,18 @@ +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use log::info; use serde::{Deserialize, Serialize}; use serde; -use std::collections::BTreeMap; use tokio::sync::{RwLock, RwLockReadGuard}; -use crate::common::{InfoHash}; -use std::net::{SocketAddr}; + use crate::{Configuration, database, key_manager}; -use std::collections::btree_map::Entry; -use std::sync::Arc; -use log::info; +use crate::common::InfoHash; +use crate::database::Database; use tokio::sync::mpsc::error::SendError; use crate::key_manager::AuthKey; -use crate::database::{Database}; use crate::key_manager::Error::KeyInvalid; use crate::torrent::{TorrentEntry, TorrentError, TorrentPeer, TorrentStats}; use crate::tracker_stats::{StatsTracker, TrackerStats, TrackerStatsEvent}; @@ -34,7 +36,6 @@ pub enum TrackerMode { PrivateListedMode, } - pub struct TorrentTracker { mode: TrackerMode, pub config: Arc, @@ -80,7 +81,7 @@ impl TorrentTracker { let auth_key = key_manager::generate_auth_key(seconds_valid); // add key to database - if let Err(error) = self.database.add_key_to_keys(&auth_key).await { return Err(error) } + if let Err(error) = self.database.add_key_to_keys(&auth_key).await { return Err(error); } Ok(auth_key) } @@ -96,18 +97,18 @@ impl TorrentTracker { pub async fn authenticate_request(&self, info_hash: &InfoHash, key: &Option) -> Result<(), TorrentError> { // no authentication needed in public mode - if self.is_public() { return Ok(()) } + if self.is_public() { return Ok(()); } // check if auth_key is set and valid if self.is_private() { match key { Some(key) => { if self.verify_auth_key(key).await.is_err() { - return Err(TorrentError::PeerKeyNotValid) + return Err(TorrentError::PeerKeyNotValid); } } None => { - return Err(TorrentError::PeerNotAuthenticated) + return Err(TorrentError::PeerNotAuthenticated); } } } @@ -115,7 +116,7 @@ impl TorrentTracker { // check if info_hash is whitelisted if self.is_whitelisted() { if self.is_info_hash_whitelisted(info_hash).await == false { - return Err(TorrentError::TorrentNotWhitelisted) + return Err(TorrentError::TorrentNotWhitelisted); } } @@ -160,7 +161,7 @@ impl TorrentTracker { pub async fn get_torrent_peers( &self, info_hash: &InfoHash, - peer_addr: &SocketAddr + peer_addr: &SocketAddr, ) -> Vec { let read_lock = self.torrents.read().await; match read_lock.get(info_hash) { @@ -210,7 +211,7 @@ impl TorrentTracker { let torrent_entry = TorrentEntry { peers: Default::default(), completed, - seeders + seeders, }; torrents.insert(info_hash.clone(), torrent_entry); } @@ -250,6 +251,7 @@ impl TorrentTracker { // remove torrents without peers if enabled, and defragment memory pub async fn cleanup_torrents(&self) { info!("Cleaning torrents..."); + let lock = self.torrents.write().await; // First we create a mapping of all the torrent hashes in a vector, and we use this to iterate through the btreemap. @@ -263,10 +265,10 @@ impl TorrentTracker { // Let's iterate through all torrents, and parse. for hash in torrent_hashes.iter() { - let mut torrent = TorrentEntry{ + let mut torrent = TorrentEntry { peers: BTreeMap::new(), completed: 0, - seeders: 0 + seeders: 0, }; let lock = self.torrents.write().await; @@ -299,32 +301,39 @@ impl TorrentTracker { pub async fn periodic_saving(&self) { // Get a lock for writing - let mut shadow = self.shadow.write().await; + // let mut shadow = self.shadow.write().await; // We will get the data and insert it into the shadow, while clearing updates. let mut updates = self.updates.write().await; - - for (infohash, completed) in updates.iter() { - if shadow.contains_key(infohash) { - shadow.remove(infohash); - } - shadow.insert(*infohash, *completed); + let mut updates_cloned: std::collections::HashMap = std::collections::HashMap::new(); + // let mut torrent_hashes: Vec = Vec::new(); + for (k, completed) in updates.iter() { + updates_cloned.insert(*k, *completed); } updates.clear(); drop(updates); - // We get shadow data into local array to be handled. + let mut shadows = self.shadow.write().await; + for (k, completed) in updates_cloned.iter() { + if shadows.contains_key(k) { + shadows.remove(k); + } + shadows.insert(*k, *completed); + } + drop(updates_cloned); + + // We updated the shadow data from the updates data, let's handle shadow data as expected. let mut shadow_copy: BTreeMap = BTreeMap::new(); - for (infohash, completed) in shadow.iter() { - shadow_copy.insert(*infohash, TorrentEntry{ + for (infohash, completed) in shadows.iter() { + shadow_copy.insert(*infohash, TorrentEntry { peers: Default::default(), completed: *completed, - seeders: 0 + seeders: 0, }); } // Drop the lock - drop(shadow); + drop(shadows); // We will now save the data from the shadow into the database. // This should not put any strain on the server itself, other then the harddisk/ssd. diff --git a/src/tracker_stats.rs b/src/tracker_stats.rs index 1a6a71c2b..0bcd781ba 100644 --- a/src/tracker_stats.rs +++ b/src/tracker_stats.rs @@ -1,6 +1,7 @@ use std::sync::Arc; + use tokio::sync::{mpsc, RwLock, RwLockReadGuard}; -use tokio::sync::mpsc::{Sender}; +use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; const CHANNEL_BUFFER_SIZE: usize = 65_535; @@ -16,7 +17,7 @@ pub enum TrackerStatsEvent { Udp4Scrape, Udp6Connect, Udp6Announce, - Udp6Scrape + Udp6Scrape, } #[derive(Debug)] @@ -56,14 +57,14 @@ impl TrackerStats { pub struct StatsTracker { channel_sender: Option>, - pub stats: Arc> + pub stats: Arc>, } impl StatsTracker { pub fn new() -> Self { Self { channel_sender: None, - stats: Arc::new(RwLock::new(TrackerStats::new())) + stats: Arc::new(RwLock::new(TrackerStats::new())), } } diff --git a/src/utils.rs b/src/utils.rs index e3a8302df..fb2a94513 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,8 +1,9 @@ -use std::net::SocketAddr; -use std::time::SystemTime; use std::error::Error; use std::fmt::Write; use std::io::Cursor; +use std::net::SocketAddr; +use std::time::SystemTime; + use aquatic_udp_protocol::ConnectionId; use byteorder::{BigEndian, ReadBytesExt};