Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ pub struct HttpApiConfig {
#[derive(Serialize, Deserialize)]
pub struct Configuration {
pub log_level: Option<String>,
pub log_interval: Option<u64>,
pub mode: TrackerMode,
pub db_driver: DatabaseDrivers,
pub db_path: String,
pub persistence: bool,
pub persistence_interval: Option<u64>,
pub cleanup_interval: Option<u64>,
pub cleanup_peerless: bool,
pub external_ip: Option<String>,
Expand Down Expand Up @@ -135,10 +137,12 @@ impl Configuration {
pub fn default() -> Configuration {
let mut configuration = Configuration {
log_level: Option::from(String::from("info")),
log_interval: Some(60),
mode: TrackerMode::PublicMode,
db_driver: DatabaseDrivers::Sqlite3,
db_path: String::from("data.db"),
persistence: false,
persistence_interval: Some(900),
cleanup_interval: Some(600),
cleanup_peerless: true,
external_ip: Some(String::from("0.0.0.0")),
Expand Down
49 changes: 48 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ async fn main() {
panic!("Could not load persistent torrents.")
};
info!("Persistent torrents loaded.");

let _torrent_periodic_job = start_torrent_periodic_job(config.clone(), tracker.clone()).unwrap();
}

// start torrent cleanup job (periodically removes old peers)
Expand Down Expand Up @@ -68,6 +70,9 @@ async fn main() {
let _ = start_http_tracker_server(&http_tracker, tracker.clone());
}

// start a thread to post statistics
let _ = start_statistics_job(config.clone(), tracker.clone()).unwrap();

// handle the signals here
tokio::select! {
_ = tokio::signal::ctrl_c() => {
Expand All @@ -82,13 +87,35 @@ async fn main() {
// Save torrents if enabled
if config.persistence {
info!("Saving torrents into SQL from memory...");
let _ = tracker.save_torrents().await;
let _ = tracker.periodic_saving().await;
info!("Torrents saved");
}
}
}
}

fn start_torrent_periodic_job(config: Arc<Configuration>, tracker: Arc<TorrentTracker>) -> Option<JoinHandle<()>> {
let weak_tracker = std::sync::Arc::downgrade(&tracker);
let interval = config.persistence_interval.unwrap_or(900);

return Some(tokio::spawn(async move {
let interval = std::time::Duration::from_secs(interval);
let mut interval = tokio::time::interval(interval);
interval.tick().await; // first tick is immediate...
// periodically call tracker.cleanup_torrents()
loop {
interval.tick().await;
if let Some(tracker) = weak_tracker.upgrade() {
info!("Executing periodic saving...");
tracker.periodic_saving().await;
info!("Periodic saving done.");
} else {
break;
}
}
}));
}

fn start_torrent_cleanup_job(config: Arc<Configuration>, tracker: Arc<TorrentTracker>) -> Option<JoinHandle<()>> {
let weak_tracker = std::sync::Arc::downgrade(&tracker);
let interval = config.cleanup_interval.unwrap_or(600);
Expand All @@ -109,6 +136,26 @@ fn start_torrent_cleanup_job(config: Arc<Configuration>, tracker: Arc<TorrentTra
}));
}

fn start_statistics_job(config: Arc<Configuration>, tracker: Arc<TorrentTracker>) -> Option<JoinHandle<()>> {
let weak_tracker = std::sync::Arc::downgrade(&tracker);
let interval = config.log_interval.unwrap_or(60);

return Some(tokio::spawn(async move {
let interval = std::time::Duration::from_secs(interval);
let mut interval = tokio::time::interval(interval);
interval.tick().await; // first tick is immediate...
// periodically call tracker.cleanup_torrents()
loop {
interval.tick().await;
if let Some(tracker) = weak_tracker.upgrade() {
tracker.post_log().await;
} else {
break;
}
}
}));
}

fn start_api_server(config: &HttpApiConfig, tracker: Arc<TorrentTracker>) -> JoinHandle<()> {
info!("Starting HTTP API server on: {}", config.bind_address);
let bind_addr = config.bind_address.parse::<std::net::SocketAddr>().unwrap();
Expand Down
66 changes: 66 additions & 0 deletions src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::common::{InfoHash};
use std::net::{SocketAddr};
use crate::{Configuration, database, key_manager};
use std::collections::btree_map::Entry;
use std::mem;
use std::sync::Arc;
use log::info;
use crate::key_manager::AuthKey;
Expand Down Expand Up @@ -36,6 +37,8 @@ pub enum TrackerMode {
pub struct TorrentTracker {
pub config: Arc<Configuration>,
torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, TorrentEntry>>,
updates: tokio::sync::RwLock<std::collections::HashMap<InfoHash, u32>>,
shadow: tokio::sync::RwLock<std::collections::HashMap<InfoHash, u32>>,
database: Box<dyn Database>,
pub stats_tracker: StatsTracker
}
Expand All @@ -50,6 +53,8 @@ impl TorrentTracker {
Ok(TorrentTracker {
config,
torrents: RwLock::new(std::collections::BTreeMap::new()),
updates: RwLock::new(std::collections::HashMap::new()),
shadow: RwLock::new(std::collections::HashMap::new()),
database,
stats_tracker
})
Expand Down Expand Up @@ -178,6 +183,15 @@ impl TorrentTracker {

let (seeders, completed, leechers) = torrent_entry.get_stats();

if self.config.persistence {
let mut updates = self.updates.write().await;
if updates.contains_key(info_hash) {
updates.remove(info_hash);
}
updates.insert(*info_hash, completed);
drop(updates);
}

TorrentStats {
seeders,
leechers,
Expand Down Expand Up @@ -212,6 +226,19 @@ impl TorrentTracker {
self.stats_tracker.get_stats().await
}

pub async fn post_log(&self) {
let torrents = self.torrents.read().await;
let torrents_size = mem::size_of_val(&*torrents);
drop(torrents);
let updates = self.updates.read().await;
let updates_size = mem::size_of_val(&*updates);
drop(updates);
let shadow = self.shadow.read().await;
let shadow_size = mem::size_of_val(&*shadow);
drop(shadow);
info!("Stats [::] Torrents: {} byte(s) | Updates: {} byte(s) | Shadow: {} byte(s)", torrents_size, updates_size, shadow_size);
}

// remove torrents without peers if enabled, and defragment memory
pub async fn cleanup_torrents(&self) {
info!("Cleaning torrents...");
Expand Down Expand Up @@ -261,4 +288,43 @@ impl TorrentTracker {
}
info!("Torrents cleaned up.");
}

pub async fn periodic_saving(&self) {
// Get a lock for writing
let mut shadow = self.shadow.write().await;

// We will get the data and insert it into the shadow, while clearing updates.
let mut updates = self.updates.write().await;

for (infohash, completed) in updates.iter() {
if shadow.contains_key(infohash) {
shadow.remove(infohash);
}
shadow.insert(*infohash, *completed);
}
updates.clear();
drop(updates);

// We get shadow data into local array to be handled.
let mut shadow_copy: BTreeMap<InfoHash, TorrentEntry> = BTreeMap::new();
for (infohash, completed) in shadow.iter() {
shadow_copy.insert(*infohash, TorrentEntry{
peers: Default::default(),
completed: *completed,
seeders: 0
});
}

// Drop the lock
drop(shadow);

// We will now save the data from the shadow into the database.
// This should not put any strain on the server itself, other then the harddisk/ssd.
let result = self.database.save_persistent_torrent_data(&shadow_copy).await;
if result.is_ok() {
let mut shadow = self.shadow.write().await;
shadow.clear();
drop(shadow);
}
}
}