diff --git a/src/config.rs b/src/config.rs index f9166e577..67078d608 100644 --- a/src/config.rs +++ b/src/config.rs @@ -52,10 +52,12 @@ pub struct HttpApiConfig { #[derive(Serialize, Deserialize)] pub struct Configuration { pub log_level: Option, + pub log_interval: Option, pub mode: TrackerMode, pub db_driver: DatabaseDrivers, pub db_path: String, pub persistence: bool, + pub persistence_interval: Option, pub cleanup_interval: Option, pub cleanup_peerless: bool, pub external_ip: Option, @@ -135,10 +137,12 @@ impl Configuration { pub fn default() -> Configuration { let mut configuration = Configuration { log_level: Option::from(String::from("info")), + log_interval: Some(60), mode: TrackerMode::PublicMode, db_driver: DatabaseDrivers::Sqlite3, db_path: String::from("data.db"), persistence: false, + persistence_interval: Some(900), cleanup_interval: Some(600), cleanup_peerless: true, external_ip: Some(String::from("0.0.0.0")), diff --git a/src/main.rs b/src/main.rs index 721385760..9ba45427e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,6 +29,8 @@ async fn main() { panic!("Could not load persistent torrents.") }; info!("Persistent torrents loaded."); + + let _torrent_periodic_job = start_torrent_periodic_job(config.clone(), tracker.clone()).unwrap(); } // start torrent cleanup job (periodically removes old peers) @@ -68,6 +70,9 @@ async fn main() { let _ = start_http_tracker_server(&http_tracker, tracker.clone()); } + // start a thread to post statistics + let _ = start_statistics_job(config.clone(), tracker.clone()).unwrap(); + // handle the signals here tokio::select! 
{ _ = tokio::signal::ctrl_c() => { @@ -82,13 +87,35 @@ async fn main() { // Save torrents if enabled if config.persistence { info!("Saving torrents into SQL from memory..."); - let _ = tracker.save_torrents().await; + let _ = tracker.periodic_saving().await; info!("Torrents saved"); } } } } +fn start_torrent_periodic_job(config: Arc, tracker: Arc) -> Option> { + let weak_tracker = std::sync::Arc::downgrade(&tracker); + let interval = config.persistence_interval.unwrap_or(900); + + return Some(tokio::spawn(async move { + let interval = std::time::Duration::from_secs(interval); + let mut interval = tokio::time::interval(interval); + interval.tick().await; // first tick is immediate... + // periodically call tracker.cleanup_torrents() + loop { + interval.tick().await; + if let Some(tracker) = weak_tracker.upgrade() { + info!("Executing periodic saving..."); + tracker.periodic_saving().await; + info!("Periodic saving done."); + } else { + break; + } + } + })); +} + fn start_torrent_cleanup_job(config: Arc, tracker: Arc) -> Option> { let weak_tracker = std::sync::Arc::downgrade(&tracker); let interval = config.cleanup_interval.unwrap_or(600); @@ -109,6 +136,26 @@ fn start_torrent_cleanup_job(config: Arc, tracker: Arc, tracker: Arc) -> Option> { + let weak_tracker = std::sync::Arc::downgrade(&tracker); + let interval = config.log_interval.unwrap_or(60); + + return Some(tokio::spawn(async move { + let interval = std::time::Duration::from_secs(interval); + let mut interval = tokio::time::interval(interval); + interval.tick().await; // first tick is immediate... 
+ // periodically call tracker.post_log()
self.shadow.read().await; + let shadow_size = mem::size_of_val(&*shadow); + drop(shadow); + info!("Stats [::] Torrents: {} byte(s) | Updates: {} byte(s) | Shadow: {} byte(s)", torrents_size, updates_size, shadow_size); + } + // remove torrents without peers if enabled, and defragment memory pub async fn cleanup_torrents(&self) { info!("Cleaning torrents..."); @@ -261,4 +288,43 @@ impl TorrentTracker { } info!("Torrents cleaned up."); } + + pub async fn periodic_saving(&self) { + // Get a lock for writing + let mut shadow = self.shadow.write().await; + + // We will get the data and insert it into the shadow, while clearing updates. + let mut updates = self.updates.write().await; + + for (infohash, completed) in updates.iter() { + if shadow.contains_key(infohash) { + shadow.remove(infohash); + } + shadow.insert(*infohash, *completed); + } + updates.clear(); + drop(updates); + + // We get shadow data into local array to be handled. + let mut shadow_copy: BTreeMap = BTreeMap::new(); + for (infohash, completed) in shadow.iter() { + shadow_copy.insert(*infohash, TorrentEntry{ + peers: Default::default(), + completed: *completed, + seeders: 0 + }); + } + + // Drop the lock + drop(shadow); + + // We will now save the data from the shadow into the database. + // This should not put any strain on the server itself, other then the harddisk/ssd. + let result = self.database.save_persistent_torrent_data(&shadow_copy).await; + if result.is_ok() { + let mut shadow = self.shadow.write().await; + shadow.clear(); + drop(shadow); + } + } }