diff --git a/Cargo.lock b/Cargo.lock index af57a3768..08a4722e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1742,7 +1742,7 @@ dependencies = [ [[package]] name = "torrust-tracker" -version = "2.1.0" +version = "2.1.1" dependencies = [ "aquatic_udp_protocol", "binascii", @@ -1763,6 +1763,7 @@ dependencies = [ "serde_bencode", "serde_bytes", "serde_json", + "thiserror", "tokio", "toml", "warp", diff --git a/Cargo.toml b/Cargo.toml index 6e78211ea..e3d0a06a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "torrust-tracker" -version = "2.1.0" -authors = ["Mick van Dijke ", "Naim A. "] +version = "2.1.1" +license = "AGPL-3.0" +authors = ["Mick van Dijke "] description = "A feature rich BitTorrent tracker." edition = "2018" @@ -30,5 +31,6 @@ rand = "0.8.4" env_logger = "0.9.0" config = "0.11" derive_more = "0.99" +thiserror = "1.0" aquatic_udp_protocol = "0.1.0" diff --git a/src/common.rs b/src/common.rs index 1ee1190ab..d47205c3c 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,9 +1,7 @@ use serde::{Deserialize, Serialize}; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; -pub const MAX_PACKET_SIZE: usize = 0xffff; pub const MAX_SCRAPE_TORRENTS: u8 = 74; -pub const PROTOCOL_ID: i64 = 4_497_486_125_440; // protocol constant pub const AUTH_KEY_LENGTH: usize = 32; #[repr(u32)] diff --git a/src/config.rs b/src/config.rs index c2b335d5d..07eb418cd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -135,7 +135,7 @@ impl Configuration { }, http_tracker: HttpTrackerConfig { enabled: false, - bind_address: String::from("0.0.0.0:7878"), + bind_address: String::from("0.0.0.0:6969"), announce_interval: 120, ssl_enabled: false, ssl_cert_path: None, diff --git a/src/http_server.rs b/src/http_server.rs deleted file mode 100644 index 9c942856c..000000000 --- a/src/http_server.rs +++ /dev/null @@ -1,374 +0,0 @@ -use std::collections::{HashMap}; -use crate::tracker::{TorrentTracker}; -use serde::{Deserialize, Serialize}; -use std::convert::Infallible; -use std::error::Error; -use std::io::Write; -use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; -use std::str::FromStr; -use log::{debug}; -use warp::{filters, reply::Reply, Filter}; -use warp::http::Response; -use crate::{TorrentError, TorrentPeer, TorrentStats}; -use crate::key_manager::AuthKey; -use crate::utils::url_encode_bytes; -use super::common::*; - -#[derive(Deserialize, Debug)] -pub struct AnnounceRequest { - pub downloaded: u32, - pub uploaded: u32, - pub key: String, - pub peer_id: String, - pub port: u16, - pub info_hash: String, - pub left: u32, - pub event: Option, - pub compact: Option, -} - -impl AnnounceRequest { - pub fn is_compact(&self) -> bool { - self.compact.unwrap_or(0) == 1 - } -} - -#[derive(Deserialize, Debug)] -pub struct ScrapeRequest { - pub info_hash: String, -} - -#[derive(Serialize)] -struct Peer { - peer_id: String, - ip: IpAddr, - port: u16, -} - -#[derive(Serialize)] -struct AnnounceResponse { - interval: u32, - //tracker_id: String, - complete: u32, - incomplete: u32, - peers: Vec -} - -impl AnnounceResponse { - pub fn write(&self) -> String { - serde_bencode::to_string(&self).unwrap() - } - - pub fn write_compact(&self) -> Result, Box> { - let mut peers_v4: Vec = Vec::new(); - let mut peers_v6: Vec = Vec::new(); - - for peer in &self.peers { - match peer.ip { - IpAddr::V4(ip) => { - peers_v4.write(&u32::from(ip).to_be_bytes())?; - peers_v4.write(&peer.port.to_be_bytes())?; - } - IpAddr::V6(ip) => { - peers_v6.write(&u128::from(ip).to_be_bytes())?; - peers_v6.write(&peer.port.to_be_bytes())?; - } - } - } - - debug!("{:?}", String::from_utf8_lossy(peers_v4.as_slice())); - debug!("{:?}", String::from_utf8_lossy(peers_v6.as_slice())); - - let mut bytes: Vec = Vec::new(); - bytes.write(b"d8:intervali")?; - bytes.write(&self.interval.to_string().as_bytes())?; - bytes.write(b"e8:completei")?; - bytes.write(&self.complete.to_string().as_bytes())?; - bytes.write(b"e10:incompletei")?; - bytes.write(&self.incomplete.to_string().as_bytes())?; - bytes.write(b"e5:peers")?; - bytes.write(&peers_v4.len().to_string().as_bytes())?; - bytes.write(b":")?; - bytes.write(peers_v4.as_slice())?; - bytes.write(b"e6:peers6")?; - bytes.write(&peers_v6.len().to_string().as_bytes())?; - bytes.write(b":")?; - bytes.write(peers_v6.as_slice())?; - bytes.write(b"e")?; - - debug!("{:?}", String::from_utf8_lossy(bytes.as_slice())); - Ok(bytes) - } -} - -#[derive(Serialize)] -struct ScrapeResponse { - files: HashMap -} - -impl ScrapeResponse { - pub fn write(&self) -> String { - serde_bencode::to_string(&self).unwrap() - } -} - -#[derive(Serialize)] -struct ScrapeResponseEntry { - complete: u32, - downloaded: u32, - incomplete: u32, -} - -#[derive(Serialize)] -struct ErrorResponse { - failure_reason: String -} - -impl warp::Reply for ErrorResponse { - fn into_response(self) -> warp::reply::Response { - Response::new(format!("{}", serde_bencode::to_string(&self).unwrap()).into()) - } -} - -#[derive(Clone)] -pub struct HttpServer { - tracker: Arc, -} - -impl HttpServer { - pub fn new(tracker: Arc) -> HttpServer { - HttpServer { - tracker - } - } - - // &self did not work here - pub fn routes(http_server: Arc) -> impl Filter + Clone + Send + Sync + 'static { - // optional tracker key - let opt_key = warp::path::param::() - .map(Some) - .or_else(|_| async { - // Ok(None) - Ok::<(Option,), std::convert::Infallible>((None,)) - }); - - // GET /announce?key=:String - // Announce peer - let hs1 = http_server.clone(); - let announce_route = - filters::path::path("announce") - .and(filters::method::get()) - .and(warp::addr::remote()) - .and(opt_key) - .and(filters::query::raw()) - .and(filters::query::query()) - .map(move |remote_addr, key, raw_query, query| { - debug!("Request: {}", raw_query); - (remote_addr, key, raw_query, query, hs1.clone()) - }) - .and_then(move |(remote_addr, key, raw_query, mut query, http_server): (Option, Option, String, AnnounceRequest, Arc)| { - async move { - if remote_addr.is_none() { return HttpServer::send_error("could not get remote address") } - - // query.info_hash somehow receives a corrupt string - // so we have to get the info_hash manually from the raw query - let info_hashes = HttpServer::info_hashes_from_raw_query(&raw_query); - if info_hashes.len() < 1 { return HttpServer::send_error("info_hash not found") } - query.info_hash = info_hashes[0].to_string(); - debug!("{:?}", query.info_hash); - - if let Some(err) = http_server.authenticate_request(&query.info_hash, key).await { return err } - - http_server.handle_announce(query, remote_addr.unwrap()).await - } - }); - - // GET /scrape?key=:String - // Get torrent info - let hs2 = http_server.clone(); - let scrape_route = - filters::path::path("scrape") - .and(filters::method::get()) - .and(opt_key) - .and(filters::query::raw()) - .map(move |key, raw_query| { - debug!("Request: {}", raw_query); - (key, raw_query, hs2.clone()) - }) - .and_then(move |(key, raw_query, http_server): (Option, String, Arc)| { - async move { - let info_hashes = HttpServer::info_hashes_from_raw_query(&raw_query); - if info_hashes.len() < 1 { return HttpServer::send_error("info_hash not found") } - if info_hashes.len() > 50 { return HttpServer::send_error("exceeded the max of 50 info_hashes") } - debug!("{:?}", info_hashes); - - // todo: verify all info_hashes before scrape - if let Some(err) = http_server.authenticate_request(&info_hashes[0].to_string(), key).await { return err } - - http_server.handle_scrape(info_hashes).await - } - }); - - // all routes - warp::any().and(announce_route.or(scrape_route)) - } - - fn info_hashes_from_raw_query(raw_query: &str) -> Vec { - let split_raw_query: Vec<&str> = raw_query.split("&").collect(); - let mut info_hashes: Vec = Vec::new(); - - for v in split_raw_query { - if v.contains("info_hash") { - let raw_info_hash = v.split("=").collect::>()[1]; - let info_hash_bytes = percent_encoding::percent_decode_str(raw_info_hash).collect::>(); - let info_hash = InfoHash::from_str(&hex::encode(info_hash_bytes)); - if let Ok(ih) = info_hash { - info_hashes.push(ih); - } - } - } - - info_hashes - } - - fn send_announce_response(query: &AnnounceRequest, torrent_stats: TorrentStats, peers: Vec, interval: u32) -> Result { - let http_peers: Vec = peers.iter().map(|peer| Peer { - peer_id: String::from_utf8_lossy(&peer.peer_id.0).to_string(), - ip: peer.peer_addr.ip(), - port: peer.peer_addr.port() - }).collect(); - - let res = AnnounceResponse { - interval, - complete: torrent_stats.seeders, - incomplete: torrent_stats.leechers, - peers: http_peers - }; - - // check for compact response request - let response = match query.compact { - None => Response::new(res.write().into()), - Some(int) => { - if int == 1 { - let res_compact = res.write_compact(); - match res_compact { - Ok(response) => Response::new(response.into()), - Err(e) => { - debug!("{}", e); - HttpServer::send_error("server error").unwrap() - } - } - } else { - Response::new(res.write().into()) - } - } - }; - - Ok(response) - } - - fn send_error(msg: &str) -> Result { - Ok(ErrorResponse { - failure_reason: msg.to_string() - }.into_response()) - } - - async fn authenticate_request(&self, info_hash_str: &str, key: Option) -> Option> { - let info_hash= InfoHash::from_str(info_hash_str); - if info_hash.is_err() { return Some(HttpServer::send_error("invalid info_hash")) } - - let auth_key = match key { - None => None, - Some(v) => AuthKey::from_string(&v) - }; - - if let Err(e) = self.tracker.authenticate_request(&info_hash.unwrap(), auth_key).await { - return match e { - TorrentError::TorrentNotWhitelisted => { - debug!("Info_hash not whitelisted."); - Some(HttpServer::send_error("torrent not whitelisted")) - } - TorrentError::PeerKeyNotValid => { - debug!("Peer key not valid."); - Some(HttpServer::send_error("peer key not valid")) - } - TorrentError::PeerNotAuthenticated => { - debug!("Peer not authenticated."); - Some(HttpServer::send_error("peer not authenticated")) - } - _ => { - debug!("Unhandled HTTP error."); - Some(HttpServer::send_error("oops")) - } - } - } - - None - } - - async fn handle_announce(&self, query: AnnounceRequest, remote_addr: SocketAddr) -> Result { - let info_hash = match InfoHash::from_str(&query.info_hash) { - Ok(v) => v, - Err(_) => { - return HttpServer::send_error("info_hash is invalid") - } - }; - - let peer = TorrentPeer::from_http_announce_request(&query, remote_addr, self.tracker.config.get_ext_ip()); - - match self.tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await { - Err(e) => { - debug!("{:?}", e); - HttpServer::send_error("server error") - } - Ok(torrent_stats) => { - // get all peers excluding the client_addr - let peers = self.tracker.get_torrent_peers(&info_hash, &peer.peer_addr).await; - if peers.is_none() { - debug!("No peers found after announce."); - return HttpServer::send_error("peer is invalid") - } - - // todo: add http announce interval config option - // success response - let announce_interval = self.tracker.config.http_tracker.announce_interval; - HttpServer::send_announce_response(&query, torrent_stats, peers.unwrap(), announce_interval) - } - } - } - - async fn handle_scrape(&self, info_hashes: Vec) -> Result { - let mut res = ScrapeResponse { - files: HashMap::new() - }; - let db = self.tracker.get_torrents().await; - - for info_hash in info_hashes.iter() { - let scrape_entry = match db.get(&info_hash) { - Some(torrent_info) => { - let (seeders, completed, leechers) = torrent_info.get_stats(); - - ScrapeResponseEntry { - complete: seeders, - downloaded: completed, - incomplete: leechers - } - } - None => { - ScrapeResponseEntry { - complete: 0, - downloaded: 0, - incomplete: 0 - } - } - }; - - if let Ok(encoded_info_hash) = url_encode_bytes(&info_hash.0) { - res.files.insert(encoded_info_hash, scrape_entry); - } - } - - Ok(Response::new(res.write().into())) - } -} diff --git a/src/lib.rs b/src/lib.rs index d040d3719..c055cfae4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,4 @@ pub mod config; -pub mod udp_server; -pub mod http_server; pub mod tracker; pub mod http_api_server; pub mod common; @@ -8,10 +6,12 @@ pub mod utils; pub mod database; pub mod key_manager; pub mod logging; +pub mod torrust_udp_tracker; +pub mod torrust_http_tracker; pub use self::config::*; -pub use self::udp_server::*; -pub use self::http_server::*; +pub use torrust_udp_tracker::server::*; +pub use torrust_http_tracker::server::*; pub use self::tracker::*; pub use self::http_api_server::*; pub use self::common::*; diff --git a/src/main.rs b/src/main.rs index cfb5365ed..d8a73854e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,9 @@ +use std::net::SocketAddr; use log::{info}; use torrust_tracker::{http_api_server, Configuration, TorrentTracker, UdpServer, HttpTrackerConfig, UdpTrackerConfig, HttpApiConfig, logging, TrackerServer}; use std::sync::Arc; use tokio::task::JoinHandle; -use torrust_tracker::http_server::HttpServer; +use torrust_tracker::torrust_http_tracker::server::HttpServer; #[tokio::main] async fn main() { @@ -74,39 +75,32 @@ fn start_api_server(config: &HttpApiConfig, tracker: Arc) -> Joi } fn start_http_tracker_server(config: &HttpTrackerConfig, tracker: Arc) -> JoinHandle<()> { - info!("Starting HTTP server on: {}", config.bind_address); - let http_tracker = Arc::new(HttpServer::new(tracker)); - let bind_addr = config.bind_address.parse::().unwrap(); + let http_tracker = HttpServer::new(tracker); + let bind_addr = config.bind_address.parse::().unwrap(); let ssl_enabled = config.ssl_enabled; let ssl_cert_path = config.ssl_cert_path.clone(); let ssl_key_path = config.ssl_key_path.clone(); + tokio::spawn(async move { // run with tls if ssl_enabled and cert and key path are set - if ssl_enabled { - info!("SSL enabled."); - warp::serve(HttpServer::routes(http_tracker)) - .tls() - .cert_path(ssl_cert_path.as_ref().unwrap()) - .key_path(ssl_key_path.as_ref().unwrap()) - .run(bind_addr).await; + if ssl_enabled && ssl_cert_path.is_some() && ssl_key_path.is_some() { + info!("Starting HTTPS server on: {} (TLS)", bind_addr); + http_tracker.start_tls(bind_addr, ssl_cert_path.as_ref().unwrap(), ssl_key_path.as_ref().unwrap()).await; } else { - warp::serve(HttpServer::routes(http_tracker)) - .run(bind_addr).await; + info!("Starting HTTP server on: {}", bind_addr); + http_tracker.start(bind_addr).await; } }) } async fn start_udp_tracker_server(config: &UdpTrackerConfig, tracker: Arc) -> JoinHandle<()> { - info!("Starting UDP server on: {}", config.bind_address); let udp_server = UdpServer::new(tracker).await.unwrap_or_else(|e| { panic!("Could not start UDP server: {}", e); }); - info!("Starting UDP tracker server.."); + info!("Starting UDP server on: {}", config.bind_address); tokio::spawn(async move { - if let Err(e) = udp_server.accept_packets().await { - panic!("Could not start UDP server: {}", e); - } + udp_server.start().await; }) } diff --git a/src/torrust_http_tracker/errors.rs b/src/torrust_http_tracker/errors.rs new file mode 100644 index 000000000..f0bedfe1b --- /dev/null +++ b/src/torrust_http_tracker/errors.rs @@ -0,0 +1,31 @@ +use warp::reject::Reject; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ServerError { + #[error("internal server error")] + InternalServerError, + + #[error("info_hash is either missing or invalid")] + InvalidInfoHash, + + #[error("could not find remote address")] + AddressNotFound, + + #[error("torrent has no peers")] + NoPeersFound, + + #[error("torrent not on whitelist")] + TorrentNotWhitelisted, + + #[error("peer not authenticated")] + PeerNotAuthenticated, + + #[error("invalid authentication key")] + PeerKeyNotValid, + + #[error("exceeded info_hash limit")] + ExceededInfoHashLimit, +} + +impl Reject for ServerError {} diff --git a/src/torrust_http_tracker/filters.rs b/src/torrust_http_tracker/filters.rs new file mode 100644 index 000000000..9e82fe946 --- /dev/null +++ b/src/torrust_http_tracker/filters.rs @@ -0,0 +1,95 @@ +use std::convert::Infallible; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; +use warp::{Filter, reject, Rejection}; +use crate::{InfoHash, MAX_SCRAPE_TORRENTS, TorrentTracker}; +use crate::key_manager::AuthKey; +use crate::torrust_http_tracker::{AnnounceRequest, AnnounceRequestQuery, ScrapeRequest, ServerError, WebResult}; + +/// Pass Arc along +pub fn with_tracker(tracker: Arc) -> impl Filter,), Error = Infallible> + Clone { + warp::any() + .map(move || tracker.clone()) +} + +/// Check for infoHash +pub fn with_info_hash() -> impl Filter,), Error = Rejection> + Clone { + warp::filters::query::raw() + .and_then(info_hashes) +} + +/// Parse InfoHash from raw query string +async fn info_hashes(raw_query: String) -> WebResult> { + let split_raw_query: Vec<&str> = raw_query.split("&").collect(); + let mut info_hashes: Vec = Vec::new(); + + for v in split_raw_query { + if v.contains("info_hash") { + let raw_info_hash = v.split("=").collect::>()[1]; + let info_hash_bytes = percent_encoding::percent_decode_str(raw_info_hash).collect::>(); + let info_hash = InfoHash::from_str(&hex::encode(info_hash_bytes)); + if let Ok(ih) = info_hash { + info_hashes.push(ih); + } + } + } + + if info_hashes.len() > MAX_SCRAPE_TORRENTS as usize { + Err(reject::custom(ServerError::ExceededInfoHashLimit)) + } else if info_hashes.len() < 1 { + Err(reject::custom(ServerError::InvalidInfoHash)) + } else { + Ok(info_hashes) + } +} + +/// Pass Arc along +pub fn with_auth_key() -> impl Filter,), Error = Infallible> + Clone { + warp::path::param::() + .map(|key: String| { + AuthKey::from_string(&key) + }) + .or_else(|_| async { + Ok::<(Option,), Infallible>((None,)) + }) +} + +/// Check for AnnounceRequest +pub fn with_announce_request() -> impl Filter + Clone { + warp::filters::query::query::() + .and(with_info_hash()) + .and(warp::addr::remote()) + .and_then(announce_request) +} + +/// Parse AnnounceRequest from raw AnnounceRequestQuery, InfoHash and Option +async fn announce_request(announce_request_query: AnnounceRequestQuery, info_hashes: Vec, remote_addr: Option) -> WebResult { + if remote_addr.is_none() { return Err(reject::custom(ServerError::AddressNotFound)) } + + Ok(AnnounceRequest { + info_hash: info_hashes[0], + peer_addr: remote_addr.unwrap(), + downloaded: announce_request_query.downloaded, + uploaded: announce_request_query.uploaded, + peer_id: announce_request_query.peer_id, + port: announce_request_query.port, + left: announce_request_query.left, + event: announce_request_query.event, + compact: announce_request_query.compact + }) +} + +/// Check for ScrapeRequest +pub fn with_scrape_request() -> impl Filter + Clone { + warp::any() + .and(with_info_hash()) + .and_then(scrape_request) +} + +/// Parse ScrapeRequest from InfoHash +async fn scrape_request(info_hashes: Vec) -> WebResult { + Ok(ScrapeRequest { + info_hashes, + }) +} diff --git a/src/torrust_http_tracker/handlers.rs b/src/torrust_http_tracker/handlers.rs new file mode 100644 index 000000000..cb972d69a --- /dev/null +++ b/src/torrust_http_tracker/handlers.rs @@ -0,0 +1,116 @@ +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::Arc; +use log::debug; +use warp::{reject, Rejection, Reply}; +use warp::http::{Response, StatusCode}; +use crate::{InfoHash, TorrentError, TorrentPeer, TorrentStats, TorrentTracker}; +use crate::key_manager::AuthKey; +use crate::torrust_http_tracker::{AnnounceRequest, AnnounceResponse, ErrorResponse, Peer, ScrapeRequest, ScrapeResponse, ScrapeResponseEntry, ServerError, WebResult}; +use crate::utils::url_encode_bytes; + +/// Authenticate InfoHash using optional AuthKey +pub async fn authenticate(info_hash: &InfoHash, auth_key: &Option, tracker: Arc) -> Result<(), ServerError> { + match tracker.authenticate_request(info_hash, auth_key).await { + Ok(_) => Ok(()), + Err(e) => { + let err = match e { + TorrentError::TorrentNotWhitelisted => ServerError::TorrentNotWhitelisted, + TorrentError::PeerNotAuthenticated => ServerError::PeerNotAuthenticated, + TorrentError::PeerKeyNotValid => ServerError::PeerKeyNotValid, + TorrentError::NoPeersFound => ServerError::NoPeersFound, + TorrentError::CouldNotSendResponse => ServerError::InternalServerError, + TorrentError::InvalidInfoHash => ServerError::InvalidInfoHash, + }; + + Err(err) + } + } +} + +/// Handle announce request +pub async fn handle_announce(announce_request: AnnounceRequest, auth_key: Option, tracker: Arc,) -> WebResult { + if let Err(e) = authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await { + return Err(reject::custom(e)) + } + + let peer = TorrentPeer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip()); + let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer).await; + // get all peers excluding the client_addr + let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; + if peers.is_none() { return Err(reject::custom(ServerError::NoPeersFound)) } + + // success response + let announce_interval = tracker.config.http_tracker.announce_interval; + send_announce_response(&announce_request, torrent_stats, peers.unwrap(), announce_interval) +} + +/// Handle scrape request +pub async fn handle_scrape(scrape_request: ScrapeRequest, auth_key: Option, tracker: Arc,) -> WebResult { + let mut files: HashMap = HashMap::new(); + let db = tracker.get_torrents().await; + + for info_hash in scrape_request.info_hashes.iter() { + // authenticate every info_hash + if authenticate(info_hash, &auth_key, tracker.clone()).await.is_err() { continue } + + let scrape_entry = match db.get(&info_hash) { + Some(torrent_info) => { + let (seeders, completed, leechers) = torrent_info.get_stats(); + ScrapeResponseEntry { complete: seeders, downloaded: completed, incomplete: leechers } + } + None => { + ScrapeResponseEntry { complete: 0, downloaded: 0, incomplete: 0 } + } + }; + + if let Ok(encoded_info_hash) = url_encode_bytes(&info_hash.0) { + files.insert(encoded_info_hash, scrape_entry); + } + } + + send_scrape_response(files) +} + +/// Handle all server errors and send error reply +pub async fn handle_error(r: Rejection) -> std::result::Result { + if let Some(e) = r.find::() { + debug!("{:?}", e); + let reply = warp::reply::json(&ErrorResponse { failure_reason: e.to_string() }); + Ok(warp::reply::with_status(reply, StatusCode::BAD_REQUEST)) + } else { + let reply = warp::reply::json(&ErrorResponse { failure_reason: "internal server error".to_string() }); + Ok(warp::reply::with_status(reply, StatusCode::INTERNAL_SERVER_ERROR)) + } +} + +/// Send announce response +fn send_announce_response(announce_request: &AnnounceRequest, torrent_stats: TorrentStats, peers: Vec, interval: u32) -> WebResult { + let http_peers: Vec = peers.iter().map(|peer| Peer { + peer_id: String::from_utf8_lossy(&peer.peer_id.0).to_string(), + ip: peer.peer_addr.ip(), + port: peer.peer_addr.port() + }).collect(); + + let res = AnnounceResponse { + interval, + complete: torrent_stats.seeders, + incomplete: torrent_stats.leechers, + peers: http_peers + }; + + // check for compact response request + if let Some(1) = announce_request.compact { + match res.write_compact() { + Ok(body) => Ok(Response::new(body)), + Err(_) => Err(reject::custom(ServerError::InternalServerError)) + } + } else { + Ok(Response::new(res.write().into())) + } +} + +/// Send scrape response +fn send_scrape_response(files: HashMap) -> WebResult { + Ok(Response::new(ScrapeResponse { files }.write())) +} diff --git a/src/torrust_http_tracker/mod.rs b/src/torrust_http_tracker/mod.rs new file mode 100644 index 000000000..ea6675dce --- /dev/null +++ b/src/torrust_http_tracker/mod.rs @@ -0,0 +1,18 @@ +pub mod server; +pub mod request; +pub mod response; +pub mod errors; +pub mod routes; +pub mod handlers; +pub mod filters; + +pub use self::server::*; +pub use self::request::*; +pub use self::response::*; +pub use self::errors::*; +pub use self::routes::*; +pub use self::handlers::*; +pub use self::filters::*; + +pub type Bytes = u64; +pub type WebResult = std::result::Result; diff --git a/src/torrust_http_tracker/request.rs b/src/torrust_http_tracker/request.rs new file mode 100644 index 000000000..8d90ede1f --- /dev/null +++ b/src/torrust_http_tracker/request.rs @@ -0,0 +1,32 @@ +use std::net::SocketAddr; +use serde::{Deserialize}; +use crate::InfoHash; +use crate::torrust_http_tracker::Bytes; + +#[derive(Deserialize)] +pub struct AnnounceRequestQuery { + pub downloaded: Bytes, + pub uploaded: Bytes, + pub key: String, + pub peer_id: String, + pub port: u16, + pub left: Bytes, + pub event: Option, + pub compact: Option, +} + +pub struct AnnounceRequest { + pub info_hash: InfoHash, + pub peer_addr: SocketAddr, + pub downloaded: Bytes, + pub uploaded: Bytes, + pub peer_id: String, + pub port: u16, + pub left: Bytes, + pub event: Option, + pub compact: Option, +} + +pub struct ScrapeRequest { + pub info_hashes: Vec, +} diff --git a/src/torrust_http_tracker/response.rs b/src/torrust_http_tracker/response.rs new file mode 100644 index 000000000..df039a1c2 --- /dev/null +++ b/src/torrust_http_tracker/response.rs @@ -0,0 +1,87 @@ +use std::collections::HashMap; +use std::error::Error; +use std::io::Write; +use std::net::IpAddr; +use serde::{Serialize}; + +#[derive(Serialize)] +pub struct Peer { + pub peer_id: String, + pub ip: IpAddr, + pub port: u16, +} + +#[derive(Serialize)] +pub struct AnnounceResponse { + pub interval: u32, + //pub tracker_id: String, + pub complete: u32, + pub incomplete: u32, + pub peers: Vec +} + +impl AnnounceResponse { + pub fn write(&self) -> String { + serde_bencode::to_string(&self).unwrap() + } + + pub fn write_compact(&self) -> Result, Box> { + let mut peers_v4: Vec = Vec::new(); + let mut peers_v6: Vec = Vec::new(); + + for peer in &self.peers { + match peer.ip { + IpAddr::V4(ip) => { + peers_v4.write(&u32::from(ip).to_be_bytes())?; + peers_v4.write(&peer.port.to_be_bytes())?; + } + IpAddr::V6(ip) => { + peers_v6.write(&u128::from(ip).to_be_bytes())?; + peers_v6.write(&peer.port.to_be_bytes())?; + } + } + } + + let mut bytes: Vec = Vec::new(); + bytes.write(b"d8:intervali")?; + bytes.write(&self.interval.to_string().as_bytes())?; + bytes.write(b"e8:completei")?; + bytes.write(&self.complete.to_string().as_bytes())?; + bytes.write(b"e10:incompletei")?; + bytes.write(&self.incomplete.to_string().as_bytes())?; + bytes.write(b"e5:peers")?; + bytes.write(&peers_v4.len().to_string().as_bytes())?; + bytes.write(b":")?; + bytes.write(peers_v4.as_slice())?; + bytes.write(b"e6:peers6")?; + bytes.write(&peers_v6.len().to_string().as_bytes())?; + bytes.write(b":")?; + bytes.write(peers_v6.as_slice())?; + bytes.write(b"e")?; + + Ok(bytes) + } +} + +#[derive(Serialize)] +pub struct ScrapeResponseEntry { + pub complete: u32, + pub downloaded: u32, + pub incomplete: u32, +} + +#[derive(Serialize)] +pub struct ScrapeResponse { + pub files: HashMap +} + +impl ScrapeResponse { + pub fn write(&self) -> String { + serde_bencode::to_string(&self).unwrap() + } +} + +#[derive(Serialize)] +pub struct ErrorResponse { + pub failure_reason: String +} diff --git a/src/torrust_http_tracker/routes.rs b/src/torrust_http_tracker/routes.rs new file mode 100644 index 000000000..ad873e83e --- /dev/null +++ b/src/torrust_http_tracker/routes.rs @@ -0,0 +1,43 @@ +use std::convert::Infallible; +use std::sync::Arc; +use warp::{Filter, Rejection}; +use crate::TorrentTracker; +use crate::torrust_http_tracker::{handle_announce, handle_error, handle_scrape, with_announce_request, with_auth_key, with_scrape_request, with_tracker}; + +/// All routes +pub fn routes(tracker: Arc,) -> impl Filter + Clone { + root(tracker.clone()) + .or(announce(tracker.clone())) + .or(scrape(tracker.clone())) + .recover(handle_error) +} + +/// GET / or / +fn root(tracker: Arc,) -> impl Filter + Clone { + warp::any() + .and(warp::filters::method::get()) + .and(with_announce_request()) + .and(with_auth_key()) + .and(with_tracker(tracker)) + .and_then(handle_announce) +} + +/// GET /announce or /announce/ +fn announce(tracker: Arc,) -> impl Filter + Clone { + warp::path::path("announce") + .and(warp::filters::method::get()) + .and(with_announce_request()) + .and(with_auth_key()) + .and(with_tracker(tracker)) + .and_then(handle_announce) +} + +/// GET /scrape/ +fn scrape(tracker: Arc,) -> impl Filter + Clone { + warp::path::path("scrape") + .and(warp::filters::method::get()) + .and(with_scrape_request()) + .and(with_auth_key()) + .and(with_tracker(tracker)) + .and_then(handle_scrape) +} diff --git a/src/torrust_http_tracker/server.rs b/src/torrust_http_tracker/server.rs new file mode 100644 index 000000000..90f8a84d0 --- /dev/null +++ b/src/torrust_http_tracker/server.rs @@ -0,0 +1,33 @@ +use std::net::SocketAddr; +use std::sync::Arc; +use crate::TorrentTracker; +use crate::torrust_http_tracker::routes; + +/// Server that listens on HTTP, needs a TorrentTracker +#[derive(Clone)] +pub struct HttpServer { + tracker: Arc, +} + +impl HttpServer { + pub fn new(tracker: Arc) -> HttpServer { + HttpServer { + tracker + } + } + + /// Start the HttpServer + pub async fn start(&self, socket_addr: SocketAddr) { + warp::serve(routes(self.tracker.clone())) + .run(socket_addr).await; + } + + /// Start the HttpServer in TLS mode + pub async fn start_tls(&self, socket_addr: SocketAddr, ssl_cert_path: &str, ssl_key_path: &str) { + warp::serve(routes(self.tracker.clone())) + .tls() + .cert_path(ssl_cert_path) + .key_path(ssl_key_path) + .run(socket_addr).await; + } +} diff --git a/src/torrust_udp_tracker/errors.rs b/src/torrust_udp_tracker/errors.rs new file mode 100644 index 000000000..fb29e969e --- /dev/null +++ b/src/torrust_udp_tracker/errors.rs @@ -0,0 +1,31 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ServerError { + #[error("internal server error")] + InternalServerError, + + #[error("info_hash is either missing or invalid")] + InvalidInfoHash, + + #[error("could not find remote address")] + AddressNotFound, + + #[error("torrent has no peers")] + NoPeersFound, + + #[error("torrent not on whitelist")] + TorrentNotWhitelisted, + + #[error("peer not authenticated")] + PeerNotAuthenticated, + + #[error("invalid authentication key")] + PeerKeyNotValid, + + #[error("exceeded info_hash limit")] + ExceededInfoHashLimit, + + #[error("bad request")] + BadRequest, +} diff --git a/src/torrust_udp_tracker/handlers.rs b/src/torrust_udp_tracker/handlers.rs new file mode 100644 index 000000000..8549580d7 --- /dev/null +++ b/src/torrust_udp_tracker/handlers.rs @@ -0,0 +1,145 @@ +use std::net::SocketAddr; +use std::sync::Arc; +use aquatic_udp_protocol::{AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId}; +use crate::{InfoHash, MAX_SCRAPE_TORRENTS, TorrentError, TorrentPeer, TorrentTracker}; +use crate::torrust_udp_tracker::errors::ServerError; +use crate::torrust_udp_tracker::request::AnnounceRequestWrapper; +use crate::utils::get_connection_id; + +pub async fn authenticate(info_hash: &InfoHash, tracker: Arc) -> Result<(), ServerError> { + match tracker.authenticate_request(info_hash, &None).await { + Ok(_) => Ok(()), + Err(e) => { + let err = match e { + TorrentError::TorrentNotWhitelisted => ServerError::TorrentNotWhitelisted, + TorrentError::PeerNotAuthenticated => ServerError::PeerNotAuthenticated, + TorrentError::PeerKeyNotValid => ServerError::PeerKeyNotValid, + TorrentError::NoPeersFound => ServerError::NoPeersFound, + TorrentError::CouldNotSendResponse => ServerError::InternalServerError, + TorrentError::InvalidInfoHash => ServerError::InvalidInfoHash, + }; + + Err(err) + } + } +} + +pub async fn handle_packet(remote_addr: SocketAddr, payload: &[u8], tracker: Arc) -> Response { + match Request::from_bytes(&payload[..payload.len()], MAX_SCRAPE_TORRENTS).map_err(|_| ServerError::InternalServerError) { + Ok(request) => { + let transaction_id = match &request { + Request::Connect(connect_request) => { + connect_request.transaction_id + } + Request::Announce(announce_request) => { + announce_request.transaction_id + } + Request::Scrape(scrape_request) => { + scrape_request.transaction_id + } + }; + + match handle_request(request, remote_addr, tracker).await { + Ok(response) => response, + Err(e) => handle_error(e, transaction_id) + } + } + // bad request + Err(_) => handle_error(ServerError::BadRequest, TransactionId(0)) + } +} + +pub async fn handle_request(request: Request, remote_addr: SocketAddr, tracker: Arc) -> Result { + match request { + Request::Connect(connect_request) => { + handle_connect(remote_addr, &connect_request).await + } + Request::Announce(announce_request) => { + handle_announce(remote_addr, &announce_request, tracker).await + } + Request::Scrape(scrape_request) => { + handle_scrape(&scrape_request, tracker).await + } + } +} + +pub async fn handle_connect(remote_addr: SocketAddr, request: &ConnectRequest) -> Result { + let connection_id = get_connection_id(&remote_addr); + + let response = Response::from(ConnectResponse { + transaction_id: request.transaction_id, + connection_id, + }); + + Ok(response) +} + +pub async fn handle_announce(remote_addr: SocketAddr, announce_request: &AnnounceRequest, tracker: Arc) -> Result { + let wrapped_announce_request = AnnounceRequestWrapper::new(announce_request.clone()); + + authenticate(&wrapped_announce_request.info_hash, tracker.clone()).await?; + + let peer = TorrentPeer::from_udp_announce_request(&wrapped_announce_request.announce_request, remote_addr, tracker.config.get_ext_ip()); + + let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer).await; + // get all peers excluding the client_addr + let peers = match tracker.get_torrent_peers(&wrapped_announce_request.info_hash, &peer.peer_addr).await { + Some(v) => v, + None => { return Err(ServerError::NoPeersFound); } + }; + + Ok(Response::from(AnnounceResponse { + transaction_id: wrapped_announce_request.announce_request.transaction_id, + announce_interval: AnnounceInterval(tracker.config.udp_tracker.announce_interval as i32), + leechers: NumberOfPeers(torrent_stats.leechers as i32), + seeders: NumberOfPeers(torrent_stats.seeders as i32), + peers: peers.iter().map(|peer| + ResponsePeer { + ip_address: peer.peer_addr.ip(), + port: Port(peer.peer_addr.port()) + }).collect() + })) +} + +pub async fn handle_scrape(request: &ScrapeRequest, tracker: Arc) -> Result { + let db = tracker.get_torrents().await; + + let mut torrent_stats: Vec = Vec::new(); + + for info_hash in request.info_hashes.iter() { + let info_hash = InfoHash(info_hash.0); + + if authenticate(&info_hash, tracker.clone()).await.is_err() { continue } + + let scrape_entry = match db.get(&info_hash) { + Some(torrent_info) => { + let (seeders, completed, leechers) = torrent_info.get_stats(); + + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders as i32), + completed: NumberOfDownloads(completed as i32), + leechers: NumberOfPeers(leechers as i32), + } + } + None => { + TorrentScrapeStatistics { + seeders: NumberOfPeers(0), + completed: NumberOfDownloads(0), + leechers: NumberOfPeers(0), + } + } + }; + + torrent_stats.push(scrape_entry); + } + + Ok(Response::from(ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats + })) +} + +fn handle_error(e: ServerError, transaction_id: TransactionId) -> Response { + let message = e.to_string(); + Response::from(ErrorResponse { transaction_id, message }) +} diff --git a/src/torrust_udp_tracker/mod.rs b/src/torrust_udp_tracker/mod.rs new file mode 100644 index 000000000..cd4b99f5b --- /dev/null +++ b/src/torrust_udp_tracker/mod.rs @@ -0,0 +1,16 @@ +pub mod errors; +pub mod request; +pub mod server; +pub mod handlers; + +pub use self::errors::*; +pub use self::request::*; +pub use self::server::*; +pub use self::handlers::*; + +pub type Bytes = u64; +pub type Port = u16; +pub type TransactionId = i64; + +pub const MAX_PACKET_SIZE: usize = 0xffff; +pub const PROTOCOL_ID: i64 = 0x41727101980; diff --git a/src/torrust_udp_tracker/request.rs b/src/torrust_udp_tracker/request.rs new file mode 100644 index 000000000..f3f67fdc1 --- /dev/null +++ b/src/torrust_udp_tracker/request.rs @@ -0,0 +1,31 @@ +use aquatic_udp_protocol::{AnnounceRequest}; +use crate::{InfoHash}; + +// struct AnnounceRequest { +// pub connection_id: i64, +// pub transaction_id: i32, +// pub info_hash: InfoHash, +// pub peer_id: PeerId, +// pub bytes_downloaded: Bytes, +// pub bytes_uploaded: Bytes, +// pub bytes_left: Bytes, +// pub event: AnnounceEvent, +// pub ip_address: Option, +// pub key: u32, +// pub peers_wanted: u32, +// pub port: Port +// } + +pub struct AnnounceRequestWrapper { + pub announce_request: AnnounceRequest, + pub info_hash: InfoHash, +} + +impl AnnounceRequestWrapper { + pub fn new(announce_request: AnnounceRequest) -> Self { + AnnounceRequestWrapper { + announce_request: announce_request.clone(), + info_hash: InfoHash(announce_request.info_hash.0) + } + } +} diff --git a/src/torrust_udp_tracker/server.rs b/src/torrust_udp_tracker/server.rs new file mode 100644 index 000000000..0da4ce140 --- /dev/null +++ b/src/torrust_udp_tracker/server.rs @@ -0,0 +1,64 @@ +use std::io::Cursor; +use std::net::{SocketAddr}; +use std::sync::Arc; +use aquatic_udp_protocol::{IpVersion, Response}; +use log::debug; +use tokio::net::UdpSocket; +use crate::TorrentTracker; +use crate::torrust_udp_tracker::{handle_packet, MAX_PACKET_SIZE}; + +pub struct UdpServer { + socket: UdpSocket, + tracker: Arc, +} + +impl UdpServer { + pub async fn new(tracker: Arc) -> Result { + let srv = UdpSocket::bind(&tracker.config.udp_tracker.bind_address).await?; + + Ok(UdpServer { + socket: srv, + tracker, + }) + } + + pub async fn start(&self) { + loop { + let mut data = [0; MAX_PACKET_SIZE]; + if let Ok((valid_bytes, remote_addr)) = self.socket.recv_from(&mut data).await { + let data = &data[..valid_bytes]; + debug!("Received {} bytes from {}", data.len(), remote_addr); + let response = handle_packet(remote_addr, data, self.tracker.clone()).await; + self.send_response(remote_addr, response).await; + } + } + } + + async fn send_response(&self, remote_addr: SocketAddr, response: Response) { + debug!("sending response to: {:?}", &remote_addr); + + let buffer = vec![0u8; MAX_PACKET_SIZE]; + let mut cursor = Cursor::new(buffer); + + let ip_version = match remote_addr { + SocketAddr::V4(_) => IpVersion::IPv4, + SocketAddr::V6(_) => IpVersion::IPv6 + }; + + match response.write(&mut cursor, ip_version) { + Ok(_) => { + let position = cursor.position() as usize; + let inner = cursor.get_ref(); + + debug!("{:?}", &inner[..position]); + self.send_packet(&remote_addr, &inner[..position]).await; + } + Err(_) => { debug!("could not write response to bytes."); } + } + } + + async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) { + // doesn't matter if it reaches or not + let _ = self.socket.send_to(payload, remote_addr).await; + } +} diff --git a/src/tracker.rs b/src/tracker.rs index 86a2d2e7e..b8b4ac823 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -3,16 +3,17 @@ use serde; use std::borrow::Cow; use std::collections::BTreeMap; use tokio::sync::RwLock; -use crate::common::{InfoHash, NumberOfBytesDef, AnnounceEventDef, PeerId}; -use std::net::{SocketAddr, IpAddr}; -use crate::{Configuration, http_server, key_manager, MAX_SCRAPE_TORRENTS}; +use crate::common::{AnnounceEventDef, InfoHash, NumberOfBytesDef, PeerId}; +use std::net::{IpAddr, SocketAddr}; +use crate::{Configuration, key_manager, MAX_SCRAPE_TORRENTS}; use std::collections::btree_map::Entry; use crate::database::SqliteDatabase; use std::sync::Arc; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; use log::debug; -use crate::key_manager::{AuthKey}; +use crate::key_manager::AuthKey; use r2d2_sqlite::rusqlite; +use crate::torrust_http_tracker::AnnounceRequest; const TWO_HOURS: std::time::Duration = std::time::Duration::from_secs(3600 * 2); const FIVE_MINUTES: std::time::Duration = std::time::Duration::from_secs(300); @@ -77,7 +78,7 @@ impl TorrentPeer { } } - pub fn from_http_announce_request(announce_request: &http_server::AnnounceRequest, remote_addr: SocketAddr, peer_addr: Option) -> Self { + pub fn from_http_announce_request(announce_request: &AnnounceRequest, remote_addr: SocketAddr, peer_addr: Option) -> Self { // Potentially substitute localhost IP with external IP let peer_addr = match peer_addr { None => SocketAddr::new(IpAddr::from(remote_addr.ip()), announce_request.port), @@ -245,7 +246,8 @@ pub enum TorrentError { PeerNotAuthenticated, PeerKeyNotValid, NoPeersFound, - CouldNotSendResponse + CouldNotSendResponse, + InvalidInfoHash, } pub struct TorrentTracker { @@ -267,6 +269,18 @@ impl TorrentTracker { } } + fn is_public(&self) -> bool { + self.config.mode == TrackerMode::PublicMode + } + + fn is_private(&self) -> bool { + self.config.mode == TrackerMode::PrivateMode || self.config.mode == TrackerMode::PrivateListedMode + } + + fn is_whitelisted(&self) -> bool { + self.config.mode == TrackerMode::ListedMode || self.config.mode == TrackerMode::PrivateListedMode + } + pub async fn generate_auth_key(&self, seconds_valid: u64) -> Result { let auth_key = key_manager::generate_auth_key(seconds_valid); @@ -285,49 +299,32 @@ impl TorrentTracker { key_manager::verify_auth_key(&db_key) } - pub async fn authenticate_request(&self, info_hash: &InfoHash, key: Option) -> Result<(), TorrentError> { - match self.config.mode { - TrackerMode::PublicMode => Ok(()), - TrackerMode::ListedMode => { - if !self.is_info_hash_whitelisted(info_hash).await { - return Err(TorrentError::TorrentNotWhitelisted) - } + pub async fn authenticate_request(&self, info_hash: &InfoHash, key: &Option) -> Result<(), TorrentError> { + // no authentication needed in public mode + if self.is_public() { return Ok(()) } - Ok(()) - } - TrackerMode::PrivateMode => { - match key { - Some(key) => { - if self.verify_auth_key(&key).await.is_err() { - return Err(TorrentError::PeerKeyNotValid) - } - - Ok(()) - } - None => { - return Err(TorrentError::PeerNotAuthenticated) + // check if auth_key is set and valid + if self.is_private() { + match key { + Some(key) => { + if self.verify_auth_key(key).await.is_err() { + return Err(TorrentError::PeerKeyNotValid) } } + None => { + return Err(TorrentError::PeerNotAuthenticated) + } } - TrackerMode::PrivateListedMode => { - match key { - Some(key) => { - if self.verify_auth_key(&key).await.is_err() { - return Err(TorrentError::PeerKeyNotValid) - } - - if !self.is_info_hash_whitelisted(info_hash).await { - return Err(TorrentError::TorrentNotWhitelisted) - } + } - Ok(()) - } - None => { - return Err(TorrentError::PeerNotAuthenticated) - } - } + // check if info_hash is whitelisted + if self.is_whitelisted() { + if self.is_info_hash_whitelisted(info_hash).await == false { + return Err(TorrentError::TorrentNotWhitelisted) } } + + Ok(()) } // Adding torrents is not relevant to public trackers. @@ -351,7 +348,7 @@ impl TorrentTracker { pub async fn get_torrent_peers( &self, info_hash: &InfoHash, - peer_addr: &std::net::SocketAddr + peer_addr: &SocketAddr ) -> Option> { let read_lock = self.torrents.read().await; match read_lock.get(info_hash) { @@ -364,31 +361,26 @@ impl TorrentTracker { } } - pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &TorrentPeer) -> Result { + pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &TorrentPeer) -> TorrentStats { let mut torrents = self.torrents.write().await; let torrent_entry = match torrents.entry(info_hash.clone()) { Entry::Vacant(vacant) => { - Ok(vacant.insert(TorrentEntry::new())) + vacant.insert(TorrentEntry::new()) } Entry::Occupied(entry) => { - Ok(entry.into_mut()) + entry.into_mut() } }; - match torrent_entry { - Ok(torrent_entry) => { - torrent_entry.update_peer(peer); + torrent_entry.update_peer(peer); - let (seeders, completed, leechers) = torrent_entry.get_stats(); + let (seeders, completed, leechers) = torrent_entry.get_stats(); - Ok(TorrentStats { - seeders, - leechers, - completed, - }) - } - Err(e) => Err(e) + TorrentStats { + seeders, + leechers, + completed, } } diff --git a/src/udp_server.rs b/src/udp_server.rs deleted file mode 100644 index cf0474f7c..000000000 --- a/src/udp_server.rs +++ /dev/null @@ -1,257 +0,0 @@ -use log::{debug}; -use std; -use std::net::{SocketAddr}; -use std::sync::Arc; -use std::io::{Cursor}; -use aquatic_udp_protocol::{AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, IpVersion, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId}; -use tokio::net::UdpSocket; - -use super::common::*; -use crate::utils::get_connection_id; -use crate::tracker::TorrentTracker; -use crate::{TorrentPeer, TorrentError}; - -struct RequestError { - error: TorrentError, - transaction_id: TransactionId -} - -struct AnnounceRequestWrapper { - announce_request: AnnounceRequest, - info_hash: super::common::InfoHash, -} - -impl AnnounceRequestWrapper { - pub fn new(announce_request: AnnounceRequest) -> Self { - AnnounceRequestWrapper { - announce_request: announce_request.clone(), - info_hash: InfoHash(announce_request.info_hash.0) - } - } -} - -pub struct UdpServer { - socket: UdpSocket, - tracker: Arc, -} - -impl UdpServer { - pub async fn new(tracker: Arc) -> Result { - let srv = UdpSocket::bind(&tracker.config.udp_tracker.bind_address).await?; - - Ok(UdpServer { - socket: srv, - tracker, - }) - } - - pub async fn accept_packets(self) -> Result<(), std::io::Error> { - let tracker = Arc::new(self); - - loop { - let mut packet = vec![0u8; MAX_PACKET_SIZE]; - let (size, remote_address) = tracker.socket.recv_from(packet.as_mut_slice()).await?; - - let tracker = tracker.clone(); - tokio::spawn(async move { - debug!("Received {} bytes from {}", size, remote_address); - tracker.handle_packet(remote_address, &packet[..size]).await; - }); - } - } - - async fn handle_packet(&self, remote_addr: SocketAddr, payload: &[u8]) { - let request = Request::from_bytes(&payload[..payload.len()], MAX_SCRAPE_TORRENTS); - - match request { - Ok(request) => { - debug!("New request: {:?}", request); - self.handle_request(request, remote_addr).await; - } - Err(err) => { - debug!("request_from_bytes error: {:?}", err); - } - } - } - - async fn handle_request(&self, request: Request, remote_addr: SocketAddr) { - // todo: check for expired connection_id - let request_result = match request { - Request::Connect(connect_request) => { - self.handle_connect(remote_addr, &connect_request).await - .map_err(|error| RequestError { error, transaction_id: connect_request.transaction_id }) - } - Request::Announce(announce_request) => { - self.handle_announce(remote_addr, &announce_request).await - .map_err(|error| RequestError { error, transaction_id: announce_request.transaction_id }) - } - Request::Scrape(scrape_request) => { - self.handle_scrape(&scrape_request).await - .map_err(|error| RequestError { error, transaction_id: scrape_request.transaction_id }) - } - }; - - match request_result { - Ok(response) => { - let _ = self.send_response(remote_addr, response).await; - } - Err(request_error) => { - let _ = self.handle_error(request_error.error, remote_addr, request_error.transaction_id).await; - } - } - } - - async fn handle_connect(&self, remote_addr: SocketAddr, request: &ConnectRequest) -> Result { - let connection_id = get_connection_id(&remote_addr); - - let response = Response::from(ConnectResponse { - transaction_id: request.transaction_id, - connection_id, - }); - - Ok(response) - } - - async fn handle_announce(&self, remote_addr: SocketAddr, announce_request: &AnnounceRequest) -> Result { - let wrapped_announce_request = AnnounceRequestWrapper::new(announce_request.clone()); - self.tracker.authenticate_request(&wrapped_announce_request.info_hash, None).await?; - - let peer = TorrentPeer::from_udp_announce_request(&wrapped_announce_request.announce_request, remote_addr, self.tracker.config.get_ext_ip()); - - return match self.tracker.update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer).await { - Ok(torrent_stats) => { - // get all peers excluding the client_addr - let peers = match self.tracker.get_torrent_peers(&wrapped_announce_request.info_hash, &peer.peer_addr).await { - Some(v) => v, - None => { - return Err(TorrentError::NoPeersFound); - } - }; - - let response = Response::from(AnnounceResponse { - transaction_id: wrapped_announce_request.announce_request.transaction_id, - announce_interval: AnnounceInterval(self.tracker.config.udp_tracker.announce_interval as i32), - leechers: NumberOfPeers(torrent_stats.leechers as i32), - seeders: NumberOfPeers(torrent_stats.seeders as i32), - peers: peers.iter().map(|peer| - ResponsePeer { - ip_address: peer.peer_addr.ip(), - port: Port(peer.peer_addr.port()) - }).collect() - }); - - Ok(response) - } - Err(e) => Err(e) - } - } - - async fn handle_scrape(&self, request: &ScrapeRequest) -> Result { - let db = self.tracker.get_torrents().await; - - let mut torrent_stats: Vec = Vec::new(); - - for info_hash in request.info_hashes.iter() { - let info_hash = InfoHash(info_hash.0); - let scrape_entry = match db.get(&info_hash) { - Some(torrent_info) => { - let (seeders, completed, leechers) = torrent_info.get_stats(); - - TorrentScrapeStatistics { - seeders: NumberOfPeers(seeders as i32), - completed: NumberOfDownloads(completed as i32), - leechers: NumberOfPeers(leechers as i32), - } - } - None => { - TorrentScrapeStatistics { - seeders: NumberOfPeers(0), - completed: NumberOfDownloads(0), - leechers: NumberOfPeers(0), - } - } - }; - - torrent_stats.push(scrape_entry); - } - - let response = Response::from(ScrapeResponse { - transaction_id: request.transaction_id, - torrent_stats - }); - - Ok(response) - } - - async fn handle_error(&self, e: TorrentError, remote_addr: SocketAddr, tx_id: TransactionId) { - let mut err_msg = "oops"; - - match e { - TorrentError::TorrentNotWhitelisted => { - debug!("Info_hash not whitelisted."); - err_msg = "info hash not whitelisted"; - } - TorrentError::PeerKeyNotValid => { - debug!("Peer key not valid."); - err_msg = "peer key not valid"; - } - TorrentError::PeerNotAuthenticated => { - debug!("Peer not authenticated."); - err_msg = "peer not authenticated"; - } - TorrentError::NoPeersFound => { - debug!("No peers found."); - err_msg = "no peers found"; - } - _ => {} - } - - self.send_error(remote_addr, tx_id, err_msg).await; - } - - async fn send_response(&self, remote_addr: SocketAddr, response: Response) -> Result { - debug!("sending response to: {:?}", &remote_addr); - - let buffer = vec![0u8; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(buffer); - - match response.write(&mut cursor, IpVersion::IPv4) { - Ok(_) => { - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - debug!("{:?}", &inner[..position]); - match self.send_packet(&remote_addr, &inner[..position]).await { - Ok(byte_size) => Ok(byte_size), - Err(e) => { - debug!("{:?}", e); - Err(()) - } - } - } - Err(_) => { - debug!("could not write response to bytes."); - Err(()) - } - } - } - - async fn send_error(&self, remote_addr: SocketAddr, transaction_id: TransactionId, error_msg: &str) { - let response = Response::from(ErrorResponse { - transaction_id, - message: error_msg.to_string(), - }); - - let _ = self.send_response(remote_addr, response).await; - } - - async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) -> Result { - match self.socket.send_to(payload, remote_addr).await { - Err(err) => { - debug!("failed to send a packet: {}", err); - Err(err) - }, - Ok(sz) => Ok(sz), - } - } -}