From 4b8fbfbbc9dc33e72c7e592dbd74d3f1f206e36d Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 10 Feb 2023 14:23:17 +0000 Subject: [PATCH 1/4] refactor: the tracker is responsible for assigning the IP to peers --- src/http/warp_implementation/handlers.rs | 10 +- src/tracker/mod.rs | 113 +++++++++++++++++- src/tracker/peer.rs | 144 ++++------------------- src/udp/handlers.rs | 14 +-- 4 files changed, 151 insertions(+), 130 deletions(-) diff --git a/src/http/warp_implementation/handlers.rs b/src/http/warp_implementation/handlers.rs index 229cb4587..0fd332cae 100644 --- a/src/http/warp_implementation/handlers.rs +++ b/src/http/warp_implementation/handlers.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::convert::Infallible; -use std::net::IpAddr; +use std::net::{IpAddr, SocketAddr}; use std::panic::Location; use std::sync::Arc; @@ -41,11 +41,15 @@ pub async fn handle_announce( auth_key: Option, tracker: Arc, ) -> WebResult { + debug!("http announce request: {:#?}", announce_request); + authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await?; - debug!("{:?}", announce_request); + // build the peer + let peer_ip = tracker.assign_ip_address_to_peer(&announce_request.peer_addr); + let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port); + let peer = peer::Peer::from_http_announce_request(&announce_request, &peer_socket_address); - let peer = peer::Peer::from_http_announce_request(&announce_request, announce_request.peer_addr, tracker.config.get_ext_ip()); let torrent_stats = tracker .update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer) .await; diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index acbf7d536..f31a71fbb 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -8,7 +8,7 @@ pub mod torrent; use std::collections::btree_map::Entry; use std::collections::BTreeMap; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::panic::Location; use std::sync::Arc; use std::time::Duration; @@ -76,6 +76,12 @@ impl Tracker { self.mode == mode::Mode::Listed || self.mode == mode::Mode::PrivateListed } + /// It assigns a socket address to the peer + #[must_use] + pub fn assign_ip_address_to_peer(&self, remote_client_ip: &IpAddr) -> IpAddr { + assign_ip_address_to_peer(remote_client_ip, self.config.get_ext_ip()) + } + /// # Errors /// /// Will return a `database::Error` if unable to add the `auth_key` to the database. @@ -378,6 +384,15 @@ impl Tracker { } } +#[must_use] +pub fn assign_ip_address_to_peer(remote_client_ip: &IpAddr, tracker_external_ip: Option) -> IpAddr { + if let Some(host_ip) = tracker_external_ip.filter(|_| remote_client_ip.is_loopback()) { + host_ip + } else { + *remote_client_ip + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -424,4 +439,100 @@ mod tests { } ); } + + mod the_tracker_assigning_the_ip_to_the_peer { + + use std::net::{IpAddr, Ipv4Addr}; + + use crate::tracker::assign_ip_address_to_peer; + + #[test] + fn should_use_the_source_ip_instead_of_the_ip_in_the_announce_request() { + let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2)); + + let peer_ip = assign_ip_address_to_peer(&remote_ip, None); + + assert_eq!(peer_ip, remote_ip); + } + + mod when_the_client_ip_is_a_ipv4_loopback_ip { + + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + use std::str::FromStr; + + use crate::tracker::assign_ip_address_to_peer; + + #[test] + fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() { + let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST); + + let peer_ip = assign_ip_address_to_peer(&remote_ip, None); + + assert_eq!(peer_ip, remote_ip); + } + + #[test] + fn it_should_use_the_external_tracker_ip_in_tracker_configuration_if_it_is_defined() { + let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST); + + let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap()); + + let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip)); + + assert_eq!(peer_ip, tracker_external_ip); + } + + #[test] + fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv6_ip() + { + let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST); + + let tracker_external_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap()); + + let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip)); + + assert_eq!(peer_ip, tracker_external_ip); + } + } + + mod when_client_ip_is_a_ipv6_loopback_ip { + + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + use std::str::FromStr; + + use crate::tracker::assign_ip_address_to_peer; + + #[test] + fn it_should_use_the_loopback_ip_if_the_tracker_does_not_have_the_external_ip_configuration() { + let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST); + + let peer_ip = assign_ip_address_to_peer(&remote_ip, None); + + assert_eq!(peer_ip, remote_ip); + } + + #[test] + fn it_should_use_the_external_ip_in_tracker_configuration_if_it_is_defined() { + let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST); + + let tracker_external_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap()); + + let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip)); + + assert_eq!(peer_ip, tracker_external_ip); + } + + #[test] + fn it_should_use_the_external_ip_in_the_tracker_configuration_if_it_is_defined_even_if_the_external_ip_is_an_ipv4_ip() + { + let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST); + + let tracker_external_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap()); + + let peer_ip = assign_ip_address_to_peer(&remote_ip, Some(tracker_external_ip)); + + assert_eq!(peer_ip, tracker_external_ip); + } + } + } } diff --git a/src/tracker/peer.rs b/src/tracker/peer.rs index 04e4cdb45..e824a0cbc 100644 --- a/src/tracker/peer.rs +++ b/src/tracker/peer.rs @@ -1,4 +1,4 @@ -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::panic::Location; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; @@ -29,16 +29,10 @@ pub struct Peer { impl Peer { #[must_use] - pub fn from_udp_announce_request( - announce_request: &aquatic_udp_protocol::AnnounceRequest, - remote_ip: IpAddr, - host_opt_ip: Option, - ) -> Self { - let peer_addr = Peer::peer_addr_from_ip_and_port_and_opt_host_ip(remote_ip, host_opt_ip, announce_request.port.0); - + pub fn from_udp_announce_request(announce_request: &aquatic_udp_protocol::AnnounceRequest, peer_addr: &SocketAddr) -> Self { Peer { peer_id: Id(announce_request.peer_id.0), - peer_addr, + peer_addr: *peer_addr, updated: Current::now(), uploaded: announce_request.bytes_uploaded, downloaded: announce_request.bytes_downloaded, @@ -48,9 +42,7 @@ impl Peer { } #[must_use] - pub fn from_http_announce_request(announce_request: &Announce, remote_ip: IpAddr, host_opt_ip: Option) -> Self { - let peer_addr = Peer::peer_addr_from_ip_and_port_and_opt_host_ip(remote_ip, host_opt_ip, announce_request.port); - + pub fn from_http_announce_request(announce_request: &Announce, peer_addr: &SocketAddr) -> Self { let event: AnnounceEvent = if let Some(event) = &announce_request.event { match event.as_ref() { "started" => AnnounceEvent::Started, @@ -65,7 +57,7 @@ impl Peer { #[allow(clippy::cast_possible_truncation)] Peer { peer_id: announce_request.peer_id, - peer_addr, + peer_addr: *peer_addr, updated: Current::now(), uploaded: NumberOfBytes(i128::from(announce_request.uploaded) as i64), downloaded: NumberOfBytes(i128::from(announce_request.downloaded) as i64), @@ -74,16 +66,6 @@ impl Peer { } } - // potentially substitute localhost ip with external ip - #[must_use] - pub fn peer_addr_from_ip_and_port_and_opt_host_ip(remote_ip: IpAddr, host_opt_ip: Option, port: u16) -> SocketAddr { - if let Some(host_ip) = host_opt_ip.filter(|_| remote_ip.is_loopback()) { - SocketAddr::new(host_ip, port) - } else { - SocketAddr::new(remote_ip, port) - } - } - #[must_use] pub fn is_seeder(&self) -> bool { self.left.0 <= 0 && self.event != AnnounceEvent::Stopped @@ -446,6 +428,7 @@ mod test { AnnounceEvent, AnnounceRequest, NumberOfBytes, NumberOfPeers, PeerId as AquaticPeerId, PeerKey, Port, TransactionId, }; + use crate::tracker::assign_ip_address_to_peer; use crate::tracker::peer::Peer; use crate::udp::connection_cookie::{into_connection_id, make}; @@ -498,7 +481,10 @@ mod test { let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2)); let announce_request = AnnounceRequestBuilder::default().into(); - let torrent_peer = Peer::from_udp_announce_request(&announce_request, remote_ip, None); + let peer_ip = assign_ip_address_to_peer(&remote_ip, None); + let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port.0); + + let torrent_peer = Peer::from_udp_announce_request(&announce_request, &peer_socket_address); assert_eq!(torrent_peer.peer_addr, SocketAddr::new(remote_ip, announce_request.port.0)); } @@ -508,99 +494,21 @@ mod test { let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2)); let announce_request = AnnounceRequestBuilder::default().into(); - let torrent_peer = Peer::from_udp_announce_request(&announce_request, remote_ip, None); - - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(remote_ip, announce_request.port.0)); - } - - mod when_source_udp_ip_is_a_ipv_4_loopback_ip { - - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use std::str::FromStr; - - use crate::tracker::peer::test::torrent_peer_constructor_from_udp_requests::AnnounceRequestBuilder; - use crate::tracker::peer::Peer; - - #[test] - fn it_should_use_the_loopback_ip_if_the_server_does_not_have_the_external_ip_configuration() { - let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST); - let announce_request = AnnounceRequestBuilder::default().into(); - - let torrent_peer = Peer::from_udp_announce_request(&announce_request, remote_ip, None); - - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(remote_ip, announce_request.port.0)); - } - - #[test] - fn it_should_use_the_external_host_ip_in_tracker_configuration_if_defined() { - let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST); - let announce_request = AnnounceRequestBuilder::default().into(); - - let host_opt_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap()); - let torrent_peer = Peer::from_udp_announce_request(&announce_request, remote_ip, Some(host_opt_ip)); - - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(host_opt_ip, announce_request.port.0)); - } - - #[test] - fn it_should_use_the_external_ip_in_tracker_configuration_if_defined_even_if_the_external_ip_is_an_ipv6_ip() { - let remote_ip = IpAddr::V4(Ipv4Addr::LOCALHOST); - let announce_request = AnnounceRequestBuilder::default().into(); + let peer_ip = assign_ip_address_to_peer(&remote_ip, None); + let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port.0); - let host_opt_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap()); - let torrent_peer = Peer::from_udp_announce_request(&announce_request, remote_ip, Some(host_opt_ip)); + let torrent_peer = Peer::from_udp_announce_request(&announce_request, &peer_socket_address); - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(host_opt_ip, announce_request.port.0)); - } - } - - mod when_source_udp_ip_is_a_ipv6_loopback_ip { - - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; - use std::str::FromStr; - - use crate::tracker::peer::test::torrent_peer_constructor_from_udp_requests::AnnounceRequestBuilder; - use crate::tracker::peer::Peer; - - #[test] - fn it_should_use_the_loopback_ip_if_the_server_does_not_have_the_external_ip_configuration() { - let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST); - let announce_request = AnnounceRequestBuilder::default().into(); - - let torrent_peer = Peer::from_udp_announce_request(&announce_request, remote_ip, None); - - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(remote_ip, announce_request.port.0)); - } - - #[test] - fn it_should_use_the_external_host_ip_in_tracker_configuration_if_defined() { - let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST); - let announce_request = AnnounceRequestBuilder::default().into(); - - let host_opt_ip = IpAddr::V6(Ipv6Addr::from_str("2345:0425:2CA1:0000:0000:0567:5673:23b5").unwrap()); - let torrent_peer = Peer::from_udp_announce_request(&announce_request, remote_ip, Some(host_opt_ip)); - - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(host_opt_ip, announce_request.port.0)); - } - - #[test] - fn it_should_use_the_external_ip_in_tracker_configuration_if_defined_even_if_the_external_ip_is_an_ipv4_ip() { - let remote_ip = IpAddr::V6(Ipv6Addr::LOCALHOST); - let announce_request = AnnounceRequestBuilder::default().into(); - - let host_opt_ip = IpAddr::V4(Ipv4Addr::from_str("126.0.0.1").unwrap()); - let torrent_peer = Peer::from_udp_announce_request(&announce_request, remote_ip, Some(host_opt_ip)); - - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(host_opt_ip, announce_request.port.0)); - } + assert_eq!(torrent_peer.peer_addr, SocketAddr::new(remote_ip, announce_request.port.0)); } } mod torrent_peer_constructor_from_for_http_requests { - use std::net::{IpAddr, Ipv4Addr}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use crate::http::warp_implementation::request::Announce; use crate::protocol::info_hash::InfoHash; + use crate::tracker::assign_ip_address_to_peer; use crate::tracker::peer::{self, Peer}; fn sample_http_announce_request(peer_addr: IpAddr, port: u16) -> Announce { @@ -618,13 +526,16 @@ mod test { } #[test] - fn it_should_use_the_source_ip_in_the_udp_heder_as_the_peer_ip_address_ignoring_the_peer_ip_in_the_announce_request() { + fn it_should_use_the_source_ip_in_the_udp_header_as_the_peer_ip_address_ignoring_the_peer_ip_in_the_announce_request() { let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2)); let ip_in_announce_request = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)); let announce_request = sample_http_announce_request(ip_in_announce_request, 8080); - let torrent_peer = Peer::from_http_announce_request(&announce_request, remote_ip, None); + let peer_ip = assign_ip_address_to_peer(&remote_ip, None); + let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port); + + let torrent_peer = Peer::from_http_announce_request(&announce_request, &peer_socket_address); assert_eq!(torrent_peer.peer_addr.ip(), remote_ip); assert_ne!(torrent_peer.peer_addr.ip(), ip_in_announce_request); @@ -639,18 +550,13 @@ mod test { let announce_request = sample_http_announce_request(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), port_in_announce_request); - let torrent_peer = Peer::from_http_announce_request(&announce_request, remote_ip, None); + let peer_ip = assign_ip_address_to_peer(&remote_ip, None); + let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port); + + let torrent_peer = Peer::from_http_announce_request(&announce_request, &peer_socket_address); assert_eq!(torrent_peer.peer_addr.port(), announce_request.port); assert_ne!(torrent_peer.peer_addr.port(), remote_port); } - - // todo: other cases are already covered by UDP cases. - // Code review: - // We should extract the method "peer_addr_from_ip_and_port_and_opt_host_ip" from TorrentPeer. - // It could be another service responsible for assigning the IP to the peer. - // So we can test that behavior independently from where you use it. - // We could also build the peer with the IP in the announce request and let the tracker decide - // wether it has to change it or not depending on tracker configuration. } } diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index b36399f89..b6d4bed7b 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -6,6 +6,7 @@ use aquatic_udp_protocol::{ AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, NumberOfDownloads, NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId, }; +use log::debug; use super::connection_cookie::{check, from_connection_id, into_connection_id, make}; use crate::protocol::common::MAX_SCRAPE_TORRENTS; @@ -93,6 +94,8 @@ pub async fn handle_announce( announce_request: &AnnounceRequest, tracker: Arc, ) -> Result { + debug!("udp announce request: {:#?}", announce_request); + check(&remote_addr, &from_connection_id(&announce_request.connection_id))?; let wrapped_announce_request = AnnounceWrapper::new(announce_request); @@ -104,13 +107,10 @@ pub async fn handle_announce( source: (Arc::new(e) as Arc).into(), })?; - let peer = peer::Peer::from_udp_announce_request( - &wrapped_announce_request.announce_request, - remote_addr.ip(), - tracker.config.get_ext_ip(), - ); - - //let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer).await; + // build the peer + let peer_ip = tracker.assign_ip_address_to_peer(&remote_addr.ip()); + let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port.0); + let peer = peer::Peer::from_udp_announce_request(&wrapped_announce_request.announce_request, &peer_socket_address); let torrent_stats = tracker .update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer) From 05ea74177dd033aed5704131869b8ae60a223432 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 10 Feb 2023 16:38:46 +0000 Subject: [PATCH 2/4] refactor: move code from domain to delivery layer --- src/http/warp_implementation/handlers.rs | 19 +- src/http/warp_implementation/mod.rs | 1 + src/http/warp_implementation/peer_builder.rs | 32 ++++ src/tracker/peer.rs | 181 +------------------ src/udp/handlers.rs | 9 +- src/udp/mod.rs | 1 + src/udp/peer_builder.rs | 18 ++ src/udp/request.rs | 15 -- 8 files changed, 63 insertions(+), 213 deletions(-) create mode 100644 src/http/warp_implementation/peer_builder.rs create mode 100644 src/udp/peer_builder.rs diff --git a/src/http/warp_implementation/handlers.rs b/src/http/warp_implementation/handlers.rs index 0fd332cae..f914e7555 100644 --- a/src/http/warp_implementation/handlers.rs +++ b/src/http/warp_implementation/handlers.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::convert::Infallible; -use std::net::{IpAddr, SocketAddr}; +use std::net::IpAddr; use std::panic::Location; use std::sync::Arc; @@ -10,6 +10,7 @@ use warp::{reject, Rejection, Reply}; use super::error::Error; use super::{request, response, WebResult}; +use crate::http::warp_implementation::peer_builder; use crate::protocol::info_hash::InfoHash; use crate::tracker::{self, auth, peer, statistics, torrent}; @@ -31,11 +32,9 @@ pub async fn authenticate( }) } -/// Handle announce request -/// /// # Errors /// -/// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_scrape_response`. +/// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_announce_response`. pub async fn handle_announce( announce_request: request::Announce, auth_key: Option, @@ -45,10 +44,9 @@ pub async fn handle_announce( authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await?; - // build the peer let peer_ip = tracker.assign_ip_address_to_peer(&announce_request.peer_addr); - let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port); - let peer = peer::Peer::from_http_announce_request(&announce_request, &peer_socket_address); + + let peer = peer_builder::from_request(&announce_request, &peer_ip); let torrent_stats = tracker .update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer) @@ -57,9 +55,6 @@ pub async fn handle_announce( // get all torrent peers excluding the peer_addr let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; - let announce_interval = tracker.config.announce_interval; - - // send stats event match announce_request.peer_addr { IpAddr::V4(_) => { tracker.send_stats_event(statistics::Event::Tcp4Announce).await; @@ -73,13 +68,11 @@ pub async fn handle_announce( &announce_request, &torrent_stats, &peers, - announce_interval, + tracker.config.announce_interval, tracker.config.min_announce_interval, ) } -/// Handle scrape request -/// /// # Errors /// /// Will return `warp::Rejection` that wraps the `ServerError` if unable to `send_scrape_response`. diff --git a/src/http/warp_implementation/mod.rs b/src/http/warp_implementation/mod.rs index 4fbfb48fb..1dec73b29 100644 --- a/src/http/warp_implementation/mod.rs +++ b/src/http/warp_implementation/mod.rs @@ -3,6 +3,7 @@ use warp::Rejection; pub mod error; pub mod filters; pub mod handlers; +pub mod peer_builder; pub mod request; pub mod response; pub mod routes; diff --git a/src/http/warp_implementation/peer_builder.rs b/src/http/warp_implementation/peer_builder.rs new file mode 100644 index 000000000..70cf7b508 --- /dev/null +++ b/src/http/warp_implementation/peer_builder.rs @@ -0,0 +1,32 @@ +use std::net::{IpAddr, SocketAddr}; + +use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; + +use super::request::Announce; +use crate::protocol::clock::{Current, Time}; +use crate::tracker::peer::Peer; + +#[must_use] +pub fn from_request(announce_request: &Announce, peer_ip: &IpAddr) -> Peer { + let event: AnnounceEvent = if let Some(event) = &announce_request.event { + match event.as_ref() { + "started" => AnnounceEvent::Started, + "stopped" => AnnounceEvent::Stopped, + "completed" => AnnounceEvent::Completed, + _ => AnnounceEvent::None, + } + } else { + AnnounceEvent::None + }; + + #[allow(clippy::cast_possible_truncation)] + Peer { + peer_id: announce_request.peer_id, + peer_addr: SocketAddr::new(*peer_ip, announce_request.port), + updated: Current::now(), + uploaded: NumberOfBytes(i128::from(announce_request.uploaded) as i64), + downloaded: NumberOfBytes(i128::from(announce_request.downloaded) as i64), + left: NumberOfBytes(i128::from(announce_request.left) as i64), + event, + } +} diff --git a/src/tracker/peer.rs b/src/tracker/peer.rs index e824a0cbc..24cc99f9b 100644 --- a/src/tracker/peer.rs +++ b/src/tracker/peer.rs @@ -6,8 +6,7 @@ use serde; use serde::Serialize; use thiserror::Error; -use crate::http::warp_implementation::request::Announce; -use crate::protocol::clock::{Current, DurationSinceUnixEpoch, Time}; +use crate::protocol::clock::DurationSinceUnixEpoch; use crate::protocol::common::{AnnounceEventDef, NumberOfBytesDef}; use crate::protocol::utils::ser_unix_time_value; @@ -28,44 +27,6 @@ pub struct Peer { } impl Peer { - #[must_use] - pub fn from_udp_announce_request(announce_request: &aquatic_udp_protocol::AnnounceRequest, peer_addr: &SocketAddr) -> Self { - Peer { - peer_id: Id(announce_request.peer_id.0), - peer_addr: *peer_addr, - updated: Current::now(), - uploaded: announce_request.bytes_uploaded, - downloaded: announce_request.bytes_downloaded, - left: announce_request.bytes_left, - event: announce_request.event, - } - } - - #[must_use] - pub fn from_http_announce_request(announce_request: &Announce, peer_addr: &SocketAddr) -> Self { - let event: AnnounceEvent = if let Some(event) = &announce_request.event { - match event.as_ref() { - "started" => AnnounceEvent::Started, - "stopped" => AnnounceEvent::Stopped, - "completed" => AnnounceEvent::Completed, - _ => AnnounceEvent::None, - } - } else { - AnnounceEvent::None - }; - - #[allow(clippy::cast_possible_truncation)] - Peer { - peer_id: announce_request.peer_id, - peer_addr: *peer_addr, - updated: Current::now(), - uploaded: NumberOfBytes(i128::from(announce_request.uploaded) as i64), - downloaded: NumberOfBytes(i128::from(announce_request.downloaded) as i64), - left: NumberOfBytes(i128::from(announce_request.left) as i64), - event, - } - } - #[must_use] pub fn is_seeder(&self) -> bool { self.left.0 <= 0 && self.event != AnnounceEvent::Stopped @@ -419,144 +380,4 @@ mod test { ); } } - - mod torrent_peer_constructor_from_udp_requests { - - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - - use aquatic_udp_protocol::{ - AnnounceEvent, AnnounceRequest, NumberOfBytes, NumberOfPeers, PeerId as AquaticPeerId, PeerKey, Port, TransactionId, - }; - - use crate::tracker::assign_ip_address_to_peer; - use crate::tracker::peer::Peer; - use crate::udp::connection_cookie::{into_connection_id, make}; - - // todo: duplicate functions is PR 82. Remove duplication once both PR are merged. - - fn sample_ipv4_remote_addr() -> SocketAddr { - sample_ipv4_socket_address() - } - - fn sample_ipv4_socket_address() -> SocketAddr { - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080) - } - - struct AnnounceRequestBuilder { - request: AnnounceRequest, - } - - impl AnnounceRequestBuilder { - pub fn default() -> AnnounceRequestBuilder { - let client_ip = Ipv4Addr::new(126, 0, 0, 1); - let client_port = 8080; - let info_hash_aquatic = aquatic_udp_protocol::InfoHash([0u8; 20]); - - let default_request = AnnounceRequest { - connection_id: into_connection_id(&make(&sample_ipv4_remote_addr())), - transaction_id: TransactionId(0i32), - info_hash: info_hash_aquatic, - peer_id: AquaticPeerId(*b"-qB00000000000000000"), - bytes_downloaded: NumberOfBytes(0i64), - bytes_uploaded: NumberOfBytes(0i64), - bytes_left: NumberOfBytes(0i64), - event: AnnounceEvent::Started, - ip_address: Some(client_ip), - key: PeerKey(0u32), - peers_wanted: NumberOfPeers(1i32), - port: Port(client_port), - }; - AnnounceRequestBuilder { - request: default_request, - } - } - - pub fn into(self) -> AnnounceRequest { - self.request - } - } - - #[test] - fn it_should_use_the_udp_source_ip_as_the_peer_ip_address_instead_of_the_ip_in_the_announce_request() { - let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2)); - let announce_request = AnnounceRequestBuilder::default().into(); - - let peer_ip = assign_ip_address_to_peer(&remote_ip, None); - let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port.0); - - let torrent_peer = Peer::from_udp_announce_request(&announce_request, &peer_socket_address); - - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(remote_ip, announce_request.port.0)); - } - - #[test] - fn it_should_always_use_the_port_in_the_announce_request_for_the_peer_port() { - let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2)); - let announce_request = AnnounceRequestBuilder::default().into(); - - let peer_ip = assign_ip_address_to_peer(&remote_ip, None); - let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port.0); - - let torrent_peer = Peer::from_udp_announce_request(&announce_request, &peer_socket_address); - - assert_eq!(torrent_peer.peer_addr, SocketAddr::new(remote_ip, announce_request.port.0)); - } - } - - mod torrent_peer_constructor_from_for_http_requests { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - - use crate::http::warp_implementation::request::Announce; - use crate::protocol::info_hash::InfoHash; - use crate::tracker::assign_ip_address_to_peer; - use crate::tracker::peer::{self, Peer}; - - fn sample_http_announce_request(peer_addr: IpAddr, port: u16) -> Announce { - Announce { - info_hash: InfoHash([0u8; 20]), - peer_addr, - downloaded: 0u64, - uploaded: 0u64, - peer_id: peer::Id(*b"-qB00000000000000000"), - port, - left: 0u64, - event: None, - compact: None, - } - } - - #[test] - fn it_should_use_the_source_ip_in_the_udp_header_as_the_peer_ip_address_ignoring_the_peer_ip_in_the_announce_request() { - let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2)); - - let ip_in_announce_request = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)); - let announce_request = sample_http_announce_request(ip_in_announce_request, 8080); - - let peer_ip = assign_ip_address_to_peer(&remote_ip, None); - let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port); - - let torrent_peer = Peer::from_http_announce_request(&announce_request, &peer_socket_address); - - assert_eq!(torrent_peer.peer_addr.ip(), remote_ip); - assert_ne!(torrent_peer.peer_addr.ip(), ip_in_announce_request); - } - - #[test] - fn it_should_always_use_the_port_in_the_announce_request_for_the_peer_port_ignoring_the_port_in_the_udp_header() { - let remote_ip = IpAddr::V4(Ipv4Addr::new(126, 0, 0, 2)); - let remote_port = 8080; - - let port_in_announce_request = 8081; - let announce_request = - sample_http_announce_request(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), port_in_announce_request); - - let peer_ip = assign_ip_address_to_peer(&remote_ip, None); - let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port); - - let torrent_peer = Peer::from_http_announce_request(&announce_request, &peer_socket_address); - - assert_eq!(torrent_peer.peer_addr.port(), announce_request.port); - assert_ne!(torrent_peer.peer_addr.port(), remote_port); - } - } } diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index b6d4bed7b..53efa7ecc 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -11,8 +11,9 @@ use log::debug; use super::connection_cookie::{check, from_connection_id, into_connection_id, make}; use crate::protocol::common::MAX_SCRAPE_TORRENTS; use crate::protocol::info_hash::InfoHash; -use crate::tracker::{self, peer, statistics}; +use crate::tracker::{self, statistics}; use crate::udp::error::Error; +use crate::udp::peer_builder; use crate::udp::request::AnnounceWrapper; pub async fn handle_packet(remote_addr: SocketAddr, payload: Vec, tracker: Arc) -> Response { @@ -107,10 +108,9 @@ pub async fn handle_announce( source: (Arc::new(e) as Arc).into(), })?; - // build the peer let peer_ip = tracker.assign_ip_address_to_peer(&remote_addr.ip()); - let peer_socket_address = SocketAddr::new(peer_ip, announce_request.port.0); - let peer = peer::Peer::from_udp_announce_request(&wrapped_announce_request.announce_request, &peer_socket_address); + + let peer = peer_builder::from_request(&wrapped_announce_request, &peer_ip); let torrent_stats = tracker .update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer) @@ -164,7 +164,6 @@ pub async fn handle_announce( }) }; - // send stats event match remote_addr { SocketAddr::V4(_) => { tracker.send_stats_event(statistics::Event::Udp4Announce).await; diff --git a/src/udp/mod.rs b/src/udp/mod.rs index 8b8c8c4f8..b6431f752 100644 --- a/src/udp/mod.rs +++ b/src/udp/mod.rs @@ -3,6 +3,7 @@ pub mod error; pub mod handlers; pub mod request; pub mod server; +pub mod peer_builder; pub type Bytes = u64; pub type Port = u16; diff --git a/src/udp/peer_builder.rs b/src/udp/peer_builder.rs new file mode 100644 index 000000000..84eae64f9 --- /dev/null +++ b/src/udp/peer_builder.rs @@ -0,0 +1,18 @@ +use std::net::{IpAddr, SocketAddr}; + +use super::request::AnnounceWrapper; +use crate::protocol::clock::{Current, Time}; +use crate::tracker::peer::{Id, Peer}; + +#[must_use] +pub fn from_request(announce_wrapper: &AnnounceWrapper, peer_ip: &IpAddr) -> Peer { + Peer { + peer_id: Id(announce_wrapper.announce_request.peer_id.0), + peer_addr: SocketAddr::new(*peer_ip, announce_wrapper.announce_request.port.0), + updated: Current::now(), + uploaded: announce_wrapper.announce_request.bytes_uploaded, + downloaded: announce_wrapper.announce_request.bytes_downloaded, + left: announce_wrapper.announce_request.bytes_left, + event: announce_wrapper.announce_request.event, + } +} diff --git a/src/udp/request.rs b/src/udp/request.rs index c4326b291..28d75f860 100644 --- a/src/udp/request.rs +++ b/src/udp/request.rs @@ -2,21 +2,6 @@ use aquatic_udp_protocol::AnnounceRequest; use crate::protocol::info_hash::InfoHash; -// struct AnnounceRequest { -// pub connection_id: i64, -// pub transaction_id: i32, -// pub info_hash: InfoHash, -// pub peer_id: PeerId, -// pub bytes_downloaded: Bytes, -// pub bytes_uploaded: Bytes, -// pub bytes_left: Bytes, -// pub event: AnnounceEvent, -// pub ip_address: Option, -// pub key: u32, -// pub peers_wanted: u32, -// pub port: Port -// } - pub struct AnnounceWrapper { pub announce_request: AnnounceRequest, pub info_hash: InfoHash, From 156ac4d0c9bb9a734d586564de4eb24bac60f399 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 10 Feb 2023 17:17:23 +0000 Subject: [PATCH 3/4] refactor: clean announce request handlers There is duplicate code in announce handlers for UDP and HTTP tracker. This change makes them more similar in order to extract the common part later. --- src/http/warp_implementation/handlers.rs | 16 ++++---- src/tracker/mod.rs | 2 +- src/udp/handlers.rs | 51 +++++++++++++----------- 3 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/http/warp_implementation/handlers.rs b/src/http/warp_implementation/handlers.rs index f914e7555..2a0aa005c 100644 --- a/src/http/warp_implementation/handlers.rs +++ b/src/http/warp_implementation/handlers.rs @@ -42,20 +42,20 @@ pub async fn handle_announce( ) -> WebResult { debug!("http announce request: {:#?}", announce_request); - authenticate(&announce_request.info_hash, &auth_key, tracker.clone()).await?; + let info_hash = announce_request.info_hash; + let remote_client_ip = announce_request.peer_addr; - let peer_ip = tracker.assign_ip_address_to_peer(&announce_request.peer_addr); + authenticate(&info_hash, &auth_key, tracker.clone()).await?; + + let peer_ip = tracker.assign_ip_address_to_peer(&remote_client_ip); let peer = peer_builder::from_request(&announce_request, &peer_ip); - let torrent_stats = tracker - .update_torrent_with_peer_and_get_stats(&announce_request.info_hash, &peer) - .await; + let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; - // get all torrent peers excluding the peer_addr - let peers = tracker.get_torrent_peers(&announce_request.info_hash, &peer.peer_addr).await; + let peers = tracker.get_other_peers(&info_hash, &peer.peer_addr).await; - match announce_request.peer_addr { + match remote_client_ip { IpAddr::V4(_) => { tracker.send_stats_event(statistics::Event::Tcp4Announce).await; } diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index f31a71fbb..a6ea6d3b0 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -279,7 +279,7 @@ impl Tracker { } /// Get all torrent peers for a given torrent filtering out the peer with the client address - pub async fn get_torrent_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr) -> Vec { + pub async fn get_other_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr) -> Vec { let read_lock = self.torrents.read().await; match read_lock.get(info_hash) { diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 53efa7ecc..283041333 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -87,6 +87,18 @@ pub async fn handle_connect( Ok(response) } +/// # Errors +/// +/// Will return `Error` if unable to `authenticate_request`. +pub async fn authenticate(info_hash: &InfoHash, tracker: Arc) -> Result<(), Error> { + tracker + .authenticate_request(info_hash, &None) + .await + .map_err(|e| Error::TrackerError { + source: (Arc::new(e) as Arc).into(), + }) +} + /// # Errors /// /// If a error happens in the `handle_announce` function, it will just return the `ServerError`. @@ -101,25 +113,27 @@ pub async fn handle_announce( let wrapped_announce_request = AnnounceWrapper::new(announce_request); - tracker - .authenticate_request(&wrapped_announce_request.info_hash, &None) - .await - .map_err(|e| Error::TrackerError { - source: (Arc::new(e) as Arc).into(), - })?; + let info_hash = wrapped_announce_request.info_hash; + let remote_client_ip = remote_addr.ip(); + + authenticate(&info_hash, tracker.clone()).await?; - let peer_ip = tracker.assign_ip_address_to_peer(&remote_addr.ip()); + let peer_ip = tracker.assign_ip_address_to_peer(&remote_client_ip); let peer = peer_builder::from_request(&wrapped_announce_request, &peer_ip); - let torrent_stats = tracker - .update_torrent_with_peer_and_get_stats(&wrapped_announce_request.info_hash, &peer) - .await; + let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; - // get all peers excluding the client_addr - let peers = tracker - .get_torrent_peers(&wrapped_announce_request.info_hash, &peer.peer_addr) - .await; + let peers = tracker.get_other_peers(&info_hash, &peer.peer_addr).await; + + match remote_client_ip { + IpAddr::V4(_) => { + tracker.send_stats_event(statistics::Event::Udp4Announce).await; + } + IpAddr::V6(_) => { + tracker.send_stats_event(statistics::Event::Udp6Announce).await; + } + } #[allow(clippy::cast_possible_truncation)] let announce_response = if remote_addr.is_ipv4() { @@ -164,15 +178,6 @@ pub async fn handle_announce( }) }; - match remote_addr { - SocketAddr::V4(_) => { - tracker.send_stats_event(statistics::Event::Udp4Announce).await; - } - SocketAddr::V6(_) => { - tracker.send_stats_event(statistics::Event::Udp6Announce).await; - } - } - Ok(announce_response) } From cecbc17352af2d61ba6c6aa6ebcfbb62283004f4 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 10 Feb 2023 18:16:49 +0000 Subject: [PATCH 4/4] refactor: extract duplicate code from announce request handlers --- src/http/warp_implementation/handlers.rs | 12 ++++-------- src/tracker/mod.rs | 20 +++++++++++++++++++- src/tracker/peer.rs | 6 +++++- src/udp/handlers.rs | 22 ++++++++++------------ src/udp/mod.rs | 2 +- 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/http/warp_implementation/handlers.rs b/src/http/warp_implementation/handlers.rs index 2a0aa005c..fd927150f 100644 --- a/src/http/warp_implementation/handlers.rs +++ b/src/http/warp_implementation/handlers.rs @@ -47,13 +47,9 @@ pub async fn handle_announce( authenticate(&info_hash, &auth_key, tracker.clone()).await?; - let peer_ip = tracker.assign_ip_address_to_peer(&remote_client_ip); + let mut peer = peer_builder::from_request(&announce_request, &remote_client_ip); - let peer = peer_builder::from_request(&announce_request, &peer_ip); - - let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; - - let peers = tracker.get_other_peers(&info_hash, &peer.peer_addr).await; + let response = tracker.announce(&info_hash, &mut peer, &remote_client_ip).await; match remote_client_ip { IpAddr::V4(_) => { @@ -66,8 +62,8 @@ pub async fn handle_announce( send_announce_response( &announce_request, - &torrent_stats, - &peers, + &response.swam_stats, + &response.peers, tracker.config.announce_interval, tracker.config.min_announce_interval, ) diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index a6ea6d3b0..42dbec17c 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -17,6 +17,8 @@ use tokio::sync::mpsc::error::SendError; use tokio::sync::{RwLock, RwLockReadGuard}; use self::error::Error; +use self::peer::Peer; +use self::torrent::SwamStats; use crate::config::Configuration; use crate::databases::driver::Driver; use crate::databases::{self, Database}; @@ -41,6 +43,11 @@ pub struct TorrentsMetrics { pub torrents: u64, } +pub struct AnnounceResponse { + pub peers: Vec, + pub swam_stats: SwamStats, +} + impl Tracker { /// # Errors /// @@ -76,7 +83,18 @@ impl Tracker { self.mode == mode::Mode::Listed || self.mode == mode::Mode::PrivateListed } - /// It assigns a socket address to the peer + /// It handles an announce request + pub async fn announce(&self, info_hash: &InfoHash, peer: &mut Peer, remote_client_ip: &IpAddr) -> AnnounceResponse { + peer.change_ip(&self.assign_ip_address_to_peer(remote_client_ip)); + + let swam_stats = self.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + + let peers = self.get_other_peers(info_hash, &peer.peer_addr).await; + + AnnounceResponse { peers, swam_stats } + } + + /// It assigns an IP address to the peer #[must_use] pub fn assign_ip_address_to_peer(&self, remote_client_ip: &IpAddr) -> IpAddr { assign_ip_address_to_peer(remote_client_ip, self.config.get_ext_ip()) diff --git a/src/tracker/peer.rs b/src/tracker/peer.rs index 24cc99f9b..7559463db 100644 --- a/src/tracker/peer.rs +++ b/src/tracker/peer.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::panic::Location; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; @@ -31,6 +31,10 @@ impl Peer { pub fn is_seeder(&self) -> bool { self.left.0 <= 0 && self.event != AnnounceEvent::Stopped } + + pub fn change_ip(&mut self, new_ip: &IpAddr) { + self.peer_addr = SocketAddr::new(*new_ip, self.peer_addr.port()); + } } #[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord, Copy)] diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 283041333..8978beb70 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -118,13 +118,9 @@ pub async fn handle_announce( authenticate(&info_hash, tracker.clone()).await?; - let peer_ip = tracker.assign_ip_address_to_peer(&remote_client_ip); + let mut peer = peer_builder::from_request(&wrapped_announce_request, &remote_client_ip); - let peer = peer_builder::from_request(&wrapped_announce_request, &peer_ip); - - let torrent_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; - - let peers = tracker.get_other_peers(&info_hash, &peer.peer_addr).await; + let response = tracker.announce(&info_hash, &mut peer, &remote_client_ip).await; match remote_client_ip { IpAddr::V4(_) => { @@ -140,9 +136,10 @@ pub async fn handle_announce( Response::from(AnnounceResponse { transaction_id: wrapped_announce_request.announce_request.transaction_id, announce_interval: AnnounceInterval(i64::from(tracker.config.announce_interval) as i32), - leechers: NumberOfPeers(i64::from(torrent_stats.leechers) as i32), - seeders: NumberOfPeers(i64::from(torrent_stats.seeders) as i32), - peers: peers + leechers: NumberOfPeers(i64::from(response.swam_stats.leechers) as i32), + seeders: NumberOfPeers(i64::from(response.swam_stats.seeders) as i32), + peers: response + .peers .iter() .filter_map(|peer| { if let IpAddr::V4(ip) = peer.peer_addr.ip() { @@ -160,9 +157,10 @@ pub async fn handle_announce( Response::from(AnnounceResponse { transaction_id: wrapped_announce_request.announce_request.transaction_id, announce_interval: AnnounceInterval(i64::from(tracker.config.announce_interval) as i32), - leechers: NumberOfPeers(i64::from(torrent_stats.leechers) as i32), - seeders: NumberOfPeers(i64::from(torrent_stats.seeders) as i32), - peers: peers + leechers: NumberOfPeers(i64::from(response.swam_stats.leechers) as i32), + seeders: NumberOfPeers(i64::from(response.swam_stats.seeders) as i32), + peers: response + .peers .iter() .filter_map(|peer| { if let IpAddr::V6(ip) = peer.peer_addr.ip() { diff --git a/src/udp/mod.rs b/src/udp/mod.rs index b6431f752..7b755a20b 100644 --- a/src/udp/mod.rs +++ b/src/udp/mod.rs @@ -1,9 +1,9 @@ pub mod connection_cookie; pub mod error; pub mod handlers; +pub mod peer_builder; pub mod request; pub mod server; -pub mod peer_builder; pub type Bytes = u64; pub type Port = u16;