From 5546898f7df5557a2f2f1a0381370ab8c4785144 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Sat, 29 Jan 2022 22:13:14 +0100 Subject: [PATCH 01/16] refactor: separating tracker servers --- src/lib.rs | 10 +- src/main.rs | 2 +- src/torrust_http_tracker/mod.rs | 7 + src/torrust_http_tracker/request.rs | 14 ++ src/torrust_http_tracker/response.rs | 94 +++++++++++++ .../server.rs} | 126 +----------------- src/torrust_udp_tracker/mod.rs | 1 + .../server.rs} | 12 +- src/tracker.rs | 10 +- 9 files changed, 137 insertions(+), 139 deletions(-) create mode 100644 src/torrust_http_tracker/mod.rs create mode 100644 src/torrust_http_tracker/request.rs create mode 100644 src/torrust_http_tracker/response.rs rename src/{http_server.rs => torrust_http_tracker/server.rs} (76%) create mode 100644 src/torrust_udp_tracker/mod.rs rename src/{udp_server.rs => torrust_udp_tracker/server.rs} (98%) diff --git a/src/lib.rs b/src/lib.rs index 58f1d4dac..c055cfae4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,4 @@ pub mod config; -pub mod udp_server; -pub mod http_server; pub mod tracker; pub mod http_api_server; pub mod common; @@ -8,12 +6,12 @@ pub mod utils; pub mod database; pub mod key_manager; pub mod logging; -pub mod udp_tracker; -mod http_tracker; +pub mod torrust_udp_tracker; +pub mod torrust_http_tracker; pub use self::config::*; -pub use self::udp_server::*; -pub use self::http_server::*; +pub use torrust_udp_tracker::server::*; +pub use torrust_http_tracker::server::*; pub use self::tracker::*; pub use self::http_api_server::*; pub use self::common::*; diff --git a/src/main.rs b/src/main.rs index cfb5365ed..45b42ef63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use log::{info}; use torrust_tracker::{http_api_server, Configuration, TorrentTracker, UdpServer, HttpTrackerConfig, UdpTrackerConfig, HttpApiConfig, logging, TrackerServer}; use std::sync::Arc; use tokio::task::JoinHandle; -use torrust_tracker::http_server::HttpServer; +use torrust_tracker::torrust_http_tracker::server::HttpServer; #[tokio::main] async fn main() { diff --git a/src/torrust_http_tracker/mod.rs b/src/torrust_http_tracker/mod.rs new file mode 100644 index 000000000..236f6e996 --- /dev/null +++ b/src/torrust_http_tracker/mod.rs @@ -0,0 +1,7 @@ +pub mod server; +pub mod request; +pub mod response; + +pub use self::server::*; +pub use self::request::*; +pub use self::response::*; diff --git a/src/torrust_http_tracker/request.rs b/src/torrust_http_tracker/request.rs new file mode 100644 index 000000000..d88f36d3a --- /dev/null +++ b/src/torrust_http_tracker/request.rs @@ -0,0 +1,14 @@ +use serde::{Deserialize}; + +#[derive(Deserialize)] +pub struct AnnounceRequest { + pub downloaded: u32, + pub uploaded: u32, + pub key: String, + pub peer_id: String, + pub port: u16, + pub info_hash: String, + pub left: u32, + pub event: Option, + pub compact: Option, +} diff --git a/src/torrust_http_tracker/response.rs b/src/torrust_http_tracker/response.rs new file mode 100644 index 000000000..a30fc89f6 --- /dev/null +++ b/src/torrust_http_tracker/response.rs @@ -0,0 +1,94 @@ +use std::collections::HashMap; +use std::error::Error; +use std::io::Write; +use std::net::IpAddr; +use serde::{Serialize}; +use warp::http::Response; + +#[derive(Serialize)] +pub struct Peer { + pub peer_id: String, + pub ip: IpAddr, + pub port: u16, +} + +#[derive(Serialize)] +pub struct AnnounceResponse { + pub interval: u32, + //pub tracker_id: String, + pub complete: u32, + pub incomplete: u32, + pub peers: Vec +} + +impl AnnounceResponse { + pub fn write(&self) -> String { + serde_bencode::to_string(&self).unwrap() + } + + pub fn write_compact(&self) -> Result, Box> { + let mut peers_v4: Vec = Vec::new(); + let mut peers_v6: Vec = Vec::new(); + + for peer in &self.peers { + match peer.ip { + IpAddr::V4(ip) => { + peers_v4.write(&u32::from(ip).to_be_bytes())?; + peers_v4.write(&peer.port.to_be_bytes())?; + } + IpAddr::V6(ip) => { + peers_v6.write(&u128::from(ip).to_be_bytes())?; + peers_v6.write(&peer.port.to_be_bytes())?; + } + } + } + + let mut bytes: Vec = Vec::new(); + bytes.write(b"d8:intervali")?; + bytes.write(&self.interval.to_string().as_bytes())?; + bytes.write(b"e8:completei")?; + bytes.write(&self.complete.to_string().as_bytes())?; + bytes.write(b"e10:incompletei")?; + bytes.write(&self.incomplete.to_string().as_bytes())?; + bytes.write(b"e5:peers")?; + bytes.write(&peers_v4.len().to_string().as_bytes())?; + bytes.write(b":")?; + bytes.write(peers_v4.as_slice())?; + bytes.write(b"e6:peers6")?; + bytes.write(&peers_v6.len().to_string().as_bytes())?; + bytes.write(b":")?; + bytes.write(peers_v6.as_slice())?; + bytes.write(b"e")?; + + Ok(bytes) + } +} + +#[derive(Serialize)] +pub struct ScrapeResponseEntry { + pub complete: u32, + pub downloaded: u32, + pub incomplete: u32, +} + +#[derive(Serialize)] +pub struct ScrapeResponse { + pub files: HashMap +} + +impl ScrapeResponse { + pub fn write(&self) -> String { + serde_bencode::to_string(&self).unwrap() + } +} + +#[derive(Serialize)] +pub struct ErrorResponse { + pub failure_reason: String +} + +impl warp::Reply for ErrorResponse { + fn into_response(self) -> warp::reply::Response { + Response::new(format!("{}", serde_bencode::to_string(&self).unwrap()).into()) + } +} diff --git a/src/http_server.rs b/src/torrust_http_tracker/server.rs similarity index 76% rename from src/http_server.rs rename to src/torrust_http_tracker/server.rs index 9c942856c..af1774403 100644 --- a/src/http_server.rs +++ b/src/torrust_http_tracker/server.rs @@ -1,135 +1,19 @@ use std::collections::{HashMap}; use crate::tracker::{TorrentTracker}; -use serde::{Deserialize, Serialize}; use std::convert::Infallible; -use std::error::Error; -use std::io::Write; -use std::net::{IpAddr, SocketAddr}; +use std::net::{SocketAddr}; use std::sync::Arc; use std::str::FromStr; +use super::{AnnounceResponse, ScrapeResponse}; use log::{debug}; use warp::{filters, reply::Reply, Filter}; use warp::http::Response; use crate::{TorrentError, TorrentPeer, TorrentStats}; use crate::key_manager::AuthKey; use crate::utils::url_encode_bytes; -use super::common::*; - -#[derive(Deserialize, Debug)] -pub struct AnnounceRequest { - pub downloaded: u32, - pub uploaded: u32, - pub key: String, - pub peer_id: String, - pub port: u16, - pub info_hash: String, - pub left: u32, - pub event: Option, - pub compact: Option, -} - -impl AnnounceRequest { - pub fn is_compact(&self) -> bool { - self.compact.unwrap_or(0) == 1 - } -} - -#[derive(Deserialize, Debug)] -pub struct ScrapeRequest { - pub info_hash: String, -} - -#[derive(Serialize)] -struct Peer { - peer_id: String, - ip: IpAddr, - port: u16, -} - -#[derive(Serialize)] -struct AnnounceResponse { - interval: u32, - //tracker_id: String, - complete: u32, - incomplete: u32, - peers: Vec -} - -impl AnnounceResponse { - pub fn write(&self) -> String { - serde_bencode::to_string(&self).unwrap() - } - - pub fn write_compact(&self) -> Result, Box> { - let mut peers_v4: Vec = Vec::new(); - let mut peers_v6: Vec = Vec::new(); - - for peer in &self.peers { - match peer.ip { - IpAddr::V4(ip) => { - peers_v4.write(&u32::from(ip).to_be_bytes())?; - peers_v4.write(&peer.port.to_be_bytes())?; - } - IpAddr::V6(ip) => { - peers_v6.write(&u128::from(ip).to_be_bytes())?; - peers_v6.write(&peer.port.to_be_bytes())?; - } - } - } - - debug!("{:?}", String::from_utf8_lossy(peers_v4.as_slice())); - debug!("{:?}", String::from_utf8_lossy(peers_v6.as_slice())); - - let mut bytes: Vec = Vec::new(); - bytes.write(b"d8:intervali")?; - bytes.write(&self.interval.to_string().as_bytes())?; - bytes.write(b"e8:completei")?; - bytes.write(&self.complete.to_string().as_bytes())?; - bytes.write(b"e10:incompletei")?; - bytes.write(&self.incomplete.to_string().as_bytes())?; - bytes.write(b"e5:peers")?; - bytes.write(&peers_v4.len().to_string().as_bytes())?; - bytes.write(b":")?; - bytes.write(peers_v4.as_slice())?; - bytes.write(b"e6:peers6")?; - bytes.write(&peers_v6.len().to_string().as_bytes())?; - bytes.write(b":")?; - bytes.write(peers_v6.as_slice())?; - bytes.write(b"e")?; - - debug!("{:?}", String::from_utf8_lossy(bytes.as_slice())); - Ok(bytes) - } -} - -#[derive(Serialize)] -struct ScrapeResponse { - files: HashMap -} - -impl ScrapeResponse { - pub fn write(&self) -> String { - serde_bencode::to_string(&self).unwrap() - } -} - -#[derive(Serialize)] -struct ScrapeResponseEntry { - complete: u32, - downloaded: u32, - incomplete: u32, -} - -#[derive(Serialize)] -struct ErrorResponse { - failure_reason: String -} - -impl warp::Reply for ErrorResponse { - fn into_response(self) -> warp::reply::Response { - Response::new(format!("{}", serde_bencode::to_string(&self).unwrap()).into()) - } -} +use crate::common::*; +use crate::torrust_http_tracker::request::AnnounceRequest; +use crate::torrust_http_tracker::{ErrorResponse, Peer, ScrapeResponseEntry}; #[derive(Clone)] pub struct HttpServer { diff --git a/src/torrust_udp_tracker/mod.rs b/src/torrust_udp_tracker/mod.rs new file mode 100644 index 000000000..74f47ad34 --- /dev/null +++ b/src/torrust_udp_tracker/mod.rs @@ -0,0 +1 @@ +pub mod server; diff --git a/src/udp_server.rs b/src/torrust_udp_tracker/server.rs similarity index 98% rename from src/udp_server.rs rename to src/torrust_udp_tracker/server.rs index cf0474f7c..681a46f47 100644 --- a/src/udp_server.rs +++ b/src/torrust_udp_tracker/server.rs @@ -1,15 +1,15 @@ -use log::{debug}; +use log::debug; use std; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; -use std::io::{Cursor}; +use std::io::Cursor; use aquatic_udp_protocol::{AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, IpVersion, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId}; use tokio::net::UdpSocket; -use super::common::*; +use crate::common::*; use crate::utils::get_connection_id; use crate::tracker::TorrentTracker; -use crate::{TorrentPeer, TorrentError}; +use crate::{InfoHash, TorrentError, TorrentPeer}; struct RequestError { error: TorrentError, @@ -18,7 +18,7 @@ struct RequestError { struct AnnounceRequestWrapper { announce_request: AnnounceRequest, - info_hash: super::common::InfoHash, + info_hash: InfoHash, } impl AnnounceRequestWrapper { diff --git a/src/tracker.rs b/src/tracker.rs index 86a2d2e7e..7a8dae82f 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -3,15 +3,15 @@ use serde; use std::borrow::Cow; use std::collections::BTreeMap; use tokio::sync::RwLock; -use crate::common::{InfoHash, NumberOfBytesDef, AnnounceEventDef, PeerId}; -use std::net::{SocketAddr, IpAddr}; -use crate::{Configuration, http_server, key_manager, MAX_SCRAPE_TORRENTS}; +use crate::common::{AnnounceEventDef, InfoHash, NumberOfBytesDef, PeerId}; +use std::net::{IpAddr, SocketAddr}; +use crate::{Configuration, torrust_http_tracker, key_manager, MAX_SCRAPE_TORRENTS}; use std::collections::btree_map::Entry; use crate::database::SqliteDatabase; use std::sync::Arc; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; use log::debug; -use crate::key_manager::{AuthKey}; +use crate::key_manager::AuthKey; use r2d2_sqlite::rusqlite; const TWO_HOURS: std::time::Duration = std::time::Duration::from_secs(3600 * 2); @@ -77,7 +77,7 @@ impl TorrentPeer { } } - pub fn from_http_announce_request(announce_request: &http_server::AnnounceRequest, remote_addr: SocketAddr, peer_addr: Option) -> Self { + pub fn from_http_announce_request(announce_request: &torrust_http_tracker::request::AnnounceRequest, remote_addr: SocketAddr, peer_addr: Option) -> Self { // Potentially substitute localhost IP with external IP let peer_addr = match peer_addr { None => SocketAddr::new(IpAddr::from(remote_addr.ip()), announce_request.port), From ed37171f084b49321ae39000898f4795f6f8b98b Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Sat, 29 Jan 2022 22:29:18 +0100 Subject: [PATCH 02/16] refactor: reordered imports --- src/torrust_http_tracker/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/torrust_http_tracker/server.rs b/src/torrust_http_tracker/server.rs index af1774403..3b556c344 100644 --- a/src/torrust_http_tracker/server.rs +++ b/src/torrust_http_tracker/server.rs @@ -1,13 +1,13 @@ use std::collections::{HashMap}; -use crate::tracker::{TorrentTracker}; use std::convert::Infallible; use std::net::{SocketAddr}; use std::sync::Arc; use std::str::FromStr; -use super::{AnnounceResponse, ScrapeResponse}; use log::{debug}; use warp::{filters, reply::Reply, Filter}; use warp::http::Response; +use super::{AnnounceResponse, ScrapeResponse}; +use crate::tracker::{TorrentTracker}; use crate::{TorrentError, TorrentPeer, TorrentStats}; use crate::key_manager::AuthKey; use crate::utils::url_encode_bytes; From 9360231b64ebc14a80c7002aede54b58ab9cd202 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 00:22:49 +0100 Subject: [PATCH 03/16] refactor: http tracker basically done --- Cargo.lock | 3 +- Cargo.toml | 6 +- src/main.rs | 21 +- src/torrust_http_tracker/errors.rs | 45 +++ src/torrust_http_tracker/mod.rs | 1 + src/torrust_http_tracker/request.rs | 21 +- src/torrust_http_tracker/response.rs | 7 - src/torrust_http_tracker/server.rs | 418 ++++++++++++++------------- src/torrust_udp_tracker/server.rs | 2 +- src/tracker.rs | 77 +++-- 10 files changed, 328 insertions(+), 273 deletions(-) create mode 100644 src/torrust_http_tracker/errors.rs diff --git a/Cargo.lock b/Cargo.lock index af57a3768..08a4722e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1742,7 +1742,7 @@ dependencies = [ [[package]] name = "torrust-tracker" -version = "2.1.0" +version = "2.1.1" dependencies = [ "aquatic_udp_protocol", "binascii", @@ -1763,6 +1763,7 @@ dependencies = [ "serde_bencode", "serde_bytes", "serde_json", + "thiserror", "tokio", "toml", "warp", diff --git a/Cargo.toml b/Cargo.toml index 6e78211ea..e3d0a06a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "torrust-tracker" -version = "2.1.0" -authors = ["Mick van Dijke ", "Naim A. "] +version = "2.1.1" +license = "AGPL-3.0" +authors = ["Mick van Dijke "] description = "A feature rich BitTorrent tracker." edition = "2018" @@ -30,5 +31,6 @@ rand = "0.8.4" env_logger = "0.9.0" config = "0.11" derive_more = "0.99" +thiserror = "1.0" aquatic_udp_protocol = "0.1.0" diff --git a/src/main.rs b/src/main.rs index 45b42ef63..ce8887742 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use log::{info}; use torrust_tracker::{http_api_server, Configuration, TorrentTracker, UdpServer, HttpTrackerConfig, UdpTrackerConfig, HttpApiConfig, logging, TrackerServer}; use std::sync::Arc; @@ -74,25 +75,21 @@ fn start_api_server(config: &HttpApiConfig, tracker: Arc) -> Joi } fn start_http_tracker_server(config: &HttpTrackerConfig, tracker: Arc) -> JoinHandle<()> { - info!("Starting HTTP server on: {}", config.bind_address); - let http_tracker = Arc::new(HttpServer::new(tracker)); - let bind_addr = config.bind_address.parse::().unwrap(); + let http_tracker = HttpServer::new(tracker); + let bind_addr = config.bind_address.parse::().unwrap(); let ssl_enabled = config.ssl_enabled; let ssl_cert_path = config.ssl_cert_path.clone(); let ssl_key_path = config.ssl_key_path.clone(); + tokio::spawn(async move { // run with tls if ssl_enabled and cert and key path are set - if ssl_enabled { - info!("SSL enabled."); - warp::serve(HttpServer::routes(http_tracker)) - .tls() - .cert_path(ssl_cert_path.as_ref().unwrap()) - .key_path(ssl_key_path.as_ref().unwrap()) - .run(bind_addr).await; + if ssl_enabled && ssl_cert_path.is_some() && ssl_key_path.is_some() { + info!("Starting HTTPS server on: {} (TLS)", bind_addr); + http_tracker.start_tls(bind_addr, ssl_cert_path.as_ref().unwrap(), ssl_key_path.as_ref().unwrap()).await; } else { - warp::serve(HttpServer::routes(http_tracker)) - .run(bind_addr).await; + info!("Starting HTTP server on: {}", bind_addr); + http_tracker.start(bind_addr).await; } }) } diff --git a/src/torrust_http_tracker/errors.rs b/src/torrust_http_tracker/errors.rs new file mode 100644 index 000000000..d6a24ac38 --- /dev/null +++ b/src/torrust_http_tracker/errors.rs @@ -0,0 +1,45 @@ +use warp::reject::Reject; +use thiserror::Error; +use crate::TorrentError; + +#[derive(Error, Debug)] +pub enum ServerError { + #[error("internal server error")] + InternalServerError, + + #[error("info_hash is either missing or invalid")] + InvalidInfoHash, + + #[error("could not find remote address")] + AddressNotFound, + + #[error("torrent has no peers")] + NoPeersFound, + + #[error("torrent not on whitelist")] + TorrentNotWhitelisted, + + #[error("peer not authenticated")] + PeerNotAuthenticated, + + #[error("invalid authentication key")] + PeerKeyNotValid, + + #[error("exceeded info_hash limit")] + ExceededInfoHashLimit, +} + +impl Reject for ServerError {} + +impl From for ServerError { + fn from(e: TorrentError) -> Self { + match e { + TorrentError::TorrentNotWhitelisted => ServerError::TorrentNotWhitelisted, + TorrentError::PeerNotAuthenticated => ServerError::PeerNotAuthenticated, + TorrentError::PeerKeyNotValid => ServerError::PeerKeyNotValid, + TorrentError::NoPeersFound => ServerError::NoPeersFound, + TorrentError::CouldNotSendResponse => ServerError::InternalServerError, + TorrentError::InvalidInfoHash => ServerError::InvalidInfoHash, + } + } +} diff --git a/src/torrust_http_tracker/mod.rs b/src/torrust_http_tracker/mod.rs index 236f6e996..f105e320c 100644 --- a/src/torrust_http_tracker/mod.rs +++ b/src/torrust_http_tracker/mod.rs @@ -1,6 +1,7 @@ pub mod server; pub mod request; pub mod response; +mod errors; pub use self::server::*; pub use self::request::*; diff --git a/src/torrust_http_tracker/request.rs b/src/torrust_http_tracker/request.rs index d88f36d3a..b62b9430a 100644 --- a/src/torrust_http_tracker/request.rs +++ b/src/torrust_http_tracker/request.rs @@ -1,14 +1,31 @@ +use std::net::SocketAddr; use serde::{Deserialize}; +use crate::InfoHash; #[derive(Deserialize)] -pub struct AnnounceRequest { +pub struct AnnounceRequestQuery { pub downloaded: u32, pub uploaded: u32, pub key: String, pub peer_id: String, pub port: u16, - pub info_hash: String, pub left: u32, pub event: Option, pub compact: Option, } + +pub struct AnnounceRequest { + pub info_hash: InfoHash, + pub peer_addr: SocketAddr, + pub downloaded: u32, + pub uploaded: u32, + pub peer_id: String, + pub port: u16, + pub left: u32, + pub event: Option, + pub compact: Option, +} + +pub struct ScrapeRequest { + pub info_hashes: Vec, +} diff --git a/src/torrust_http_tracker/response.rs b/src/torrust_http_tracker/response.rs index a30fc89f6..df039a1c2 100644 --- a/src/torrust_http_tracker/response.rs +++ b/src/torrust_http_tracker/response.rs @@ -3,7 +3,6 @@ use std::error::Error; use std::io::Write; use std::net::IpAddr; use serde::{Serialize}; -use warp::http::Response; #[derive(Serialize)] pub struct Peer { @@ -86,9 +85,3 @@ impl ScrapeResponse { pub struct ErrorResponse { pub failure_reason: String } - -impl warp::Reply for ErrorResponse { - fn into_response(self) -> warp::reply::Response { - Response::new(format!("{}", serde_bencode::to_string(&self).unwrap()).into()) - } -} diff --git a/src/torrust_http_tracker/server.rs b/src/torrust_http_tracker/server.rs index 3b556c344..5ac6f57cf 100644 --- a/src/torrust_http_tracker/server.rs +++ b/src/torrust_http_tracker/server.rs @@ -1,20 +1,24 @@ -use std::collections::{HashMap}; +use std::collections::HashMap; use std::convert::Infallible; use std::net::{SocketAddr}; use std::sync::Arc; use std::str::FromStr; use log::{debug}; -use warp::{filters, reply::Reply, Filter}; -use warp::http::Response; -use super::{AnnounceResponse, ScrapeResponse}; +use warp::{reply::Reply, Filter, Rejection, reject}; +use warp::http::{Response, StatusCode}; +use super::{AnnounceResponse}; use crate::tracker::{TorrentTracker}; -use crate::{TorrentError, TorrentPeer, TorrentStats}; +use crate::{TorrentPeer, TorrentStats}; use crate::key_manager::AuthKey; -use crate::utils::url_encode_bytes; use crate::common::*; -use crate::torrust_http_tracker::request::AnnounceRequest; -use crate::torrust_http_tracker::{ErrorResponse, Peer, ScrapeResponseEntry}; +use crate::torrust_http_tracker::request::AnnounceRequestQuery; +use crate::torrust_http_tracker::{AnnounceRequest, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry}; +use crate::torrust_http_tracker::errors::ServerError; +use crate::utils::url_encode_bytes; +type WebResult = std::result::Result; + +/// Server that listens on HTTP, needs a TorrentTracker #[derive(Clone)] pub struct HttpServer { tracker: Arc, @@ -27,232 +31,230 @@ impl HttpServer { } } - // &self did not work here - pub fn routes(http_server: Arc) -> impl Filter + Clone + Send + Sync + 'static { - // optional tracker key - let opt_key = warp::path::param::() - .map(Some) - .or_else(|_| async { - // Ok(None) - Ok::<(Option,), std::convert::Infallible>((None,)) - }); - - // GET /announce?key=:String - // Announce peer - let hs1 = http_server.clone(); - let announce_route = - filters::path::path("announce") - .and(filters::method::get()) - .and(warp::addr::remote()) - .and(opt_key) - .and(filters::query::raw()) - .and(filters::query::query()) - .map(move |remote_addr, key, raw_query, query| { - debug!("Request: {}", raw_query); - (remote_addr, key, raw_query, query, hs1.clone()) - }) - .and_then(move |(remote_addr, key, raw_query, mut query, http_server): (Option, Option, String, AnnounceRequest, Arc)| { - async move { - if remote_addr.is_none() { return HttpServer::send_error("could not get remote address") } - - // query.info_hash somehow receives a corrupt string - // so we have to get the info_hash manually from the raw query - let info_hashes = HttpServer::info_hashes_from_raw_query(&raw_query); - if info_hashes.len() < 1 { return HttpServer::send_error("info_hash not found") } - query.info_hash = info_hashes[0].to_string(); - debug!("{:?}", query.info_hash); - - if let Some(err) = http_server.authenticate_request(&query.info_hash, key).await { return err } - - http_server.handle_announce(query, remote_addr.unwrap()).await - } - }); - - // GET /scrape?key=:String - // Get torrent info - let hs2 = http_server.clone(); - let scrape_route = - filters::path::path("scrape") - .and(filters::method::get()) - .and(opt_key) - .and(filters::query::raw()) - .map(move |key, raw_query| { - debug!("Request: {}", raw_query); - (key, raw_query, hs2.clone()) - }) - .and_then(move |(key, raw_query, http_server): (Option, String, Arc)| { - async move { - let info_hashes = HttpServer::info_hashes_from_raw_query(&raw_query); - if info_hashes.len() < 1 { return HttpServer::send_error("info_hash not found") } - if info_hashes.len() > 50 { return HttpServer::send_error("exceeded the max of 50 info_hashes") } - debug!("{:?}", info_hashes); - - // todo: verify all info_hashes before scrape - if let Some(err) = http_server.authenticate_request(&info_hashes[0].to_string(), key).await { return err } - - http_server.handle_scrape(info_hashes).await - } - }); - - // all routes - warp::any().and(announce_route.or(scrape_route)) + /// Start the HttpServer + pub async fn start(&self, socket_addr: SocketAddr) { + warp::serve(routes(self.tracker.clone())) + .run(socket_addr).await; } - fn info_hashes_from_raw_query(raw_query: &str) -> Vec { - let split_raw_query: Vec<&str> = raw_query.split("&").collect(); - let mut info_hashes: Vec = Vec::new(); - - for v in split_raw_query { - if v.contains("info_hash") { - let raw_info_hash = v.split("=").collect::>()[1]; - let info_hash_bytes = percent_encoding::percent_decode_str(raw_info_hash).collect::>(); - let info_hash = InfoHash::from_str(&hex::encode(info_hash_bytes)); - if let Ok(ih) = info_hash { - info_hashes.push(ih); - } + /// Start the HttpServer in TLS mode + pub async fn start_tls(&self, socket_addr: SocketAddr, ssl_cert_path: &str, ssl_key_path: &str) { + warp::serve(routes(self.tracker.clone())) + .tls() + .cert_path(ssl_cert_path) + .key_path(ssl_key_path) + .run(socket_addr).await; + } +} + +/// All routes +fn routes(tracker: Arc,) -> impl Filter + Clone { + announce(tracker.clone()) + .or(scrape(tracker.clone())) + .recover(handle_error) +} + +/// Pass Arc along +fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { + warp::any() + .map(move || tracker.clone()) +} + +/// Check for infoHash +fn with_info_hash() -> impl Filter,), Error = Rejection> + Clone { + warp::filters::query::raw() + .and_then(info_hashes) +} + +/// Parse InfoHash from raw query string +async fn info_hashes(raw_query: String) -> WebResult> { + let split_raw_query: Vec<&str> = raw_query.split("&").collect(); + let mut info_hashes: Vec = Vec::new(); + + for v in split_raw_query { + if v.contains("info_hash") { + let raw_info_hash = v.split("=").collect::>()[1]; + let info_hash_bytes = percent_encoding::percent_decode_str(raw_info_hash).collect::>(); + let info_hash = InfoHash::from_str(&hex::encode(info_hash_bytes)); + if let Ok(ih) = info_hash { + info_hashes.push(ih); } } + } - info_hashes + if info_hashes.len() > MAX_SCRAPE_TORRENTS as usize { + Err(reject::custom(ServerError::ExceededInfoHashLimit)) + } else if info_hashes.len() < 1 { + Err(reject::custom(ServerError::InvalidInfoHash)) + } else { + Ok(info_hashes) } +} - fn send_announce_response(query: &AnnounceRequest, torrent_stats: TorrentStats, peers: Vec, interval: u32) -> Result { - let http_peers: Vec = peers.iter().map(|peer| Peer { - peer_id: String::from_utf8_lossy(&peer.peer_id.0).to_string(), - ip: peer.peer_addr.ip(), - port: peer.peer_addr.port() - }).collect(); - - let res = AnnounceResponse { - interval, - complete: torrent_stats.seeders, - incomplete: torrent_stats.leechers, - peers: http_peers - }; +/// Pass Arc along +fn with_auth_key() -> impl Filter,), Error = warp::Rejection> + Clone { + warp::path::param::() + .map(|key_string: String| { + AuthKey::from_string(&key_string) + }) +} - // check for compact response request - let response = match query.compact { - None => Response::new(res.write().into()), - Some(int) => { - if int == 1 { - let res_compact = res.write_compact(); - match res_compact { - Ok(response) => Response::new(response.into()), - Err(e) => { - debug!("{}", e); - HttpServer::send_error("server error").unwrap() - } - } - } else { - Response::new(res.write().into()) - } - } - }; +/// Check for AnnounceRequest +fn with_announce_request() -> impl Filter + Clone { + warp::filters::query::query::() + .and(with_info_hash()) + .and(warp::addr::remote()) + .and_then(announce_request) +} + +/// Parse AnnounceRequest from raw AnnounceRequestQuery, InfoHash and Option +async fn announce_request(announce_request_query: AnnounceRequestQuery, info_hashes: Vec, remote_addr: Option) -> WebResult { + if remote_addr.is_none() { return Err(reject::custom(ServerError::AddressNotFound)) } + + Ok(AnnounceRequest { + info_hash: info_hashes[0], + peer_addr: remote_addr.unwrap(), + downloaded: announce_request_query.downloaded, + uploaded: announce_request_query.uploaded, + peer_id: announce_request_query.peer_id, + port: announce_request_query.port, + left: announce_request_query.left, + event: announce_request_query.event, + compact: announce_request_query.compact + }) +} + +/// Check for ScrapeRequest +fn with_scrape_request() -> impl Filter + Clone { + warp::any() + .and(with_info_hash()) + .and_then(scrape_request) +} + +/// Parse ScrapeRequest from InfoHash +async fn scrape_request(info_hashes: Vec) -> WebResult { + Ok(ScrapeRequest { + info_hashes, + }) +} - Ok(response) +/// Authenticate AnnounceRequest using optional AuthKey +async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { + match tracker.authenticate_request(info_hash, auth_key).await { + Ok(_) => Ok(()), + Err(e) => Err(ServerError::from(e)) } +} + +/// GET /announce/ +fn announce(tracker: Arc,) -> impl Filter + Clone { + warp::path::path("announce") + .and(warp::filters::method::get()) + .and(with_announce_request()) + .and(with_auth_key()) + .and(with_tracker(tracker)) + .and_then(handle_announce) +} - fn send_error(msg: &str) -> Result { - Ok(ErrorResponse { - failure_reason: msg.to_string() - }.into_response()) +/// GET /scrape/ +fn scrape(tracker: Arc,) -> impl Filter + Clone { + warp::path::path("scrape") + .and(warp::filters::method::get()) + .and(with_scrape_request()) + .and(with_auth_key()) + .and(with_tracker(tracker)) + .and_then(handle_scrape) +} + +/// Handle announce request +pub async fn handle_announce(announce_request: AnnounceRequest, auth_key: Option, tracker: Arc,) -> WebResult { + if let Err(e) = authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await { + return Err(reject::custom(e)) } - async fn authenticate_request(&self, info_hash_str: &str, key: Option) -> Option> { - let info_hash= InfoHash::from_str(info_hash_str); - if info_hash.is_err() { return Some(HttpServer::send_error("invalid info_hash")) } + let peer = TorrentPeer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip()); - let auth_key = match key { - None => None, - Some(v) => AuthKey::from_string(&v) - }; + match tracker.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer).await { + Err(e) => Err(reject::custom(ServerError::from(e))), + Ok(torrent_stats) => { + // get all peers excluding the client_addr + let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; + if peers.is_none() { return Err(reject::custom(ServerError::NoPeersFound)) } - if let Err(e) = self.tracker.authenticate_request(&info_hash.unwrap(), auth_key).await { - return match e { - TorrentError::TorrentNotWhitelisted => { - debug!("Info_hash not whitelisted."); - Some(HttpServer::send_error("torrent not whitelisted")) - } - TorrentError::PeerKeyNotValid => { - debug!("Peer key not valid."); - Some(HttpServer::send_error("peer key not valid")) - } - TorrentError::PeerNotAuthenticated => { - debug!("Peer not authenticated."); - Some(HttpServer::send_error("peer not authenticated")) - } - _ => { - debug!("Unhandled HTTP error."); - Some(HttpServer::send_error("oops")) - } - } + // success response + let announce_interval = tracker.config.http_tracker.announce_interval; + send_announce_response(&announce_request, torrent_stats, peers.unwrap(), announce_interval) } - - None } +} - async fn handle_announce(&self, query: AnnounceRequest, remote_addr: SocketAddr) -> Result { - let info_hash = match InfoHash::from_str(&query.info_hash) { - Ok(v) => v, - Err(_) => { - return HttpServer::send_error("info_hash is invalid") - } - }; +/// Handle scrape request +pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option, tracker: Arc,) -> WebResult { + let mut files: HashMap = HashMap::new(); + let db = tracker.get_torrents().await; + + for info_hash in scrape_request.info_hashes.iter() { + // authenticate every info_hash + if authenticate(info_hash, &auth_key, tracker.clone()).await.is_err() { continue } - let peer = TorrentPeer::from_http_announce_request(&query, remote_addr, self.tracker.config.get_ext_ip()); + let scrape_entry = match db.get(&info_hash) { + Some(torrent_info) => { + let (seeders, completed, leechers) = torrent_info.get_stats(); - match self.tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await { - Err(e) => { - debug!("{:?}", e); - HttpServer::send_error("server error") + ScrapeResponseEntry { complete: seeders, downloaded: completed, incomplete: leechers } } - Ok(torrent_stats) => { - // get all peers excluding the client_addr - let peers = self.tracker.get_torrent_peers(&info_hash, &peer.peer_addr).await; - if peers.is_none() { - debug!("No peers found after announce."); - return HttpServer::send_error("peer is invalid") - } - - // todo: add http announce interval config option - // success response - let announce_interval = self.tracker.config.http_tracker.announce_interval; - HttpServer::send_announce_response(&query, torrent_stats, peers.unwrap(), announce_interval) + None => { + ScrapeResponseEntry { complete: 0, downloaded: 0, incomplete: 0 } } + }; + + if let Ok(encoded_info_hash) = url_encode_bytes(&info_hash.0) { + files.insert(encoded_info_hash, scrape_entry); } } - async fn handle_scrape(&self, info_hashes: Vec) -> Result { - let mut res = ScrapeResponse { - files: HashMap::new() - }; - let db = self.tracker.get_torrents().await; - - for info_hash in info_hashes.iter() { - let scrape_entry = match db.get(&info_hash) { - Some(torrent_info) => { - let (seeders, completed, leechers) = torrent_info.get_stats(); - - ScrapeResponseEntry { - complete: seeders, - downloaded: completed, - incomplete: leechers - } - } - None => { - ScrapeResponseEntry { - complete: 0, - downloaded: 0, - incomplete: 0 - } - } - }; - - if let Ok(encoded_info_hash) = url_encode_bytes(&info_hash.0) { - res.files.insert(encoded_info_hash, scrape_entry); - } - } + send_scrape_response(files) +} +/// Send announce response +fn send_announce_response(announce_request: &AnnounceRequest, torrent_stats: TorrentStats, peers: Vec, interval: u32) -> WebResult { + let http_peers: Vec = peers.iter().map(|peer| Peer { + peer_id: String::from_utf8_lossy(&peer.peer_id.0).to_string(), + ip: peer.peer_addr.ip(), + port: peer.peer_addr.port() + }).collect(); + + let res = AnnounceResponse { + interval, + complete: torrent_stats.seeders, + incomplete: torrent_stats.leechers, + peers: http_peers + }; + + // check for compact response request + if let Some(1) = announce_request.compact { + match res.write_compact() { + Ok(body) => Ok(Response::new(body)), + Err(_) => Err(reject::custom(ServerError::InternalServerError)) + } + } else { Ok(Response::new(res.write().into())) } } + +/// Send scrape response +fn send_scrape_response(files: HashMap) -> WebResult { + Ok(Response::new(ScrapeResponse { files }.write())) +} + +/// Handle all server errors and send error reply +async fn handle_error(r: Rejection) -> std::result::Result { + if let Some(e) = r.find::() { + debug!("{:?}", e); + let reply = warp::reply::json(&ErrorResponse { failure_reason: e.to_string() }); + Ok(warp::reply::with_status(reply, StatusCode::BAD_REQUEST)) + } else { + let reply = warp::reply::json(&ErrorResponse { failure_reason: "internal server error".to_string() }); + Ok(warp::reply::with_status(reply, StatusCode::INTERNAL_SERVER_ERROR)) + } +} diff --git a/src/torrust_udp_tracker/server.rs b/src/torrust_udp_tracker/server.rs index 681a46f47..b3cbccf7f 100644 --- a/src/torrust_udp_tracker/server.rs +++ b/src/torrust_udp_tracker/server.rs @@ -114,7 +114,7 @@ impl UdpServer { async fn handle_announce(&self, remote_addr: SocketAddr, announce_request: &AnnounceRequest) -> Result { let wrapped_announce_request = AnnounceRequestWrapper::new(announce_request.clone()); - self.tracker.authenticate_request(&wrapped_announce_request.info_hash, None).await?; + self.tracker.authenticate_request(&wrapped_announce_request.info_hash, &None).await?; let peer = TorrentPeer::from_udp_announce_request(&wrapped_announce_request.announce_request, remote_addr, self.tracker.config.get_ext_ip()); diff --git a/src/tracker.rs b/src/tracker.rs index 7a8dae82f..33d7ddb5f 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -5,7 +5,7 @@ use std::collections::BTreeMap; use tokio::sync::RwLock; use crate::common::{AnnounceEventDef, InfoHash, NumberOfBytesDef, PeerId}; use std::net::{IpAddr, SocketAddr}; -use crate::{Configuration, torrust_http_tracker, key_manager, MAX_SCRAPE_TORRENTS}; +use crate::{Configuration, key_manager, MAX_SCRAPE_TORRENTS}; use std::collections::btree_map::Entry; use crate::database::SqliteDatabase; use std::sync::Arc; @@ -13,6 +13,7 @@ use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; use log::debug; use crate::key_manager::AuthKey; use r2d2_sqlite::rusqlite; +use crate::torrust_http_tracker::AnnounceRequest; const TWO_HOURS: std::time::Duration = std::time::Duration::from_secs(3600 * 2); const FIVE_MINUTES: std::time::Duration = std::time::Duration::from_secs(300); @@ -77,7 +78,7 @@ impl TorrentPeer { } } - pub fn from_http_announce_request(announce_request: &torrust_http_tracker::request::AnnounceRequest, remote_addr: SocketAddr, peer_addr: Option) -> Self { + pub fn from_http_announce_request(announce_request: &AnnounceRequest, remote_addr: SocketAddr, peer_addr: Option) -> Self { // Potentially substitute localhost IP with external IP let peer_addr = match peer_addr { None => SocketAddr::new(IpAddr::from(remote_addr.ip()), announce_request.port), @@ -245,7 +246,8 @@ pub enum TorrentError { PeerNotAuthenticated, PeerKeyNotValid, NoPeersFound, - CouldNotSendResponse + CouldNotSendResponse, + InvalidInfoHash, } pub struct TorrentTracker { @@ -267,6 +269,18 @@ impl TorrentTracker { } } + fn is_public(&self) -> bool { + self.config.mode == TrackerMode::PublicMode + } + + fn is_private(&self) -> bool { + self.config.mode == TrackerMode::PrivateMode || self.config.mode == TrackerMode::PrivateListedMode + } + + fn is_whitelisted(&self) -> bool { + self.config.mode == TrackerMode::ListedMode || self.config.mode == TrackerMode::PrivateListedMode + } + pub async fn generate_auth_key(&self, seconds_valid: u64) -> Result { let auth_key = key_manager::generate_auth_key(seconds_valid); @@ -285,49 +299,32 @@ impl TorrentTracker { key_manager::verify_auth_key(&db_key) } - pub async fn authenticate_request(&self, info_hash: &InfoHash, key: Option) -> Result<(), TorrentError> { - match self.config.mode { - TrackerMode::PublicMode => Ok(()), - TrackerMode::ListedMode => { - if !self.is_info_hash_whitelisted(info_hash).await { - return Err(TorrentError::TorrentNotWhitelisted) - } - - Ok(()) - } - TrackerMode::PrivateMode => { - match key { - Some(key) => { - if self.verify_auth_key(&key).await.is_err() { - return Err(TorrentError::PeerKeyNotValid) - } + pub async fn authenticate_request(&self, info_hash: &InfoHash, key: &Option) -> Result<(), TorrentError> { + // no authentication needed in public mode + if self.is_public() { return Ok(()) } - Ok(()) - } - None => { - return Err(TorrentError::PeerNotAuthenticated) + // check if auth_key is set and valid + if self.is_private() { + match key { + Some(key) => { + if self.verify_auth_key(key).await.is_err() { + return Err(TorrentError::PeerKeyNotValid) } } + None => { + return Err(TorrentError::PeerNotAuthenticated) + } } - TrackerMode::PrivateListedMode => { - match key { - Some(key) => { - if self.verify_auth_key(&key).await.is_err() { - return Err(TorrentError::PeerKeyNotValid) - } - - if !self.is_info_hash_whitelisted(info_hash).await { - return Err(TorrentError::TorrentNotWhitelisted) - } + } - Ok(()) - } - None => { - return Err(TorrentError::PeerNotAuthenticated) - } - } + // check if info_hash is whitelisted + if self.is_whitelisted() { + if self.is_info_hash_whitelisted(info_hash).await == false { + return Err(TorrentError::TorrentNotWhitelisted) } } + + Ok(()) } // Adding torrents is not relevant to public trackers. @@ -351,7 +348,7 @@ impl TorrentTracker { pub async fn get_torrent_peers( &self, info_hash: &InfoHash, - peer_addr: &std::net::SocketAddr + peer_addr: &SocketAddr ) -> Option> { let read_lock = self.torrents.read().await; match read_lock.get(info_hash) { From e173b9bf5e8a0993d3e7187fd182f44d01549216 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 00:38:28 +0100 Subject: [PATCH 04/16] refactor: http tracker additional separation --- src/torrust_http_tracker/filters.rs | 83 +++++++++++ src/torrust_http_tracker/handlers.rs | 100 +++++++++++++ src/torrust_http_tracker/mod.rs | 9 +- src/torrust_http_tracker/routes.rs | 32 ++++ src/torrust_http_tracker/server.rs | 212 --------------------------- 5 files changed, 223 insertions(+), 213 deletions(-) create mode 100644 src/torrust_http_tracker/filters.rs create mode 100644 src/torrust_http_tracker/handlers.rs create mode 100644 src/torrust_http_tracker/routes.rs diff --git a/src/torrust_http_tracker/filters.rs b/src/torrust_http_tracker/filters.rs new file mode 100644 index 000000000..be1d234f6 --- /dev/null +++ b/src/torrust_http_tracker/filters.rs @@ -0,0 +1,83 @@ +/// Pass Arc along +fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { + warp::any() + .map(move || tracker.clone()) +} + +/// Check for infoHash +fn with_info_hash() -> impl Filter,), Error = Rejection> + Clone { + warp::filters::query::raw() + .and_then(info_hashes) +} + +/// Parse InfoHash from raw query string +async fn info_hashes(raw_query: String) -> WebResult> { + let split_raw_query: Vec<&str> = raw_query.split("&").collect(); + let mut info_hashes: Vec = Vec::new(); + + for v in split_raw_query { + if v.contains("info_hash") { + let raw_info_hash = v.split("=").collect::>()[1]; + let info_hash_bytes = percent_encoding::percent_decode_str(raw_info_hash).collect::>(); + let info_hash = InfoHash::from_str(&hex::encode(info_hash_bytes)); + if let Ok(ih) = info_hash { + info_hashes.push(ih); + } + } + } + + if info_hashes.len() > MAX_SCRAPE_TORRENTS as usize { + Err(reject::custom(ServerError::ExceededInfoHashLimit)) + } else if info_hashes.len() < 1 { + Err(reject::custom(ServerError::InvalidInfoHash)) + } else { + Ok(info_hashes) + } +} + +/// Pass Arc along +fn with_auth_key() -> impl Filter,), Error = warp::Rejection> + Clone { + warp::path::param::() + .map(|key_string: String| { + AuthKey::from_string(&key_string) + }) +} + +/// Check for AnnounceRequest +fn with_announce_request() -> impl Filter + Clone { + warp::filters::query::query::() + .and(with_info_hash()) + .and(warp::addr::remote()) + .and_then(announce_request) +} + +/// Parse AnnounceRequest from raw AnnounceRequestQuery, InfoHash and Option +async fn announce_request(announce_request_query: AnnounceRequestQuery, info_hashes: Vec, remote_addr: Option) -> WebResult { + if remote_addr.is_none() { return Err(reject::custom(ServerError::AddressNotFound)) } + + Ok(AnnounceRequest { + info_hash: info_hashes[0], + peer_addr: remote_addr.unwrap(), + downloaded: announce_request_query.downloaded, + uploaded: announce_request_query.uploaded, + peer_id: announce_request_query.peer_id, + port: announce_request_query.port, + left: announce_request_query.left, + event: announce_request_query.event, + compact: announce_request_query.compact + }) +} + +/// Check for ScrapeRequest +fn with_scrape_request() -> impl Filter + Clone { + warp::any() + .and(with_info_hash()) + .and_then(scrape_request) +} + +/// Parse ScrapeRequest from InfoHash +async fn scrape_request(info_hashes: Vec) -> WebResult { + Ok(ScrapeRequest { + info_hashes, + }) +} diff --git a/src/torrust_http_tracker/handlers.rs b/src/torrust_http_tracker/handlers.rs new file mode 100644 index 000000000..1016b4668 --- /dev/null +++ b/src/torrust_http_tracker/handlers.rs @@ -0,0 +1,100 @@ +/// Authenticate AnnounceRequest using optional AuthKey +async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { + match tracker.authenticate_request(info_hash, auth_key).await { + Ok(_) => Ok(()), + Err(e) => Err(ServerError::from(e)) + } +} + +/// Handle announce request +pub async fn handle_announce(announce_request: AnnounceRequest, auth_key: Option, tracker: Arc,) -> WebResult { + if let Err(e) = authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await { + return Err(reject::custom(e)) + } + + let peer = TorrentPeer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip()); + + match tracker.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer).await { + Err(e) => Err(reject::custom(ServerError::from(e))), + Ok(torrent_stats) => { + // get all peers excluding the client_addr + let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; + if peers.is_none() { return Err(reject::custom(ServerError::NoPeersFound)) } + + // success response + let announce_interval = tracker.config.http_tracker.announce_interval; + send_announce_response(&announce_request, torrent_stats, peers.unwrap(), announce_interval) + } + } +} + +/// Handle scrape request +pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option, tracker: Arc,) -> WebResult { + let mut files: HashMap = HashMap::new(); + let db = tracker.get_torrents().await; + + for info_hash in scrape_request.info_hashes.iter() { + // authenticate every info_hash + if authenticate(info_hash, &auth_key, tracker.clone()).await.is_err() { continue } + + let scrape_entry = match db.get(&info_hash) { + Some(torrent_info) => { + let (seeders, completed, leechers) = torrent_info.get_stats(); + + ScrapeResponseEntry { complete: seeders, downloaded: completed, incomplete: leechers } + } + None => { + ScrapeResponseEntry { complete: 0, downloaded: 0, incomplete: 0 } + } + }; + + if let Ok(encoded_info_hash) = url_encode_bytes(&info_hash.0) { + files.insert(encoded_info_hash, scrape_entry); + } + } + + send_scrape_response(files) +} + +/// Handle all server errors and send error reply +async fn handle_error(r: Rejection) -> std::result::Result { + if let Some(e) = r.find::() { + debug!("{:?}", e); + let reply = warp::reply::json(&ErrorResponse { failure_reason: e.to_string() }); + Ok(warp::reply::with_status(reply, StatusCode::BAD_REQUEST)) + } else { + let reply = warp::reply::json(&ErrorResponse { failure_reason: "internal server error".to_string() }); + Ok(warp::reply::with_status(reply, StatusCode::INTERNAL_SERVER_ERROR)) + } +} + +/// Send announce response +fn send_announce_response(announce_request: &AnnounceRequest, torrent_stats: TorrentStats, peers: Vec, interval: u32) -> WebResult { + let http_peers: Vec = peers.iter().map(|peer| Peer { + peer_id: String::from_utf8_lossy(&peer.peer_id.0).to_string(), + ip: peer.peer_addr.ip(), + port: peer.peer_addr.port() + }).collect(); + + let res = AnnounceResponse { + interval, + complete: torrent_stats.seeders, + incomplete: torrent_stats.leechers, + peers: http_peers + }; + + // check for compact response request + if let Some(1) = announce_request.compact { + match res.write_compact() { + Ok(body) => Ok(Response::new(body)), + Err(_) => Err(reject::custom(ServerError::InternalServerError)) + } + } else { + Ok(Response::new(res.write().into())) + } +} + +/// Send scrape response +fn send_scrape_response(files: HashMap) -> WebResult { + Ok(Response::new(ScrapeResponse { files }.write())) +} diff --git a/src/torrust_http_tracker/mod.rs b/src/torrust_http_tracker/mod.rs index f105e320c..733e69704 100644 --- a/src/torrust_http_tracker/mod.rs +++ b/src/torrust_http_tracker/mod.rs @@ -1,8 +1,15 @@ pub mod server; pub mod request; pub mod response; -mod errors; +pub mod errors; +pub mod routes; +pub mod handlers; +pub mod filters; pub use self::server::*; pub use self::request::*; pub use self::response::*; +pub use self::errors::*; +pub use self::routes::*; +pub use self::handlers::*; +pub use self::filters::*; diff --git a/src/torrust_http_tracker/routes.rs b/src/torrust_http_tracker/routes.rs new file mode 100644 index 000000000..20821f511 --- /dev/null +++ b/src/torrust_http_tracker/routes.rs @@ -0,0 +1,32 @@ +use std::convert::Infallible; +use std::sync::Arc; +use warp::{Filter, Rejection}; +use crate::TorrentTracker; +use crate::torrust_http_tracker::{handle_announce, handle_scrape}; + +/// All routes +fn routes(tracker: Arc,) -> impl Filter + Clone { + announce(tracker.clone()) + .or(scrape(tracker.clone())) + .recover(handle_error) +} + +/// GET /announce/ +fn announce(tracker: Arc,) -> impl Filter + Clone { + warp::path::path("announce") + .and(warp::filters::method::get()) + .and(with_announce_request()) + .and(with_auth_key()) + .and(with_tracker(tracker)) + .and_then(handle_announce) +} + +/// GET /scrape/ +fn scrape(tracker: Arc,) -> impl Filter + Clone { + warp::path::path("scrape") + .and(warp::filters::method::get()) + .and(with_scrape_request()) + .and(with_auth_key()) + .and(with_tracker(tracker)) + .and_then(handle_scrape) +} diff --git a/src/torrust_http_tracker/server.rs b/src/torrust_http_tracker/server.rs index 5ac6f57cf..da739453d 100644 --- a/src/torrust_http_tracker/server.rs +++ b/src/torrust_http_tracker/server.rs @@ -46,215 +46,3 @@ impl HttpServer { .run(socket_addr).await; } } - -/// All routes -fn routes(tracker: Arc,) -> impl Filter + Clone { - announce(tracker.clone()) - .or(scrape(tracker.clone())) - .recover(handle_error) -} - -/// Pass Arc along -fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { - warp::any() - .map(move || tracker.clone()) -} - -/// Check for infoHash -fn with_info_hash() -> impl Filter,), Error = Rejection> + Clone { - warp::filters::query::raw() - .and_then(info_hashes) -} - -/// Parse InfoHash from raw query string -async fn info_hashes(raw_query: String) -> WebResult> { - let split_raw_query: Vec<&str> = raw_query.split("&").collect(); - let mut info_hashes: Vec = Vec::new(); - - for v in split_raw_query { - if v.contains("info_hash") { - let raw_info_hash = v.split("=").collect::>()[1]; - let info_hash_bytes = percent_encoding::percent_decode_str(raw_info_hash).collect::>(); - let info_hash = InfoHash::from_str(&hex::encode(info_hash_bytes)); - if let Ok(ih) = info_hash { - info_hashes.push(ih); - } - } - } - - if info_hashes.len() > MAX_SCRAPE_TORRENTS as usize { - Err(reject::custom(ServerError::ExceededInfoHashLimit)) - } else if info_hashes.len() < 1 { - Err(reject::custom(ServerError::InvalidInfoHash)) - } else { - Ok(info_hashes) - } -} - -/// Pass Arc along -fn with_auth_key() -> impl Filter,), Error = warp::Rejection> + Clone { - warp::path::param::() - .map(|key_string: String| { - AuthKey::from_string(&key_string) - }) -} - -/// Check for AnnounceRequest -fn with_announce_request() -> impl Filter + Clone { - warp::filters::query::query::() - .and(with_info_hash()) - .and(warp::addr::remote()) - .and_then(announce_request) -} - -/// Parse AnnounceRequest from raw AnnounceRequestQuery, InfoHash and Option -async fn announce_request(announce_request_query: AnnounceRequestQuery, info_hashes: Vec, remote_addr: Option) -> WebResult { - if remote_addr.is_none() { return Err(reject::custom(ServerError::AddressNotFound)) } - - Ok(AnnounceRequest { - info_hash: info_hashes[0], - peer_addr: remote_addr.unwrap(), - downloaded: announce_request_query.downloaded, - uploaded: announce_request_query.uploaded, - peer_id: announce_request_query.peer_id, - port: announce_request_query.port, - left: announce_request_query.left, - event: announce_request_query.event, - compact: announce_request_query.compact - }) -} - -/// Check for ScrapeRequest -fn with_scrape_request() -> impl Filter + Clone { - warp::any() - .and(with_info_hash()) - .and_then(scrape_request) -} - -/// Parse ScrapeRequest from InfoHash -async fn scrape_request(info_hashes: Vec) -> WebResult { - Ok(ScrapeRequest { - info_hashes, - }) -} - -/// Authenticate AnnounceRequest using optional AuthKey -async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { - match tracker.authenticate_request(info_hash, auth_key).await { - Ok(_) => Ok(()), - Err(e) => Err(ServerError::from(e)) - } -} - -/// GET /announce/ -fn announce(tracker: Arc,) -> impl Filter + Clone { - warp::path::path("announce") - .and(warp::filters::method::get()) - .and(with_announce_request()) - .and(with_auth_key()) - .and(with_tracker(tracker)) - .and_then(handle_announce) -} - -/// GET /scrape/ -fn scrape(tracker: Arc,) -> impl Filter + Clone { - warp::path::path("scrape") - .and(warp::filters::method::get()) - .and(with_scrape_request()) - .and(with_auth_key()) - .and(with_tracker(tracker)) - .and_then(handle_scrape) -} - -/// Handle announce request -pub async fn handle_announce(announce_request: AnnounceRequest, auth_key: Option, tracker: Arc,) -> WebResult { - if let Err(e) = authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await { - return Err(reject::custom(e)) - } - - let peer = TorrentPeer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip()); - - match tracker.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer).await { - Err(e) => Err(reject::custom(ServerError::from(e))), - Ok(torrent_stats) => { - // get all peers excluding the client_addr - let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; - if peers.is_none() { return Err(reject::custom(ServerError::NoPeersFound)) } - - // success response - let announce_interval = tracker.config.http_tracker.announce_interval; - send_announce_response(&announce_request, torrent_stats, peers.unwrap(), announce_interval) - } - } -} - -/// Handle scrape request -pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option, tracker: Arc,) -> WebResult { - let mut files: HashMap = HashMap::new(); - let db = tracker.get_torrents().await; - - for info_hash in scrape_request.info_hashes.iter() { - // authenticate every info_hash - if authenticate(info_hash, &auth_key, tracker.clone()).await.is_err() { continue } - - let scrape_entry = match db.get(&info_hash) { - Some(torrent_info) => { - let (seeders, completed, leechers) = torrent_info.get_stats(); - - ScrapeResponseEntry { complete: seeders, downloaded: completed, incomplete: leechers } - } - None => { - ScrapeResponseEntry { complete: 0, downloaded: 0, incomplete: 0 } - } - }; - - if let Ok(encoded_info_hash) = url_encode_bytes(&info_hash.0) { - files.insert(encoded_info_hash, scrape_entry); - } - } - - send_scrape_response(files) -} - -/// Send announce response -fn send_announce_response(announce_request: &AnnounceRequest, torrent_stats: TorrentStats, peers: Vec, interval: u32) -> WebResult { - let http_peers: Vec = peers.iter().map(|peer| Peer { - peer_id: String::from_utf8_lossy(&peer.peer_id.0).to_string(), - ip: peer.peer_addr.ip(), - port: peer.peer_addr.port() - }).collect(); - - let res = AnnounceResponse { - interval, - complete: torrent_stats.seeders, - incomplete: torrent_stats.leechers, - peers: http_peers - }; - - // check for compact response request - if let Some(1) = announce_request.compact { - match res.write_compact() { - Ok(body) => Ok(Response::new(body)), - Err(_) => Err(reject::custom(ServerError::InternalServerError)) - } - } else { - Ok(Response::new(res.write().into())) - } -} - -/// Send scrape response -fn send_scrape_response(files: HashMap) -> WebResult { - Ok(Response::new(ScrapeResponse { files }.write())) -} - -/// Handle all server errors and send error reply -async fn handle_error(r: Rejection) -> std::result::Result { - if let Some(e) = r.find::() { - debug!("{:?}", e); - let reply = warp::reply::json(&ErrorResponse { failure_reason: e.to_string() }); - Ok(warp::reply::with_status(reply, StatusCode::BAD_REQUEST)) - } else { - let reply = warp::reply::json(&ErrorResponse { failure_reason: "internal server error".to_string() }); - Ok(warp::reply::with_status(reply, StatusCode::INTERNAL_SERVER_ERROR)) - } -} From 68fb76b7a646a622ada1a391f83e5e6e0b73a56a Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 00:42:52 +0100 Subject: [PATCH 05/16] refactor: http tracker additional separation imports --- src/torrust_http_tracker/filters.rs | 9 +++++++++ src/torrust_http_tracker/handlers.rs | 12 +++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/torrust_http_tracker/filters.rs b/src/torrust_http_tracker/filters.rs index be1d234f6..0c2a2f635 100644 --- a/src/torrust_http_tracker/filters.rs +++ b/src/torrust_http_tracker/filters.rs @@ -1,3 +1,12 @@ +use std::convert::Infallible; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; +use warp::{Filter, reject, Rejection}; +use crate::{InfoHash, MAX_SCRAPE_TORRENTS, TorrentTracker}; +use crate::key_manager::AuthKey; +use crate::torrust_http_tracker::{AnnounceRequest, AnnounceRequestQuery, ScrapeRequest, ServerError}; + /// Pass Arc along fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { warp::any() diff --git a/src/torrust_http_tracker/handlers.rs b/src/torrust_http_tracker/handlers.rs index 1016b4668..1037852ec 100644 --- a/src/torrust_http_tracker/handlers.rs +++ b/src/torrust_http_tracker/handlers.rs @@ -1,3 +1,14 @@ +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::Arc; +use log::debug; +use warp::{reject, Rejection, Reply}; +use warp::http::{Response, StatusCode}; +use crate::{InfoHash, TorrentPeer, TorrentStats, TorrentTracker}; +use crate::key_manager::AuthKey; +use crate::torrust_http_tracker::{AnnounceRequest, AnnounceResponse, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry, ServerError}; +use crate::utils::url_encode_bytes; + /// Authenticate AnnounceRequest using optional AuthKey async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { match tracker.authenticate_request(info_hash, auth_key).await { @@ -40,7 +51,6 @@ pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option { let (seeders, completed, leechers) = torrent_info.get_stats(); - ScrapeResponseEntry { complete: seeders, downloaded: completed, incomplete: leechers } } None => { From 87c218cca79506f180fcd4e5dcc3644a373ccc94 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 00:44:47 +0100 Subject: [PATCH 06/16] refactor: http tracker added pub to some fn --- src/torrust_http_tracker/handlers.rs | 4 ++-- src/torrust_http_tracker/routes.rs | 4 ++-- src/torrust_http_tracker/server.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/torrust_http_tracker/handlers.rs b/src/torrust_http_tracker/handlers.rs index 1037852ec..7237d4817 100644 --- a/src/torrust_http_tracker/handlers.rs +++ b/src/torrust_http_tracker/handlers.rs @@ -10,7 +10,7 @@ use crate::torrust_http_tracker::{AnnounceRequest, AnnounceResponse, ErrorRespon use crate::utils::url_encode_bytes; /// Authenticate AnnounceRequest using optional AuthKey -async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { +pub async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { match tracker.authenticate_request(info_hash, auth_key).await { Ok(_) => Ok(()), Err(e) => Err(ServerError::from(e)) @@ -67,7 +67,7 @@ pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option std::result::Result { +pub async fn handle_error(r: Rejection) -> std::result::Result { if let Some(e) = r.find::() { debug!("{:?}", e); let reply = warp::reply::json(&ErrorResponse { failure_reason: e.to_string() }); diff --git a/src/torrust_http_tracker/routes.rs b/src/torrust_http_tracker/routes.rs index 20821f511..64a448d4d 100644 --- a/src/torrust_http_tracker/routes.rs +++ b/src/torrust_http_tracker/routes.rs @@ -2,10 +2,10 @@ use std::convert::Infallible; use std::sync::Arc; use warp::{Filter, Rejection}; use crate::TorrentTracker; -use crate::torrust_http_tracker::{handle_announce, handle_scrape}; +use crate::torrust_http_tracker::{handle_announce, handle_error, handle_scrape}; /// All routes -fn routes(tracker: Arc,) -> impl Filter + Clone { +pub fn routes(tracker: Arc,) -> impl Filter + Clone { announce(tracker.clone()) .or(scrape(tracker.clone())) .recover(handle_error) diff --git a/src/torrust_http_tracker/server.rs b/src/torrust_http_tracker/server.rs index da739453d..c521d797b 100644 --- a/src/torrust_http_tracker/server.rs +++ b/src/torrust_http_tracker/server.rs @@ -12,7 +12,7 @@ use crate::{TorrentPeer, TorrentStats}; use crate::key_manager::AuthKey; use crate::common::*; use crate::torrust_http_tracker::request::AnnounceRequestQuery; -use crate::torrust_http_tracker::{AnnounceRequest, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry}; +use crate::torrust_http_tracker::{AnnounceRequest, ErrorResponse, Peer, routes, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry}; use crate::torrust_http_tracker::errors::ServerError; use crate::utils::url_encode_bytes; From 0b7a1c7a776252151b7d7f5ca9665fe647991b1a Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 00:52:44 +0100 Subject: [PATCH 07/16] refactor: http tracker imports --- src/torrust_http_tracker/filters.rs | 12 ++++++------ src/torrust_http_tracker/handlers.rs | 2 ++ src/torrust_http_tracker/mod.rs | 2 ++ src/torrust_http_tracker/routes.rs | 2 +- src/torrust_http_tracker/server.rs | 21 +++------------------ 5 files changed, 14 insertions(+), 25 deletions(-) diff --git a/src/torrust_http_tracker/filters.rs b/src/torrust_http_tracker/filters.rs index 0c2a2f635..85e345b12 100644 --- a/src/torrust_http_tracker/filters.rs +++ b/src/torrust_http_tracker/filters.rs @@ -5,16 +5,16 @@ use std::sync::Arc; use warp::{Filter, reject, Rejection}; use crate::{InfoHash, MAX_SCRAPE_TORRENTS, TorrentTracker}; use crate::key_manager::AuthKey; -use crate::torrust_http_tracker::{AnnounceRequest, AnnounceRequestQuery, ScrapeRequest, ServerError}; +use crate::torrust_http_tracker::{AnnounceRequest, AnnounceRequestQuery, ScrapeRequest, ServerError, WebResult}; /// Pass Arc along -fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { +pub fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { warp::any() .map(move || tracker.clone()) } /// Check for infoHash -fn with_info_hash() -> impl Filter,), Error = Rejection> + Clone { +pub fn with_info_hash() -> impl Filter,), Error = Rejection> + Clone { warp::filters::query::raw() .and_then(info_hashes) } @@ -45,7 +45,7 @@ async fn info_hashes(raw_query: String) -> WebResult> { } /// Pass Arc along -fn with_auth_key() -> impl Filter,), Error = warp::Rejection> + Clone { +pub fn with_auth_key() -> impl Filter,), Error = warp::Rejection> + Clone { warp::path::param::() .map(|key_string: String| { AuthKey::from_string(&key_string) @@ -53,7 +53,7 @@ fn with_auth_key() -> impl Filter,), Error = warp::Re } /// Check for AnnounceRequest -fn with_announce_request() -> impl Filter + Clone { +pub fn with_announce_request() -> impl Filter + Clone { warp::filters::query::query::() .and(with_info_hash()) .and(warp::addr::remote()) @@ -78,7 +78,7 @@ async fn announce_request(announce_request_query: AnnounceRequestQuery, info_has } /// Check for ScrapeRequest -fn with_scrape_request() -> impl Filter + Clone { +pub fn with_scrape_request() -> impl Filter + Clone { warp::any() .and(with_info_hash()) .and_then(scrape_request) diff --git a/src/torrust_http_tracker/handlers.rs b/src/torrust_http_tracker/handlers.rs index 7237d4817..6f7ad4aa9 100644 --- a/src/torrust_http_tracker/handlers.rs +++ b/src/torrust_http_tracker/handlers.rs @@ -9,6 +9,8 @@ use crate::key_manager::AuthKey; use crate::torrust_http_tracker::{AnnounceRequest, AnnounceResponse, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry, ServerError}; use crate::utils::url_encode_bytes; +type WebResult = std::result::Result; + /// Authenticate AnnounceRequest using optional AuthKey pub async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { match tracker.authenticate_request(info_hash, auth_key).await { diff --git a/src/torrust_http_tracker/mod.rs b/src/torrust_http_tracker/mod.rs index 733e69704..0e5cf91ee 100644 --- a/src/torrust_http_tracker/mod.rs +++ b/src/torrust_http_tracker/mod.rs @@ -13,3 +13,5 @@ pub use self::errors::*; pub use self::routes::*; pub use self::handlers::*; pub use self::filters::*; + +pub type WebResult = std::result::Result; diff --git a/src/torrust_http_tracker/routes.rs b/src/torrust_http_tracker/routes.rs index 64a448d4d..9c7b45b4c 100644 --- a/src/torrust_http_tracker/routes.rs +++ b/src/torrust_http_tracker/routes.rs @@ -2,7 +2,7 @@ use std::convert::Infallible; use std::sync::Arc; use warp::{Filter, Rejection}; use crate::TorrentTracker; -use crate::torrust_http_tracker::{handle_announce, handle_error, handle_scrape}; +use crate::torrust_http_tracker::{handle_announce, handle_error, handle_scrape, with_announce_request, with_auth_key, with_scrape_request, with_tracker}; /// All routes pub fn routes(tracker: Arc,) -> impl Filter + Clone { diff --git a/src/torrust_http_tracker/server.rs b/src/torrust_http_tracker/server.rs index c521d797b..90f8a84d0 100644 --- a/src/torrust_http_tracker/server.rs +++ b/src/torrust_http_tracker/server.rs @@ -1,22 +1,7 @@ -use std::collections::HashMap; -use std::convert::Infallible; -use std::net::{SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; -use std::str::FromStr; -use log::{debug}; -use warp::{reply::Reply, Filter, Rejection, reject}; -use warp::http::{Response, StatusCode}; -use super::{AnnounceResponse}; -use crate::tracker::{TorrentTracker}; -use crate::{TorrentPeer, TorrentStats}; -use crate::key_manager::AuthKey; -use crate::common::*; -use crate::torrust_http_tracker::request::AnnounceRequestQuery; -use crate::torrust_http_tracker::{AnnounceRequest, ErrorResponse, Peer, routes, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry}; -use crate::torrust_http_tracker::errors::ServerError; -use crate::utils::url_encode_bytes; - -type WebResult = std::result::Result; +use crate::TorrentTracker; +use crate::torrust_http_tracker::routes; /// Server that listens on HTTP, needs a TorrentTracker #[derive(Clone)] From 7c86f45d2e77db49b7a2f96be8176384f6bf9656 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 00:57:53 +0100 Subject: [PATCH 08/16] refactor: http tracker --- src/torrust_http_tracker/handlers.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/torrust_http_tracker/handlers.rs b/src/torrust_http_tracker/handlers.rs index 6f7ad4aa9..cd7a8269c 100644 --- a/src/torrust_http_tracker/handlers.rs +++ b/src/torrust_http_tracker/handlers.rs @@ -6,12 +6,10 @@ use warp::{reject, Rejection, Reply}; use warp::http::{Response, StatusCode}; use crate::{InfoHash, TorrentPeer, TorrentStats, TorrentTracker}; use crate::key_manager::AuthKey; -use crate::torrust_http_tracker::{AnnounceRequest, AnnounceResponse, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry, ServerError}; +use crate::torrust_http_tracker::{AnnounceRequest, AnnounceResponse, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry, ServerError, WebResult}; use crate::utils::url_encode_bytes; -type WebResult = std::result::Result; - -/// Authenticate AnnounceRequest using optional AuthKey +/// Authenticate InfoHash using optional AuthKey pub async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { match tracker.authenticate_request(info_hash, auth_key).await { Ok(_) => Ok(()), From f23d38a9b454b82b9ffd804b5a09d52deb6ac124 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 01:00:35 +0100 Subject: [PATCH 09/16] refactor: default http port from 7878 > 6969 --- src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index c2b335d5d..07eb418cd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -135,7 +135,7 @@ impl Configuration { }, http_tracker: HttpTrackerConfig { enabled: false, - bind_address: String::from("0.0.0.0:7878"), + bind_address: String::from("0.0.0.0:6969"), announce_interval: 120, ssl_enabled: false, ssl_cert_path: None, From 007a4219b69ff74ca488c64e49034e22dee27c41 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 01:14:15 +0100 Subject: [PATCH 10/16] refactor: changed bytes from u32 to u64 --- src/torrust_http_tracker/mod.rs | 1 + src/torrust_http_tracker/request.rs | 13 +++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/torrust_http_tracker/mod.rs b/src/torrust_http_tracker/mod.rs index 0e5cf91ee..ea6675dce 100644 --- a/src/torrust_http_tracker/mod.rs +++ b/src/torrust_http_tracker/mod.rs @@ -14,4 +14,5 @@ pub use self::routes::*; pub use self::handlers::*; pub use self::filters::*; +pub type Bytes = u64; pub type WebResult = std::result::Result; diff --git a/src/torrust_http_tracker/request.rs b/src/torrust_http_tracker/request.rs index b62b9430a..8d90ede1f 100644 --- a/src/torrust_http_tracker/request.rs +++ b/src/torrust_http_tracker/request.rs @@ -1,15 +1,16 @@ use std::net::SocketAddr; use serde::{Deserialize}; use crate::InfoHash; +use crate::torrust_http_tracker::Bytes; #[derive(Deserialize)] pub struct AnnounceRequestQuery { - pub downloaded: u32, - pub uploaded: u32, + pub downloaded: Bytes, + pub uploaded: Bytes, pub key: String, pub peer_id: String, pub port: u16, - pub left: u32, + pub left: Bytes, pub event: Option, pub compact: Option, } @@ -17,11 +18,11 @@ pub struct AnnounceRequestQuery { pub struct AnnounceRequest { pub info_hash: InfoHash, pub peer_addr: SocketAddr, - pub downloaded: u32, - pub uploaded: u32, + pub downloaded: Bytes, + pub uploaded: Bytes, pub peer_id: String, pub port: u16, - pub left: u32, + pub left: Bytes, pub event: Option, pub compact: Option, } From d3270888b74936d3118b743b9b43df30a82191fc Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 02:24:48 +0100 Subject: [PATCH 11/16] fix: udp thread spawn overhead --- src/common.rs | 2 - src/main.rs | 7 +-- src/torrust_udp_tracker/errors.rs | 45 +++++++++++++++ src/torrust_udp_tracker/mod.rs | 15 +++++ src/torrust_udp_tracker/request.rs | 33 +++++++++++ src/torrust_udp_tracker/response.rs | 6 ++ src/torrust_udp_tracker/server.rs | 90 ++++++++++------------------- 7 files changed, 130 insertions(+), 68 deletions(-) create mode 100644 src/torrust_udp_tracker/errors.rs create mode 100644 src/torrust_udp_tracker/request.rs create mode 100644 src/torrust_udp_tracker/response.rs diff --git a/src/common.rs b/src/common.rs index 1ee1190ab..d47205c3c 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,9 +1,7 @@ use serde::{Deserialize, Serialize}; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; -pub const MAX_PACKET_SIZE: usize = 0xffff; pub const MAX_SCRAPE_TORRENTS: u8 = 74; -pub const PROTOCOL_ID: i64 = 4_497_486_125_440; // protocol constant pub const AUTH_KEY_LENGTH: usize = 32; #[repr(u32)] diff --git a/src/main.rs b/src/main.rs index ce8887742..d8a73854e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -95,15 +95,12 @@ fn start_http_tracker_server(config: &HttpTrackerConfig, tracker: Arc) -> JoinHandle<()> { - info!("Starting UDP server on: {}", config.bind_address); let udp_server = UdpServer::new(tracker).await.unwrap_or_else(|e| { panic!("Could not start UDP server: {}", e); }); - info!("Starting UDP tracker server.."); + info!("Starting UDP server on: {}", config.bind_address); tokio::spawn(async move { - if let Err(e) = udp_server.accept_packets().await { - panic!("Could not start UDP server: {}", e); - } + udp_server.start().await; }) } diff --git a/src/torrust_udp_tracker/errors.rs b/src/torrust_udp_tracker/errors.rs new file mode 100644 index 000000000..d6a24ac38 --- /dev/null +++ b/src/torrust_udp_tracker/errors.rs @@ -0,0 +1,45 @@ +use warp::reject::Reject; +use thiserror::Error; +use crate::TorrentError; + +#[derive(Error, Debug)] +pub enum ServerError { + #[error("internal server error")] + InternalServerError, + + #[error("info_hash is either missing or invalid")] + InvalidInfoHash, + + #[error("could not find remote address")] + AddressNotFound, + + #[error("torrent has no peers")] + NoPeersFound, + + #[error("torrent not on whitelist")] + TorrentNotWhitelisted, + + #[error("peer not authenticated")] + PeerNotAuthenticated, + + #[error("invalid authentication key")] + PeerKeyNotValid, + + #[error("exceeded info_hash limit")] + ExceededInfoHashLimit, +} + +impl Reject for ServerError {} + +impl From for ServerError { + fn from(e: TorrentError) -> Self { + match e { + TorrentError::TorrentNotWhitelisted => ServerError::TorrentNotWhitelisted, + TorrentError::PeerNotAuthenticated => ServerError::PeerNotAuthenticated, + TorrentError::PeerKeyNotValid => ServerError::PeerKeyNotValid, + TorrentError::NoPeersFound => ServerError::NoPeersFound, + TorrentError::CouldNotSendResponse => ServerError::InternalServerError, + TorrentError::InvalidInfoHash => ServerError::InvalidInfoHash, + } + } +} diff --git a/src/torrust_udp_tracker/mod.rs b/src/torrust_udp_tracker/mod.rs index 74f47ad34..dc653f020 100644 --- a/src/torrust_udp_tracker/mod.rs +++ b/src/torrust_udp_tracker/mod.rs @@ -1 +1,16 @@ +pub mod errors; +pub mod request; +pub mod response; pub mod server; + +use self::errors::*; +use self::request::*; +use self::response::*; +use self::server::*; + +pub type Bytes = u64; +pub type Port = u16; +pub type TransactionId = i64; + +pub const MAX_PACKET_SIZE: usize = 0xffff; +pub const PROTOCOL_ID: i64 = 0x41727101980; diff --git a/src/torrust_udp_tracker/request.rs b/src/torrust_udp_tracker/request.rs new file mode 100644 index 000000000..e5fcfdda5 --- /dev/null +++ b/src/torrust_udp_tracker/request.rs @@ -0,0 +1,33 @@ +use std::net::Ipv4Addr; +use aquatic_udp_protocol::{AnnounceEvent, AnnounceRequest}; +use crate::{InfoHash, PeerId}; +use crate::torrust_udp_tracker::{Bytes, Port, TransactionId}; + +struct AnnounceRequest2 { + pub connection_id: i64, + pub transaction_id: i32, + pub info_hash: InfoHash, + pub peer_id: PeerId, + pub bytes_downloaded: Bytes, + pub bytes_uploaded: Bytes, + pub bytes_left: Bytes, + pub event: AnnounceEvent, + pub ip_address: Option, + pub key: u32, + pub peers_wanted: u32, + pub port: Port +} + +pub struct AnnounceRequestWrapper { + pub announce_request: AnnounceRequest, + pub info_hash: InfoHash, +} + +impl AnnounceRequestWrapper { + pub fn new(announce_request: AnnounceRequest) -> Self { + AnnounceRequestWrapper { + announce_request: announce_request.clone(), + info_hash: InfoHash(announce_request.info_hash.0) + } + } +} diff --git a/src/torrust_udp_tracker/response.rs b/src/torrust_udp_tracker/response.rs new file mode 100644 index 000000000..18b3c8807 --- /dev/null +++ b/src/torrust_udp_tracker/response.rs @@ -0,0 +1,6 @@ +use crate::torrust_udp_tracker::TransactionId; + +pub struct ErrorResponse { + pub transaction_id: TransactionId, + pub message: String, +} diff --git a/src/torrust_udp_tracker/server.rs b/src/torrust_udp_tracker/server.rs index b3cbccf7f..5d6666468 100644 --- a/src/torrust_udp_tracker/server.rs +++ b/src/torrust_udp_tracker/server.rs @@ -10,25 +10,9 @@ use crate::common::*; use crate::utils::get_connection_id; use crate::tracker::TorrentTracker; use crate::{InfoHash, TorrentError, TorrentPeer}; - -struct RequestError { - error: TorrentError, - transaction_id: TransactionId -} - -struct AnnounceRequestWrapper { - announce_request: AnnounceRequest, - info_hash: InfoHash, -} - -impl AnnounceRequestWrapper { - pub fn new(announce_request: AnnounceRequest) -> Self { - AnnounceRequestWrapper { - announce_request: announce_request.clone(), - info_hash: InfoHash(announce_request.info_hash.0) - } - } -} +use crate::torrust_udp_tracker::errors::ServerError; +use crate::torrust_udp_tracker::MAX_PACKET_SIZE; +use crate::torrust_udp_tracker::request::{AnnounceRequestWrapper}; pub struct UdpServer { socket: UdpSocket, @@ -45,18 +29,15 @@ impl UdpServer { }) } - pub async fn accept_packets(self) -> Result<(), std::io::Error> { - let tracker = Arc::new(self); - + pub async fn start(&self) { loop { - let mut packet = vec![0u8; MAX_PACKET_SIZE]; - let (size, remote_address) = tracker.socket.recv_from(packet.as_mut_slice()).await?; - - let tracker = tracker.clone(); - tokio::spawn(async move { - debug!("Received {} bytes from {}", size, remote_address); - tracker.handle_packet(remote_address, &packet[..size]).await; - }); + let mut data = [0; MAX_PACKET_SIZE]; + if let Ok((valid_bytes, remote_addr)) = self.socket.recv_from(&mut data).await { + let data = &data[..valid_bytes]; + + debug!("Received {} bytes from {}", data.len(), remote_addr); + self.handle_packet(remote_addr, data).await; + } } } @@ -75,28 +56,36 @@ impl UdpServer { } async fn handle_request(&self, request: Request, remote_addr: SocketAddr) { - // todo: check for expired connection_id - let request_result = match request { + let transaction_id = match &request { + Request::Connect(connect_request) => { + connect_request.transaction_id + } + Request::Announce(announce_request) => { + announce_request.transaction_id + } + Request::Scrape(scrape_request) => { + scrape_request.transaction_id + } + }; + + let res = match request { Request::Connect(connect_request) => { self.handle_connect(remote_addr, &connect_request).await - .map_err(|error| RequestError { error, transaction_id: connect_request.transaction_id }) } Request::Announce(announce_request) => { self.handle_announce(remote_addr, &announce_request).await - .map_err(|error| RequestError { error, transaction_id: announce_request.transaction_id }) } Request::Scrape(scrape_request) => { self.handle_scrape(&scrape_request).await - .map_err(|error| RequestError { error, transaction_id: scrape_request.transaction_id }) } }; - match request_result { + match res { Ok(response) => { let _ = self.send_response(remote_addr, response).await; } - Err(request_error) => { - let _ = self.handle_error(request_error.error, remote_addr, request_error.transaction_id).await; + Err(e) => { + let _ = self.handle_error(e, remote_addr, transaction_id).await; } } } @@ -184,29 +173,8 @@ impl UdpServer { } async fn handle_error(&self, e: TorrentError, remote_addr: SocketAddr, tx_id: TransactionId) { - let mut err_msg = "oops"; - - match e { - TorrentError::TorrentNotWhitelisted => { - debug!("Info_hash not whitelisted."); - err_msg = "info hash not whitelisted"; - } - TorrentError::PeerKeyNotValid => { - debug!("Peer key not valid."); - err_msg = "peer key not valid"; - } - TorrentError::PeerNotAuthenticated => { - debug!("Peer not authenticated."); - err_msg = "peer not authenticated"; - } - TorrentError::NoPeersFound => { - debug!("No peers found."); - err_msg = "no peers found"; - } - _ => {} - } - - self.send_error(remote_addr, tx_id, err_msg).await; + let err = ServerError::from(e); + self.send_error(remote_addr, tx_id, &err.to_string()).await; } async fn send_response(&self, remote_addr: SocketAddr, response: Response) -> Result { From f8b6a97ef8eb74e8781b45ef606e5e3ee04ed932 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 03:41:23 +0100 Subject: [PATCH 12/16] refactor: completed udp tracker refactor --- src/torrust_http_tracker/errors.rs | 13 -- src/torrust_http_tracker/handlers.rs | 34 ++--- src/torrust_udp_tracker/errors.rs | 17 +-- src/torrust_udp_tracker/handlers.rs | 145 ++++++++++++++++++++++ src/torrust_udp_tracker/mod.rs | 10 +- src/torrust_udp_tracker/request.rs | 30 ++--- src/torrust_udp_tracker/server.rs | 177 ++------------------------- src/tracker.rs | 23 ++-- 8 files changed, 206 insertions(+), 243 deletions(-) create mode 100644 src/torrust_udp_tracker/handlers.rs diff --git a/src/torrust_http_tracker/errors.rs b/src/torrust_http_tracker/errors.rs index d6a24ac38..76a3e2330 100644 --- a/src/torrust_http_tracker/errors.rs +++ b/src/torrust_http_tracker/errors.rs @@ -30,16 +30,3 @@ pub enum ServerError { } impl Reject for ServerError {} - -impl From for ServerError { - fn from(e: TorrentError) -> Self { - match e { - TorrentError::TorrentNotWhitelisted => ServerError::TorrentNotWhitelisted, - TorrentError::PeerNotAuthenticated => ServerError::PeerNotAuthenticated, - TorrentError::PeerKeyNotValid => ServerError::PeerKeyNotValid, - TorrentError::NoPeersFound => ServerError::NoPeersFound, - TorrentError::CouldNotSendResponse => ServerError::InternalServerError, - TorrentError::InvalidInfoHash => ServerError::InvalidInfoHash, - } - } -} diff --git a/src/torrust_http_tracker/handlers.rs b/src/torrust_http_tracker/handlers.rs index cd7a8269c..cb972d69a 100644 --- a/src/torrust_http_tracker/handlers.rs +++ b/src/torrust_http_tracker/handlers.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use log::debug; use warp::{reject, Rejection, Reply}; use warp::http::{Response, StatusCode}; -use crate::{InfoHash, TorrentPeer, TorrentStats, TorrentTracker}; +use crate::{InfoHash, TorrentError, TorrentPeer, TorrentStats, TorrentTracker}; use crate::key_manager::AuthKey; use crate::torrust_http_tracker::{AnnounceRequest, AnnounceResponse, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry, ServerError, WebResult}; use crate::utils::url_encode_bytes; @@ -13,7 +13,18 @@ use crate::utils::url_encode_bytes; pub async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { match tracker.authenticate_request(info_hash, auth_key).await { Ok(_) => Ok(()), - Err(e) => Err(ServerError::from(e)) + Err(e) => { + let err = match e { + TorrentError::TorrentNotWhitelisted => ServerError::TorrentNotWhitelisted, + TorrentError::PeerNotAuthenticated => ServerError::PeerNotAuthenticated, + TorrentError::PeerKeyNotValid => ServerError::PeerKeyNotValid, + TorrentError::NoPeersFound => ServerError::NoPeersFound, + TorrentError::CouldNotSendResponse => ServerError::InternalServerError, + TorrentError::InvalidInfoHash => ServerError::InvalidInfoHash, + }; + + Err(err) + } } } @@ -24,19 +35,14 @@ pub async fn handle_announce(announce_request: AnnounceRequest, auth_key: Option } let peer = TorrentPeer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip()); + let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer).await; + // get all peers excluding the client_addr + let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; + if peers.is_none() { return Err(reject::custom(ServerError::NoPeersFound)) } - match tracker.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer).await { - Err(e) => Err(reject::custom(ServerError::from(e))), - Ok(torrent_stats) => { - // get all peers excluding the client_addr - let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; - if peers.is_none() { return Err(reject::custom(ServerError::NoPeersFound)) } - - // success response - let announce_interval = tracker.config.http_tracker.announce_interval; - send_announce_response(&announce_request, torrent_stats, peers.unwrap(), announce_interval) - } - } + // success response + let announce_interval = tracker.config.http_tracker.announce_interval; + send_announce_response(&announce_request, torrent_stats, peers.unwrap(), announce_interval) } /// Handle scrape request diff --git a/src/torrust_udp_tracker/errors.rs b/src/torrust_udp_tracker/errors.rs index d6a24ac38..d487a4487 100644 --- a/src/torrust_udp_tracker/errors.rs +++ b/src/torrust_udp_tracker/errors.rs @@ -1,4 +1,3 @@ -use warp::reject::Reject; use thiserror::Error; use crate::TorrentError; @@ -27,19 +26,7 @@ pub enum ServerError { #[error("exceeded info_hash limit")] ExceededInfoHashLimit, -} -impl Reject for ServerError {} - -impl From for ServerError { - fn from(e: TorrentError) -> Self { - match e { - TorrentError::TorrentNotWhitelisted => ServerError::TorrentNotWhitelisted, - TorrentError::PeerNotAuthenticated => ServerError::PeerNotAuthenticated, - TorrentError::PeerKeyNotValid => ServerError::PeerKeyNotValid, - TorrentError::NoPeersFound => ServerError::NoPeersFound, - TorrentError::CouldNotSendResponse => ServerError::InternalServerError, - TorrentError::InvalidInfoHash => ServerError::InvalidInfoHash, - } - } + #[error("bad request")] + BadRequest, } diff --git a/src/torrust_udp_tracker/handlers.rs b/src/torrust_udp_tracker/handlers.rs new file mode 100644 index 000000000..8549580d7 --- /dev/null +++ b/src/torrust_udp_tracker/handlers.rs @@ -0,0 +1,145 @@ +use std::net::SocketAddr; +use std::sync::Arc; +use aquatic_udp_protocol::{AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId}; +use crate::{InfoHash, MAX_SCRAPE_TORRENTS, TorrentError, TorrentPeer, TorrentTracker}; +use crate::torrust_udp_tracker::errors::ServerError; +use crate::torrust_udp_tracker::request::AnnounceRequestWrapper; +use crate::utils::get_connection_id; + +pub async fn authenticate(info_hash: &InfoHash, tracker: Arc) -> Result<(), ServerError> { + match tracker.authenticate_request(info_hash, &None).await { + Ok(_) => Ok(()), + Err(e) => { + let err = match e { + TorrentError::TorrentNotWhitelisted => ServerError::TorrentNotWhitelisted, + TorrentError::PeerNotAuthenticated => ServerError::PeerNotAuthenticated, + TorrentError::PeerKeyNotValid => ServerError::PeerKeyNotValid, + TorrentError::NoPeersFound => ServerError::NoPeersFound, + TorrentError::CouldNotSendResponse => ServerError::InternalServerError, + TorrentError::InvalidInfoHash => ServerError::InvalidInfoHash, + }; + + Err(err) + } + } +} + +pub async fn handle_packet(remote_addr: SocketAddr, payload: &[u8], tracker: Arc) -> Response { + match Request::from_bytes(&payload[..payload.len()], MAX_SCRAPE_TORRENTS).map_err(|_| ServerError::InternalServerError) { + Ok(request) => { + let transaction_id = match &request { + Request::Connect(connect_request) => { + connect_request.transaction_id + } + Request::Announce(announce_request) => { + announce_request.transaction_id + } + Request::Scrape(scrape_request) => { + scrape_request.transaction_id + } + }; + + match handle_request(request, remote_addr, tracker).await { + Ok(response) => response, + Err(e) => handle_error(e, transaction_id) + } + } + // bad request + Err(_) => handle_error(ServerError::BadRequest, TransactionId(0)) + } +} + +pub async fn handle_request(request: Request, remote_addr: SocketAddr, tracker: Arc) -> Result { + match request { + Request::Connect(connect_request) => { + handle_connect(remote_addr, &connect_request).await + } + Request::Announce(announce_request) => { + handle_announce(remote_addr, &announce_request, tracker).await + } + Request::Scrape(scrape_request) => { + handle_scrape(&scrape_request, tracker).await + } + } +} + +pub async fn handle_connect(remote_addr: SocketAddr, request: &ConnectRequest) -> Result { + let connection_id = get_connection_id(&remote_addr); + + let response = Response::from(ConnectResponse { + transaction_id: request.transaction_id, + connection_id, + }); + + Ok(response) +} + +pub async fn handle_announce(remote_addr: SocketAddr, announce_request: &AnnounceRequest, tracker: Arc) -> Result { + let wrapped_announce_request = AnnounceRequestWrapper::new(announce_request.clone()); + + authenticate(&wrapped_announce_request.info_hash, tracker.clone()).await?; + + let peer = TorrentPeer::from_udp_announce_request(&wrapped_announce_request.announce_request, remote_addr, tracker.config.get_ext_ip()); + + let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer).await; + // get all peers excluding the client_addr + let peers = match tracker.get_torrent_peers(&wrapped_announce_request.info_hash, &peer.peer_addr).await { + Some(v) => v, + None => { return Err(ServerError::NoPeersFound); } + }; + + Ok(Response::from(AnnounceResponse { + transaction_id: wrapped_announce_request.announce_request.transaction_id, + announce_interval: AnnounceInterval(tracker.config.udp_tracker.announce_interval as i32), + leechers: NumberOfPeers(torrent_stats.leechers as i32), + seeders: NumberOfPeers(torrent_stats.seeders as i32), + peers: peers.iter().map(|peer| + ResponsePeer { + ip_address: peer.peer_addr.ip(), + port: Port(peer.peer_addr.port()) + }).collect() + })) +} + +pub async fn handle_scrape(request: &ScrapeRequest, tracker: Arc) -> Result { + let db = tracker.get_torrents().await; + + let mut torrent_stats: Vec = Vec::new(); + + for info_hash in request.info_hashes.iter() { + let info_hash = InfoHash(info_hash.0); + + if authenticate(&info_hash, tracker.clone()).await.is_err() { continue } + + let scrape_entry = match db.get(&info_hash) { + Some(torrent_info) => { + let (seeders, completed, leechers) = torrent_info.get_stats(); + + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders as i32), + completed: NumberOfDownloads(completed as i32), + leechers: NumberOfPeers(leechers as i32), + } + } + None => { + TorrentScrapeStatistics { + seeders: NumberOfPeers(0), + completed: NumberOfDownloads(0), + leechers: NumberOfPeers(0), + } + } + }; + + torrent_stats.push(scrape_entry); + } + + Ok(Response::from(ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats + })) +} + +fn handle_error(e: ServerError, transaction_id: TransactionId) -> Response { + let message = e.to_string(); + Response::from(ErrorResponse { transaction_id, message }) +} diff --git a/src/torrust_udp_tracker/mod.rs b/src/torrust_udp_tracker/mod.rs index dc653f020..1e7d5d68d 100644 --- a/src/torrust_udp_tracker/mod.rs +++ b/src/torrust_udp_tracker/mod.rs @@ -2,11 +2,13 @@ pub mod errors; pub mod request; pub mod response; pub mod server; +pub mod handlers; -use self::errors::*; -use self::request::*; -use self::response::*; -use self::server::*; +pub use self::errors::*; +pub use self::request::*; +pub use self::response::*; +pub use self::server::*; +pub use self::handlers::*; pub type Bytes = u64; pub type Port = u16; diff --git a/src/torrust_udp_tracker/request.rs b/src/torrust_udp_tracker/request.rs index e5fcfdda5..f3757fe56 100644 --- a/src/torrust_udp_tracker/request.rs +++ b/src/torrust_udp_tracker/request.rs @@ -1,22 +1,22 @@ use std::net::Ipv4Addr; use aquatic_udp_protocol::{AnnounceEvent, AnnounceRequest}; use crate::{InfoHash, PeerId}; -use crate::torrust_udp_tracker::{Bytes, Port, TransactionId}; +use crate::torrust_udp_tracker::{Bytes, Port}; -struct AnnounceRequest2 { - pub connection_id: i64, - pub transaction_id: i32, - pub info_hash: InfoHash, - pub peer_id: PeerId, - pub bytes_downloaded: Bytes, - pub bytes_uploaded: Bytes, - pub bytes_left: Bytes, - pub event: AnnounceEvent, - pub ip_address: Option, - pub key: u32, - pub peers_wanted: u32, - pub port: Port -} +// struct AnnounceRequest { +// pub connection_id: i64, +// pub transaction_id: i32, +// pub info_hash: InfoHash, +// pub peer_id: PeerId, +// pub bytes_downloaded: Bytes, +// pub bytes_uploaded: Bytes, +// pub bytes_left: Bytes, +// pub event: AnnounceEvent, +// pub ip_address: Option, +// pub key: u32, +// pub peers_wanted: u32, +// pub port: Port +// } pub struct AnnounceRequestWrapper { pub announce_request: AnnounceRequest, diff --git a/src/torrust_udp_tracker/server.rs b/src/torrust_udp_tracker/server.rs index 5d6666468..94108c767 100644 --- a/src/torrust_udp_tracker/server.rs +++ b/src/torrust_udp_tracker/server.rs @@ -11,7 +11,7 @@ use crate::utils::get_connection_id; use crate::tracker::TorrentTracker; use crate::{InfoHash, TorrentError, TorrentPeer}; use crate::torrust_udp_tracker::errors::ServerError; -use crate::torrust_udp_tracker::MAX_PACKET_SIZE; +use crate::torrust_udp_tracker::{handle_packet, MAX_PACKET_SIZE}; use crate::torrust_udp_tracker::request::{AnnounceRequestWrapper}; pub struct UdpServer { @@ -34,150 +34,14 @@ impl UdpServer { let mut data = [0; MAX_PACKET_SIZE]; if let Ok((valid_bytes, remote_addr)) = self.socket.recv_from(&mut data).await { let data = &data[..valid_bytes]; - debug!("Received {} bytes from {}", data.len(), remote_addr); - self.handle_packet(remote_addr, data).await; - } - } - } - - async fn handle_packet(&self, remote_addr: SocketAddr, payload: &[u8]) { - let request = Request::from_bytes(&payload[..payload.len()], MAX_SCRAPE_TORRENTS); - - match request { - Ok(request) => { - debug!("New request: {:?}", request); - self.handle_request(request, remote_addr).await; - } - Err(err) => { - debug!("request_from_bytes error: {:?}", err); - } - } - } - - async fn handle_request(&self, request: Request, remote_addr: SocketAddr) { - let transaction_id = match &request { - Request::Connect(connect_request) => { - connect_request.transaction_id - } - Request::Announce(announce_request) => { - announce_request.transaction_id - } - Request::Scrape(scrape_request) => { - scrape_request.transaction_id - } - }; - - let res = match request { - Request::Connect(connect_request) => { - self.handle_connect(remote_addr, &connect_request).await - } - Request::Announce(announce_request) => { - self.handle_announce(remote_addr, &announce_request).await - } - Request::Scrape(scrape_request) => { - self.handle_scrape(&scrape_request).await - } - }; - - match res { - Ok(response) => { - let _ = self.send_response(remote_addr, response).await; - } - Err(e) => { - let _ = self.handle_error(e, remote_addr, transaction_id).await; - } - } - } - - async fn handle_connect(&self, remote_addr: SocketAddr, request: &ConnectRequest) -> Result { - let connection_id = get_connection_id(&remote_addr); - - let response = Response::from(ConnectResponse { - transaction_id: request.transaction_id, - connection_id, - }); - - Ok(response) - } - - async fn handle_announce(&self, remote_addr: SocketAddr, announce_request: &AnnounceRequest) -> Result { - let wrapped_announce_request = AnnounceRequestWrapper::new(announce_request.clone()); - self.tracker.authenticate_request(&wrapped_announce_request.info_hash, &None).await?; - - let peer = TorrentPeer::from_udp_announce_request(&wrapped_announce_request.announce_request, remote_addr, self.tracker.config.get_ext_ip()); - - return match self.tracker.update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer).await { - Ok(torrent_stats) => { - // get all peers excluding the client_addr - let peers = match self.tracker.get_torrent_peers(&wrapped_announce_request.info_hash, &peer.peer_addr).await { - Some(v) => v, - None => { - return Err(TorrentError::NoPeersFound); - } - }; - - let response = Response::from(AnnounceResponse { - transaction_id: wrapped_announce_request.announce_request.transaction_id, - announce_interval: AnnounceInterval(self.tracker.config.udp_tracker.announce_interval as i32), - leechers: NumberOfPeers(torrent_stats.leechers as i32), - seeders: NumberOfPeers(torrent_stats.seeders as i32), - peers: peers.iter().map(|peer| - ResponsePeer { - ip_address: peer.peer_addr.ip(), - port: Port(peer.peer_addr.port()) - }).collect() - }); - - Ok(response) + let response = handle_packet(remote_addr, data, self.tracker.clone()).await; + self.send_response(remote_addr, response).await; } - Err(e) => Err(e) - } - } - - async fn handle_scrape(&self, request: &ScrapeRequest) -> Result { - let db = self.tracker.get_torrents().await; - - let mut torrent_stats: Vec = Vec::new(); - - for info_hash in request.info_hashes.iter() { - let info_hash = InfoHash(info_hash.0); - let scrape_entry = match db.get(&info_hash) { - Some(torrent_info) => { - let (seeders, completed, leechers) = torrent_info.get_stats(); - - TorrentScrapeStatistics { - seeders: NumberOfPeers(seeders as i32), - completed: NumberOfDownloads(completed as i32), - leechers: NumberOfPeers(leechers as i32), - } - } - None => { - TorrentScrapeStatistics { - seeders: NumberOfPeers(0), - completed: NumberOfDownloads(0), - leechers: NumberOfPeers(0), - } - } - }; - - torrent_stats.push(scrape_entry); } - - let response = Response::from(ScrapeResponse { - transaction_id: request.transaction_id, - torrent_stats - }); - - Ok(response) - } - - async fn handle_error(&self, e: TorrentError, remote_addr: SocketAddr, tx_id: TransactionId) { - let err = ServerError::from(e); - self.send_error(remote_addr, tx_id, &err.to_string()).await; } - async fn send_response(&self, remote_addr: SocketAddr, response: Response) -> Result { + async fn send_response(&self, remote_addr: SocketAddr, response: Response) { debug!("sending response to: {:?}", &remote_addr); let buffer = vec![0u8; MAX_PACKET_SIZE]; @@ -189,37 +53,14 @@ impl UdpServer { let inner = cursor.get_ref(); debug!("{:?}", &inner[..position]); - match self.send_packet(&remote_addr, &inner[..position]).await { - Ok(byte_size) => Ok(byte_size), - Err(e) => { - debug!("{:?}", e); - Err(()) - } - } - } - Err(_) => { - debug!("could not write response to bytes."); - Err(()) + self.send_packet(&remote_addr, &inner[..position]).await; } + Err(_) => { debug!("could not write response to bytes."); } } } - async fn send_error(&self, remote_addr: SocketAddr, transaction_id: TransactionId, error_msg: &str) { - let response = Response::from(ErrorResponse { - transaction_id, - message: error_msg.to_string(), - }); - - let _ = self.send_response(remote_addr, response).await; - } - - async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) -> Result { - match self.socket.send_to(payload, remote_addr).await { - Err(err) => { - debug!("failed to send a packet: {}", err); - Err(err) - }, - Ok(sz) => Ok(sz), - } + async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) { + // doesn't matter if it reaches or not + let _ = self.socket.send_to(payload, remote_addr).await; } } diff --git a/src/tracker.rs b/src/tracker.rs index 33d7ddb5f..b8b4ac823 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -361,31 +361,26 @@ impl TorrentTracker { } } - pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &TorrentPeer) -> Result { + pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &TorrentPeer) -> TorrentStats { let mut torrents = self.torrents.write().await; let torrent_entry = match torrents.entry(info_hash.clone()) { Entry::Vacant(vacant) => { - Ok(vacant.insert(TorrentEntry::new())) + vacant.insert(TorrentEntry::new()) } Entry::Occupied(entry) => { - Ok(entry.into_mut()) + entry.into_mut() } }; - match torrent_entry { - Ok(torrent_entry) => { - torrent_entry.update_peer(peer); + torrent_entry.update_peer(peer); - let (seeders, completed, leechers) = torrent_entry.get_stats(); + let (seeders, completed, leechers) = torrent_entry.get_stats(); - Ok(TorrentStats { - seeders, - leechers, - completed, - }) - } - Err(e) => Err(e) + TorrentStats { + seeders, + leechers, + completed, } } From f290b294443c4cd2500596575440f8ad5ec52956 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 03:52:38 +0100 Subject: [PATCH 13/16] feat: added ipv6 support for udp --- src/torrust_http_tracker/errors.rs | 1 - src/torrust_udp_tracker/errors.rs | 1 - src/torrust_udp_tracker/request.rs | 6 ++---- src/torrust_udp_tracker/server.rs | 24 +++++++++++------------- 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/src/torrust_http_tracker/errors.rs b/src/torrust_http_tracker/errors.rs index 76a3e2330..f0bedfe1b 100644 --- a/src/torrust_http_tracker/errors.rs +++ b/src/torrust_http_tracker/errors.rs @@ -1,6 +1,5 @@ use warp::reject::Reject; use thiserror::Error; -use crate::TorrentError; #[derive(Error, Debug)] pub enum ServerError { diff --git a/src/torrust_udp_tracker/errors.rs b/src/torrust_udp_tracker/errors.rs index d487a4487..fb29e969e 100644 --- a/src/torrust_udp_tracker/errors.rs +++ b/src/torrust_udp_tracker/errors.rs @@ -1,5 +1,4 @@ use thiserror::Error; -use crate::TorrentError; #[derive(Error, Debug)] pub enum ServerError { diff --git a/src/torrust_udp_tracker/request.rs b/src/torrust_udp_tracker/request.rs index f3757fe56..f3f67fdc1 100644 --- a/src/torrust_udp_tracker/request.rs +++ b/src/torrust_udp_tracker/request.rs @@ -1,7 +1,5 @@ -use std::net::Ipv4Addr; -use aquatic_udp_protocol::{AnnounceEvent, AnnounceRequest}; -use crate::{InfoHash, PeerId}; -use crate::torrust_udp_tracker::{Bytes, Port}; +use aquatic_udp_protocol::{AnnounceRequest}; +use crate::{InfoHash}; // struct AnnounceRequest { // pub connection_id: i64, diff --git a/src/torrust_udp_tracker/server.rs b/src/torrust_udp_tracker/server.rs index 94108c767..0da4ce140 100644 --- a/src/torrust_udp_tracker/server.rs +++ b/src/torrust_udp_tracker/server.rs @@ -1,18 +1,11 @@ -use log::debug; -use std; -use std::net::SocketAddr; -use std::sync::Arc; use std::io::Cursor; -use aquatic_udp_protocol::{AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, IpVersion, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId}; +use std::net::{SocketAddr}; +use std::sync::Arc; +use aquatic_udp_protocol::{IpVersion, Response}; +use log::debug; use tokio::net::UdpSocket; - -use crate::common::*; -use crate::utils::get_connection_id; -use crate::tracker::TorrentTracker; -use crate::{InfoHash, TorrentError, TorrentPeer}; -use crate::torrust_udp_tracker::errors::ServerError; +use crate::TorrentTracker; use crate::torrust_udp_tracker::{handle_packet, MAX_PACKET_SIZE}; -use crate::torrust_udp_tracker::request::{AnnounceRequestWrapper}; pub struct UdpServer { socket: UdpSocket, @@ -47,7 +40,12 @@ impl UdpServer { let buffer = vec![0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(buffer); - match response.write(&mut cursor, IpVersion::IPv4) { + let ip_version = match remote_addr { + SocketAddr::V4(_) => IpVersion::IPv4, + SocketAddr::V6(_) => IpVersion::IPv6 + }; + + match response.write(&mut cursor, ip_version) { Ok(_) => { let position = cursor.position() as usize; let inner = cursor.get_ref(); From 88f87192ff79756f346bb86b85cf4976d1428465 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 03:56:07 +0100 Subject: [PATCH 14/16] refactor: removed unused file --- src/torrust_udp_tracker/mod.rs | 2 -- src/torrust_udp_tracker/response.rs | 6 ------ 2 files changed, 8 deletions(-) delete mode 100644 src/torrust_udp_tracker/response.rs diff --git a/src/torrust_udp_tracker/mod.rs b/src/torrust_udp_tracker/mod.rs index 1e7d5d68d..cd4b99f5b 100644 --- a/src/torrust_udp_tracker/mod.rs +++ b/src/torrust_udp_tracker/mod.rs @@ -1,12 +1,10 @@ pub mod errors; pub mod request; -pub mod response; pub mod server; pub mod handlers; pub use self::errors::*; pub use self::request::*; -pub use self::response::*; pub use self::server::*; pub use self::handlers::*; diff --git a/src/torrust_udp_tracker/response.rs b/src/torrust_udp_tracker/response.rs deleted file mode 100644 index 18b3c8807..000000000 --- a/src/torrust_udp_tracker/response.rs +++ /dev/null @@ -1,6 +0,0 @@ -use crate::torrust_udp_tracker::TransactionId; - -pub struct ErrorResponse { - pub transaction_id: TransactionId, - pub message: String, -} From d705fba1027ad859ac85200ed9be5787f719ac2e Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 04:16:48 +0100 Subject: [PATCH 15/16] fix: http tracker optional path param (authentication) --- src/torrust_http_tracker/filters.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/torrust_http_tracker/filters.rs b/src/torrust_http_tracker/filters.rs index 85e345b12..9e82fe946 100644 --- a/src/torrust_http_tracker/filters.rs +++ b/src/torrust_http_tracker/filters.rs @@ -45,10 +45,13 @@ async fn info_hashes(raw_query: String) -> WebResult> { } /// Pass Arc along -pub fn with_auth_key() -> impl Filter,), Error = warp::Rejection> + Clone { +pub fn with_auth_key() -> impl Filter,), Error = Infallible> + Clone { warp::path::param::() - .map(|key_string: String| { - AuthKey::from_string(&key_string) + .map(|key: String| { + AuthKey::from_string(&key) + }) + .or_else(|_| async { + Ok::<(Option,), Infallible>((None,)) }) } From 984ec793bc1091c615e83075879b4b94cd4f7a52 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 31 Jan 2022 04:33:48 +0100 Subject: [PATCH 16/16] feat: added root path for announcing to http tracker --- src/torrust_http_tracker/routes.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/torrust_http_tracker/routes.rs b/src/torrust_http_tracker/routes.rs index 9c7b45b4c..ad873e83e 100644 --- a/src/torrust_http_tracker/routes.rs +++ b/src/torrust_http_tracker/routes.rs @@ -6,12 +6,23 @@ use crate::torrust_http_tracker::{handle_announce, handle_error, handle_scrape, /// All routes pub fn routes(tracker: Arc,) -> impl Filter + Clone { - announce(tracker.clone()) + root(tracker.clone()) + .or(announce(tracker.clone())) .or(scrape(tracker.clone())) .recover(handle_error) } -/// GET /announce/ +/// GET / or / +fn root(tracker: Arc,) -> impl Filter + Clone { + warp::any() + .and(warp::filters::method::get()) + .and(with_announce_request()) + .and(with_auth_key()) + .and(with_tracker(tracker)) + .and_then(handle_announce) +} + +/// GET /announce or /announce/ fn announce(tracker: Arc,) -> impl Filter + Clone { warp::path::path("announce") .and(warp::filters::method::get())