diff --git a/app/Actions/Ookla/TriggerRustSpeedtest.php b/app/Actions/Ookla/TriggerRustSpeedtest.php new file mode 100644 index 000000000..5c2fb48b0 --- /dev/null +++ b/app/Actions/Ookla/TriggerRustSpeedtest.php @@ -0,0 +1,115 @@ +isRustServiceHealthy($rustServiceUrl)) { + Log::warning('rust service unhealthy, falling back to php'); + return RunSpeedtest::run($scheduled, $serverId, $dispatchedBy); + } + + // Create the result record + $result = Result::create([ + 'data->server->id' => $serverId, + 'service' => ResultService::Ookla, + 'status' => ResultStatus::Waiting, + 'scheduled' => $scheduled, + 'dispatched_by' => $dispatchedBy, + ]); + + SpeedtestWaiting::dispatch($result); + + // Trigger speedtest via Rust service + try { + $callbackUrl = route('api.v1.speedtest.callback'); + + $response = Http::timeout(10) + ->post("{$rustServiceUrl}/speedtest", [ + 'server_id' => $serverId, + 'callback_url' => $callbackUrl, + 'metadata' => [ + 'result_id' => $result->id, + 'scheduled' => $scheduled, + ], + ]); + + if ($response->successful()) { + $data = $response->json(); + Log::info('speedtest triggered via rust service', [ + 'result_id' => $result->id, + 'test_id' => $data['test_id'] ?? null, + ]); + + // Store the Rust test_id for tracking + $result->update([ + 'data->rust_test_id' => $data['test_id'] ?? null, + 'status' => ResultStatus::Checking, + ]); + + return $result; + } + + Log::error('rust service returned error', [ + 'status' => $response->status(), + 'body' => $response->body(), + ]); + } catch (\Exception $e) { + Log::error('failed to contact rust service', [ + 'error' => $e->getMessage(), + ]); + } + + // Fall back to PHP on any error + Log::warning('rust service request failed, falling back to php'); + + // Delete the created result and use PHP flow + $result->delete(); + + return RunSpeedtest::run($scheduled, $serverId, $dispatchedBy); + } + + /** + * Check if the Rust service is healthy. + */ + protected function isRustServiceHealthy(string $baseUrl): bool + { + try { + $response = Http::timeout(5)->get("{$baseUrl}/health"); + + return $response->successful() && + ($response->json('status') === 'healthy'); + } catch (\Exception $e) { + Log::debug('rust service health check failed', [ + 'error' => $e->getMessage(), + ]); + + return false; + } + } +} diff --git a/app/Http/Controllers/Api/V1/SpeedtestCallbackController.php b/app/Http/Controllers/Api/V1/SpeedtestCallbackController.php new file mode 100644 index 000000000..1aca8d551 --- /dev/null +++ b/app/Http/Controllers/Api/V1/SpeedtestCallbackController.php @@ -0,0 +1,134 @@ + $request->input('test_id'), + 'status' => $request->input('status'), + ]); + + $resultId = $request->input('metadata.result_id'); + + if (! $resultId) { + Log::warning('callback missing result_id in metadata'); + + return response()->json(['error' => 'missing result_id'], 400); + } + + $result = Result::find($resultId); + + if (! $result) { + Log::warning('callback for unknown result', ['result_id' => $resultId]); + + return response()->json(['error' => 'result not found'], 404); + } + + $status = $request->input('status'); + + if ($status === 'failed') { + return $this->handleFailed($result, $request); + } + + if ($status === 'completed') { + return $this->handleCompleted($result, $request); + } + + // Running status update + if ($status === 'running') { + $result->update(['status' => ResultStatus::Running]); + SpeedtestRunning::dispatch($result); + + return response()->json(['status' => 'acknowledged']); + } + + return response()->json(['status' => 'ignored']); + } + + /** + * Handle completed speedtest callback. + */ + protected function handleCompleted(Result $result, Request $request): JsonResponse + { + Log::info('speedtest completed via rust service', [ + 'result_id' => $result->id, + 'test_id' => $request->input('test_id'), + ]); + + // Extract data from Rust service response + $rawOutput = $request->input('raw_output', []); + + // Update result with speedtest data + $result->update([ + 'ping' => $request->input('ping'), + 'download' => $this->mbpsToBytes($request->input('download')), + 'upload' => $this->mbpsToBytes($request->input('upload')), + 'download_bytes' => Arr::get($rawOutput, 'download.bytes'), + 'upload_bytes' => Arr::get($rawOutput, 'upload.bytes'), + 'data' => $rawOutput, + 'status' => ResultStatus::Benchmarking, + ]); + + // Dispatch benchmark job + BenchmarkSpeedtestJob::dispatch($result); + + // Complete the speedtest + $result->update(['status' => ResultStatus::Completed]); + SpeedtestCompleted::dispatch($result); + + return response()->json(['status' => 'processed']); + } + + /** + * Handle failed speedtest callback. + */ + protected function handleFailed(Result $result, Request $request): JsonResponse + { + Log::error('speedtest failed via rust service', [ + 'result_id' => $result->id, + 'error' => $request->input('error'), + ]); + + $result->update([ + 'data->type' => 'log', + 'data->level' => 'error', + 'data->message' => $request->input('error', 'Unknown error from Rust service'), + 'status' => ResultStatus::Failed, + ]); + + SpeedtestFailed::dispatch($result); + + return response()->json(['status' => 'processed']); + } + + /** + * Convert Mbps to bytes per second (Ookla format). + */ + protected function mbpsToBytes(?float $mbps): ?int + { + if ($mbps === null) { + return null; + } + + // Convert Mbps to bytes/s (Ookla stores bandwidth in bytes/s) + return (int) (($mbps * 1_000_000) / 8); + } +} diff --git a/compose.yaml b/compose.yaml index a4ccb9e57..da6512e4b 100644 --- a/compose.yaml +++ b/compose.yaml @@ -16,6 +16,7 @@ services: LARAVEL_SAIL: 1 XDEBUG_MODE: '${SAIL_XDEBUG_MODE:-off}' XDEBUG_CONFIG: '${SAIL_XDEBUG_CONFIG:-client_host=host.docker.internal}' + RUST_SPEEDTEST_URL: 'http://rust-speedtest:3000' volumes: - '.:/var/www/html' networks: @@ -24,6 +25,30 @@ services: - pgsql - mailpit - apprise + - rust-speedtest + rust-speedtest: + build: + context: ./rust-speedtest + dockerfile: Dockerfile + image: speedtest-tracker/rust-speedtest + ports: + - '${FORWARD_RUST_SPEEDTEST_PORT:-3000}:3000' + environment: + PORT: 3000 + MAX_CONCURRENT: '${RUST_MAX_CONCURRENT:-3}' + RUST_LOG: '${RUST_LOG:-rust_speedtest=info,tower_http=info}' + networks: + - sail + healthcheck: + test: + - CMD + - curl + - '-f' + - 'http://localhost:3000/health' + interval: 30s + timeout: 10s + start_period: 5s + retries: 3 pgsql: image: 'postgres:18-alpine' ports: diff --git a/config/speedtest.php b/config/speedtest.php index 4226f5a0a..d5fcbc158 100644 --- a/config/speedtest.php +++ b/config/speedtest.php @@ -21,6 +21,8 @@ /** * Speedtest settings. */ + 'rust_service_url' => env('RUST_SPEEDTEST_URL'), + 'schedule' => env('SPEEDTEST_SCHEDULE', false), 'servers' => env('SPEEDTEST_SERVERS'), diff --git a/routes/api/v1/routes.php b/routes/api/v1/routes.php index 0bc2c6bbc..4d32ac814 100644 --- a/routes/api/v1/routes.php +++ b/routes/api/v1/routes.php @@ -2,6 +2,7 @@ use App\Http\Controllers\Api\V1\OoklaController; use App\Http\Controllers\Api\V1\ResultsController; +use App\Http\Controllers\Api\V1\SpeedtestCallbackController; use App\Http\Controllers\Api\V1\SpeedtestController; use App\Http\Controllers\Api\V1\StatsController; use Illuminate\Support\Facades\Route; @@ -19,6 +20,9 @@ Route::post('/speedtests/run', SpeedtestController::class) ->name('speedtests.run'); + Route::post('/speedtest/callback', SpeedtestCallbackController::class) + ->name('speedtest.callback'); + Route::get('/ookla/list-servers', OoklaController::class) ->name('ookla.list-servers'); diff --git a/rust-speedtest/.gitignore b/rust-speedtest/.gitignore new file mode 100644 index 000000000..ca98cd96e --- /dev/null +++ b/rust-speedtest/.gitignore @@ -0,0 +1,2 @@ +/target/ +Cargo.lock diff --git a/rust-speedtest/Cargo.toml b/rust-speedtest/Cargo.toml new file mode 100644 index 000000000..9b22976af --- /dev/null +++ b/rust-speedtest/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "rust-speedtest" +version = "0.1.0" +edition = "2021" +description = "Concurrent speedtest runner service" + +[dependencies] +axum = "0.8" +tokio = { version = "1.43", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +reqwest = { version = "0.12", features = ["json"] } +uuid = { version = "1.11", features = ["v4", "serde"] } +chrono = { version = "0.4", features = ["serde"] } +tower-http = { version = "0.6", features = ["cors", "trace"] } +anyhow = "1.0" +thiserror = "2.0" + +[dev-dependencies] +tokio-test = "0.4" +axum-test = "18" + +[profile.release] +opt-level = 3 +lto = true diff --git a/rust-speedtest/Dockerfile b/rust-speedtest/Dockerfile new file mode 100644 index 000000000..f2f652983 --- /dev/null +++ b/rust-speedtest/Dockerfile @@ -0,0 +1,51 @@ +# Build stage +FROM rust:1.84-bookworm AS builder + +WORKDIR /app + +# Copy manifests +COPY Cargo.toml Cargo.lock* ./ + +# Create dummy src for dependency caching +RUN mkdir src && echo "fn main() {}" > src/main.rs + +# Build dependencies (cached layer) +RUN cargo build --release && rm -rf src target/release/deps/rust_speedtest* + +# Copy actual source +COPY src ./src + +# Build application +RUN cargo build --release + +# Runtime stage +FROM debian:bookworm-slim + +# Install Ookla Speedtest CLI +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + curl \ + gnupg \ + && curl -s https://packagecloud.io/install/repositories/ookla/speedtest-cli/script.deb.sh | bash \ + && apt-get install -y speedtest \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Copy binary from builder +COPY --from=builder /app/target/release/rust-speedtest /app/rust-speedtest + +# Create non-root user +RUN useradd -r -s /bin/false speedtest && chown -R speedtest:speedtest /app +USER speedtest + +ENV PORT=3000 +ENV MAX_CONCURRENT=3 +ENV RUST_LOG=rust_speedtest=info,tower_http=info + +EXPOSE 3000 + +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:3000/health || exit 1 + +CMD ["/app/rust-speedtest"] diff --git a/rust-speedtest/src/main.rs b/rust-speedtest/src/main.rs new file mode 100644 index 000000000..6b94003b7 --- /dev/null +++ b/rust-speedtest/src/main.rs @@ -0,0 +1,124 @@ +mod models; +mod routes; +mod runner; +mod speedtest; + +use std::env; +use std::net::SocketAddr; +use std::time::Duration; + +use axum::{ + routing::{get, post}, + Router, +}; +use tokio::signal; +use tokio::time::interval; +use tower_http::cors::{Any, CorsLayer}; +use tower_http::trace::TraceLayer; +use tracing::{info, warn}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use crate::runner::SpeedtestRunner; +use crate::speedtest::check_ookla_available; + +const DEFAULT_PORT: u16 = 3000; +const DEFAULT_MAX_CONCURRENT: usize = 3; +const CLEANUP_INTERVAL_SECS: u64 = 300; +const TEST_RETENTION_SECS: u64 = 3600; + +#[tokio::main] +async fn main() { + // Initialize tracing + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "rust_speedtest=debug,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + // Parse configuration from environment + let port: u16 = env::var("PORT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(DEFAULT_PORT); + + let max_concurrent: usize = env::var("MAX_CONCURRENT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(DEFAULT_MAX_CONCURRENT); + + info!(port = port, max_concurrent = max_concurrent, "starting rust-speedtest service"); + + // Check if Ookla CLI is available + if check_ookla_available().await { + info!("ookla speedtest cli detected"); + } else { + warn!("ookla speedtest cli not found - tests will fail"); + } + + // Create the runner + let runner = SpeedtestRunner::new(Some(max_concurrent)); + + // Spawn cleanup task + let cleanup_runner = runner.clone(); + tokio::spawn(async move { + let mut cleanup_interval = interval(Duration::from_secs(CLEANUP_INTERVAL_SECS)); + loop { + cleanup_interval.tick().await; + cleanup_runner.cleanup_old_tests(TEST_RETENTION_SECS).await; + } + }); + + // Build router + let cors = CorsLayer::new() + .allow_origin(Any) + .allow_methods(Any) + .allow_headers(Any); + + let app = Router::new() + .route("/speedtest", post(routes::queue_speedtest)) + .route("/speedtest/{id}", get(routes::get_speedtest_status)) + .route("/health", get(routes::health_check)) + .route("/stats", get(routes::get_stats)) + .with_state(runner) + .layer(TraceLayer::new_for_http()) + .layer(cors); + + // Start server + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + info!(address = %addr, "listening"); + + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal()) + .await + .unwrap(); + + info!("server shutdown complete"); +} + +async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install ctrl+c handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => info!("received ctrl+c"), + _ = terminate => info!("received sigterm"), + } +} diff --git a/rust-speedtest/src/models.rs b/rust-speedtest/src/models.rs new file mode 100644 index 000000000..bd179af18 --- /dev/null +++ b/rust-speedtest/src/models.rs @@ -0,0 +1,359 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TestStatus { + Pending, + Running, + Completed, + Failed, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpeedtestRequest { + /// Optional server ID to test against + pub server_id: Option, + /// Callback URL to POST results to + pub callback_url: Option, + /// Optional metadata to pass through to callback + pub metadata: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpeedtestResponse { + pub test_id: Uuid, + pub status: TestStatus, + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpeedtestResult { + pub test_id: Uuid, + pub status: TestStatus, + pub download: Option, // Mbps + pub upload: Option, // Mbps + pub ping: Option, // ms + pub jitter: Option, // ms + pub packet_loss: Option, // percentage + pub server: Option, + pub isp: Option, + pub external_ip: Option, + pub result_url: Option, + pub raw_output: Option, + pub error: Option, + pub started_at: DateTime, + pub completed_at: Option>, + pub metadata: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServerInfo { + pub id: i64, + pub name: String, + pub location: String, + pub country: String, + pub host: String, + pub port: Option, +} + +/// Ookla CLI JSON output structure +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OoklaOutput { + #[serde(rename = "type")] + pub result_type: String, + pub timestamp: String, + pub ping: Option, + pub download: Option, + pub upload: Option, + pub packet_loss: Option, + pub isp: Option, + pub interface: Option, + pub server: Option, + pub result: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OoklaPing { + pub jitter: f64, + pub latency: f64, + pub low: Option, + pub high: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OoklaTransfer { + pub bandwidth: i64, // bytes per second + pub bytes: i64, + pub elapsed: i64, + pub latency: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OoklaLatencyStats { + pub iqm: f64, + pub low: f64, + pub high: f64, + pub jitter: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OoklaInterface { + #[serde(rename = "internalIp")] + pub internal_ip: Option, + #[serde(rename = "externalIp")] + pub external_ip: Option, + #[serde(rename = "macAddr")] + pub mac_addr: Option, + #[serde(rename = "isVpn")] + pub is_vpn: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OoklaServer { + pub id: i64, + pub name: String, + pub location: String, + pub country: String, + pub host: String, + pub port: Option, + pub ip: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OoklaResult { + pub id: Option, + pub url: Option, + pub persisted: Option, +} + +/// speedtest-cli (Python) JSON output structure +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpeedtestCliOutput { + pub download: f64, // bits per second + pub upload: f64, // bits per second + pub ping: f64, // ms + pub server: SpeedtestCliServer, + pub timestamp: String, + pub bytes_sent: i64, + pub bytes_received: i64, + pub share: Option, + pub client: SpeedtestCliClient, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpeedtestCliServer { + pub url: Option, + pub lat: Option, + pub lon: Option, + pub name: String, + pub country: String, + pub cc: Option, + pub sponsor: Option, + pub id: String, + pub host: String, + #[serde(rename = "d")] + pub distance: Option, + pub latency: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpeedtestCliClient { + pub ip: String, + pub lat: Option, + pub lon: Option, + pub isp: String, + pub isprating: Option, + pub rating: Option, + pub ispdlavg: Option, + pub ispulavg: Option, + pub loggedin: Option, + pub country: Option, +} + +impl SpeedtestCliOutput { + /// Convert speedtest-cli output to our SpeedtestResult format + pub fn to_result(&self, test_id: Uuid, started_at: DateTime, metadata: Option) -> SpeedtestResult { + // speedtest-cli outputs in bits per second, convert to Mbps + let download = self.download / 1_000_000.0; + let upload = self.upload / 1_000_000.0; + + let server = ServerInfo { + id: self.server.id.parse().unwrap_or(0), + name: self.server.name.clone(), + location: format!("{}, {}", self.server.name, self.server.country), + country: self.server.country.clone(), + host: self.server.host.clone(), + port: None, + }; + + SpeedtestResult { + test_id, + status: TestStatus::Completed, + download: Some(download), + upload: Some(upload), + ping: Some(self.ping), + jitter: None, // speedtest-cli doesn't provide jitter + packet_loss: None, + server: Some(server), + isp: Some(self.client.isp.clone()), + external_ip: Some(self.client.ip.clone()), + result_url: self.share.clone(), + raw_output: serde_json::to_value(self).ok(), + error: None, + started_at, + completed_at: Some(Utc::now()), + metadata, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthResponse { + pub status: String, + pub version: String, + pub active_tests: usize, + pub max_concurrent: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TestStatusResponse { + pub test_id: Uuid, + pub status: TestStatus, + pub result: Option, +} + +impl OoklaOutput { + /// Convert Ookla CLI output to our SpeedtestResult format + pub fn to_result(&self, test_id: Uuid, started_at: DateTime, metadata: Option) -> SpeedtestResult { + let download = self.download.as_ref().map(|d| { + // Convert bytes/s to Mbps + (d.bandwidth as f64 * 8.0) / 1_000_000.0 + }); + + let upload = self.upload.as_ref().map(|u| { + (u.bandwidth as f64 * 8.0) / 1_000_000.0 + }); + + let ping = self.ping.as_ref().map(|p| p.latency); + let jitter = self.ping.as_ref().map(|p| p.jitter); + + let server = self.server.as_ref().map(|s| ServerInfo { + id: s.id, + name: s.name.clone(), + location: s.location.clone(), + country: s.country.clone(), + host: s.host.clone(), + port: s.port, + }); + + let external_ip = self.interface.as_ref().and_then(|i| i.external_ip.clone()); + let result_url = self.result.as_ref().and_then(|r| r.url.clone()); + + SpeedtestResult { + test_id, + status: TestStatus::Completed, + download, + upload, + ping, + jitter, + packet_loss: self.packet_loss, + server, + isp: self.isp.clone(), + external_ip, + result_url, + raw_output: serde_json::to_value(self).ok(), + error: None, + started_at, + completed_at: Some(Utc::now()), + metadata, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_ookla_output() { + let json = r#"{ + "type": "result", + "timestamp": "2024-01-18T10:00:00Z", + "ping": {"jitter": 1.5, "latency": 10.2, "low": 9.0, "high": 12.0}, + "download": {"bandwidth": 12500000, "bytes": 100000000, "elapsed": 8000}, + "upload": {"bandwidth": 6250000, "bytes": 50000000, "elapsed": 8000}, + "packetLoss": 0.5, + "isp": "Test ISP", + "interface": {"internalIp": "192.168.1.1", "externalIp": "1.2.3.4"}, + "server": {"id": 12345, "name": "Test Server", "location": "City", "country": "Country", "host": "speedtest.example.com"}, + "result": {"id": "abc123", "url": "https://speedtest.net/result/abc123"} + }"#; + + let output: OoklaOutput = serde_json::from_str(json).unwrap(); + + assert_eq!(output.result_type, "result"); + assert!(output.ping.is_some()); + assert_eq!(output.ping.as_ref().unwrap().latency, 10.2); + assert!(output.download.is_some()); + assert_eq!(output.download.as_ref().unwrap().bandwidth, 12500000); + } + + #[test] + fn test_convert_to_result() { + let json = r#"{ + "type": "result", + "timestamp": "2024-01-18T10:00:00Z", + "ping": {"jitter": 1.5, "latency": 10.2}, + "download": {"bandwidth": 12500000, "bytes": 100000000, "elapsed": 8000}, + "upload": {"bandwidth": 6250000, "bytes": 50000000, "elapsed": 8000}, + "server": {"id": 12345, "name": "Test Server", "location": "City", "country": "Country", "host": "speedtest.example.com"} + }"#; + + let output: OoklaOutput = serde_json::from_str(json).unwrap(); + let test_id = Uuid::new_v4(); + let started_at = Utc::now(); + + let result = output.to_result(test_id, started_at, None); + + assert_eq!(result.test_id, test_id); + assert_eq!(result.status, TestStatus::Completed); + // 12500000 bytes/s * 8 / 1_000_000 = 100 Mbps + assert!((result.download.unwrap() - 100.0).abs() < 0.01); + // 6250000 bytes/s * 8 / 1_000_000 = 50 Mbps + assert!((result.upload.unwrap() - 50.0).abs() < 0.01); + assert_eq!(result.ping, Some(10.2)); + assert_eq!(result.jitter, Some(1.5)); + } + + #[test] + fn test_speedtest_request_deserialize() { + let json = r#"{"server_id": 12345, "callback_url": "http://localhost/callback"}"#; + let request: SpeedtestRequest = serde_json::from_str(json).unwrap(); + + assert_eq!(request.server_id, Some(12345)); + assert_eq!(request.callback_url, Some("http://localhost/callback".to_string())); + } + + #[test] + fn test_speedtest_response_serialize() { + let response = SpeedtestResponse { + test_id: Uuid::nil(), + status: TestStatus::Pending, + message: "test queued".to_string(), + }; + + let json = serde_json::to_string(&response).unwrap(); + assert!(json.contains("pending")); + assert!(json.contains("test queued")); + } + + #[test] + fn test_test_status_serialization() { + assert_eq!(serde_json::to_string(&TestStatus::Pending).unwrap(), "\"pending\""); + assert_eq!(serde_json::to_string(&TestStatus::Running).unwrap(), "\"running\""); + assert_eq!(serde_json::to_string(&TestStatus::Completed).unwrap(), "\"completed\""); + assert_eq!(serde_json::to_string(&TestStatus::Failed).unwrap(), "\"failed\""); + } +} diff --git a/rust-speedtest/src/routes.rs b/rust-speedtest/src/routes.rs new file mode 100644 index 000000000..ce8f5d418 --- /dev/null +++ b/rust-speedtest/src/routes.rs @@ -0,0 +1,175 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use tracing::info; +use uuid::Uuid; + +use crate::models::{HealthResponse, SpeedtestRequest, SpeedtestResponse, TestStatus, TestStatusResponse}; +use crate::runner::SpeedtestRunner; +use crate::speedtest::check_ookla_available; + +const VERSION: &str = env!("CARGO_PKG_VERSION"); + +/// POST /speedtest - Queue a new speedtest +pub async fn queue_speedtest( + State(runner): State, + Json(request): Json, +) -> impl IntoResponse { + info!(server_id = ?request.server_id, callback = ?request.callback_url, "received speedtest request"); + + match runner.queue_test(request).await { + Ok(test_id) => { + let response = SpeedtestResponse { + test_id, + status: TestStatus::Pending, + message: "test queued".to_string(), + }; + (StatusCode::ACCEPTED, Json(response)) + } + Err(e) => { + let response = SpeedtestResponse { + test_id: Uuid::nil(), + status: TestStatus::Failed, + message: e, + }; + (StatusCode::SERVICE_UNAVAILABLE, Json(response)) + } + } +} + +/// GET /speedtest/:id - Get status of a specific test +pub async fn get_speedtest_status( + State(runner): State, + Path(test_id): Path, +) -> impl IntoResponse { + match runner.get_test_status(test_id).await { + Some((status, result)) => { + let response = TestStatusResponse { + test_id, + status, + result, + }; + (StatusCode::OK, Json(response)).into_response() + } + None => (StatusCode::NOT_FOUND, "test not found").into_response(), + } +} + +/// GET /health - Health check endpoint +pub async fn health_check(State(runner): State) -> impl IntoResponse { + let ookla_available = check_ookla_available().await; + let active_tests = runner.active_test_count().await; + let max_concurrent = runner.max_concurrent(); + + let status = if ookla_available { "healthy" } else { "degraded" }; + + let response = HealthResponse { + status: status.to_string(), + version: VERSION.to_string(), + active_tests, + max_concurrent, + }; + + if ookla_available { + (StatusCode::OK, Json(response)) + } else { + (StatusCode::SERVICE_UNAVAILABLE, Json(response)) + } +} + +/// GET /stats - Get runner statistics +pub async fn get_stats(State(runner): State) -> impl IntoResponse { + let active = runner.active_test_count().await; + let pending = runner.pending_test_count().await; + let max = runner.max_concurrent(); + + Json(serde_json::json!({ + "active_tests": active, + "pending_tests": pending, + "max_concurrent": max, + "available_slots": max.saturating_sub(active), + })) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{routing::{get, post}, Router}; + use axum_test::TestServer; + + fn create_test_app() -> Router { + let runner = SpeedtestRunner::new(Some(2)); + Router::new() + .route("/speedtest", post(queue_speedtest)) + .route("/speedtest/{id}", get(get_speedtest_status)) + .route("/health", get(health_check)) + .route("/stats", get(get_stats)) + .with_state(runner) + } + + #[tokio::test] + async fn test_health_endpoint() { + let app = create_test_app(); + let server = TestServer::new(app).unwrap(); + + let response = server.get("/health").await; + + // May be degraded if ookla cli not installed, but should return 200 or 503 + let status = response.status_code(); + assert!(status == 200 || status == 503); + + let body: HealthResponse = response.json(); + assert!(body.status == "healthy" || body.status == "degraded"); + assert_eq!(body.max_concurrent, 2); + } + + #[tokio::test] + async fn test_stats_endpoint() { + let app = create_test_app(); + let server = TestServer::new(app).unwrap(); + + let response = server.get("/stats").await; + response.assert_status_ok(); + + let body: serde_json::Value = response.json(); + assert_eq!(body["max_concurrent"], 2); + assert_eq!(body["active_tests"], 0); + assert_eq!(body["pending_tests"], 0); + } + + #[tokio::test] + async fn test_queue_speedtest() { + let app = create_test_app(); + let server = TestServer::new(app).unwrap(); + + let response = server + .post("/speedtest") + .json(&SpeedtestRequest { + server_id: None, + callback_url: None, + metadata: None, + }) + .await; + + response.assert_status(axum::http::StatusCode::ACCEPTED); + + let body: SpeedtestResponse = response.json(); + assert_eq!(body.status, TestStatus::Pending); + assert!(!body.test_id.is_nil()); + } + + #[tokio::test] + async fn test_get_nonexistent_test() { + let app = create_test_app(); + let server = TestServer::new(app).unwrap(); + + let response = server + .get(&format!("/speedtest/{}", Uuid::new_v4())) + .await; + + response.assert_status(axum::http::StatusCode::NOT_FOUND); + } +} diff --git a/rust-speedtest/src/runner.rs b/rust-speedtest/src/runner.rs new file mode 100644 index 000000000..80ecee41c --- /dev/null +++ b/rust-speedtest/src/runner.rs @@ -0,0 +1,214 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use tokio::sync::{mpsc, RwLock, Semaphore}; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +use crate::models::{SpeedtestRequest, SpeedtestResult, TestStatus}; +use crate::speedtest::run_speedtest; + +const DEFAULT_MAX_CONCURRENT: usize = 3; +const QUEUE_SIZE: usize = 100; + +#[derive(Clone)] +pub struct SpeedtestRunner { + semaphore: Arc, + max_concurrent: usize, + tests: Arc>>, + request_tx: mpsc::Sender, +} + +struct TestEntry { + status: TestStatus, + result: Option, +} + +struct TestJob { + test_id: Uuid, + request: SpeedtestRequest, +} + +impl SpeedtestRunner { + pub fn new(max_concurrent: Option) -> Self { + let max_concurrent = max_concurrent.unwrap_or(DEFAULT_MAX_CONCURRENT); + let semaphore = Arc::new(Semaphore::new(max_concurrent)); + let tests: Arc>> = Arc::new(RwLock::new(HashMap::new())); + let (request_tx, request_rx) = mpsc::channel::(QUEUE_SIZE); + + let runner = Self { + semaphore, + max_concurrent, + tests, + request_tx, + }; + + // Start the worker that processes queued jobs + runner.spawn_worker(request_rx); + + runner + } + + fn spawn_worker(&self, mut request_rx: mpsc::Receiver) { + let semaphore = self.semaphore.clone(); + let tests = self.tests.clone(); + + tokio::spawn(async move { + while let Some(job) = request_rx.recv().await { + let sem = semaphore.clone(); + let tests_map = tests.clone(); + + tokio::spawn(async move { + // Acquire semaphore permit + let _permit = match sem.acquire().await { + Ok(p) => p, + Err(_) => { + error!(test_id = %job.test_id, "semaphore closed"); + return; + } + }; + + debug!(test_id = %job.test_id, "acquired semaphore permit"); + + // Update status to running + { + let mut map = tests_map.write().await; + if let Some(entry) = map.get_mut(&job.test_id) { + entry.status = TestStatus::Running; + } + } + + // Run the speedtest + let result = run_speedtest( + job.test_id, + job.request.server_id, + job.request.metadata.clone(), + None, + ) + .await; + + // Send callback if URL provided + if let Some(ref callback_url) = job.request.callback_url { + send_callback(callback_url, &result).await; + } + + // Store result + { + let mut map = tests_map.write().await; + if let Some(entry) = map.get_mut(&job.test_id) { + entry.status = result.status; + entry.result = Some(result); + } + } + + debug!(test_id = %job.test_id, "released semaphore permit"); + }); + } + }); + } + + /// Queue a new speedtest request + pub async fn queue_test(&self, request: SpeedtestRequest) -> Result { + let test_id = Uuid::new_v4(); + + // Add to tracking map + { + let mut map = self.tests.write().await; + map.insert( + test_id, + TestEntry { + status: TestStatus::Pending, + result: None, + }, + ); + } + + // Queue the job + let job = TestJob { test_id, request }; + + self.request_tx.send(job).await.map_err(|e| { + error!(test_id = %test_id, error = %e, "failed to queue test"); + format!("queue full or closed: {}", e) + })?; + + info!(test_id = %test_id, "test queued"); + Ok(test_id) + } + + /// Get the status of a specific test + pub async fn get_test_status(&self, test_id: Uuid) -> Option<(TestStatus, Option)> { + let map = self.tests.read().await; + map.get(&test_id) + .map(|e| (e.status, e.result.clone())) + } + + /// Get count of active (running) tests + pub async fn active_test_count(&self) -> usize { + let map = self.tests.read().await; + map.values() + .filter(|e| e.status == TestStatus::Running) + .count() + } + + /// Get count of pending tests + pub async fn pending_test_count(&self) -> usize { + let map = self.tests.read().await; + map.values() + .filter(|e| e.status == TestStatus::Pending) + .count() + } + + /// Get max concurrent limit + pub fn max_concurrent(&self) -> usize { + self.max_concurrent + } + + /// Clean up completed tests older than specified age + pub async fn cleanup_old_tests(&self, max_age_secs: u64) { + let cutoff = chrono::Utc::now() - chrono::Duration::seconds(max_age_secs as i64); + let mut map = self.tests.write().await; + let initial_count = map.len(); + + map.retain(|_, entry| { + match &entry.result { + Some(result) if entry.status == TestStatus::Completed || entry.status == TestStatus::Failed => { + result.completed_at.map(|t| t > cutoff).unwrap_or(true) + } + _ => true, + } + }); + + let removed = initial_count - map.len(); + if removed > 0 { + info!(removed = removed, "cleaned up old test entries"); + } + } +} + +async fn send_callback(url: &str, result: &SpeedtestResult) { + info!(test_id = %result.test_id, url = %url, "sending callback"); + + let client = reqwest::Client::new(); + match client + .post(url) + .json(result) + .timeout(std::time::Duration::from_secs(30)) + .send() + .await + { + Ok(response) => { + if response.status().is_success() { + info!(test_id = %result.test_id, status = %response.status(), "callback sent successfully"); + } else { + warn!( + test_id = %result.test_id, + status = %response.status(), + "callback returned non-success status" + ); + } + } + Err(e) => { + error!(test_id = %result.test_id, error = %e, "failed to send callback"); + } + } +} diff --git a/rust-speedtest/src/speedtest.rs b/rust-speedtest/src/speedtest.rs new file mode 100644 index 000000000..2c2d395e1 --- /dev/null +++ b/rust-speedtest/src/speedtest.rs @@ -0,0 +1,260 @@ +use std::process::Stdio; +use std::sync::OnceLock; +use std::time::Duration; + +use anyhow::{anyhow, Result}; +use chrono::Utc; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; +use tokio::time::timeout; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +use crate::models::{OoklaOutput, SpeedtestCliOutput, SpeedtestResult, TestStatus}; + +const DEFAULT_TIMEOUT_SECS: u64 = 120; +const SPEEDTEST_BINARY: &str = "speedtest"; + +#[derive(Debug, Clone, Copy, PartialEq)] +enum CliType { + Ookla, // Official Ookla CLI + Python, // speedtest-cli Python package + Unknown, +} + +static CLI_TYPE: OnceLock = OnceLock::new(); + +/// Detect which speedtest CLI is installed +async fn detect_cli_type() -> CliType { + // Try Ookla CLI first (has --format flag) + let ookla_check = Command::new(SPEEDTEST_BINARY) + .arg("--help") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output() + .await; + + if let Ok(output) = ookla_check { + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + let combined = format!("{}{}", stdout, stderr); + + if combined.contains("--format") && combined.contains("--accept-license") { + info!("detected ookla speedtest cli"); + return CliType::Ookla; + } else if combined.contains("--json") && combined.contains("speedtest-cli") { + info!("detected speedtest-cli (python)"); + return CliType::Python; + } + } + + warn!("unknown speedtest cli type"); + CliType::Unknown +} + +/// Get or detect the CLI type +async fn get_cli_type() -> CliType { + if let Some(cli_type) = CLI_TYPE.get() { + return *cli_type; + } + + let detected = detect_cli_type().await; + CLI_TYPE.get_or_init(|| detected); + detected +} + +/// Run a single speedtest using the available CLI +pub async fn run_speedtest( + test_id: Uuid, + server_id: Option, + metadata: Option, + timeout_secs: Option, +) -> SpeedtestResult { + let started_at = Utc::now(); + let timeout_duration = Duration::from_secs(timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS)); + + info!(test_id = %test_id, server_id = ?server_id, "starting speedtest"); + + let cli_type = get_cli_type().await; + + let result = match cli_type { + CliType::Ookla => { + timeout(timeout_duration, execute_ookla_cli(test_id, server_id)).await + } + CliType::Python => { + timeout(timeout_duration, execute_python_cli(test_id, server_id)).await + } + CliType::Unknown => { + // Try Python CLI as fallback + timeout(timeout_duration, execute_python_cli(test_id, server_id)).await + } + }; + + match result { + Ok(Ok(speedtest_result)) => { + info!( + test_id = %test_id, + download = ?speedtest_result.download, + upload = ?speedtest_result.upload, + "speedtest completed" + ); + SpeedtestResult { + metadata, + ..speedtest_result + } + } + Ok(Err(e)) => { + error!(test_id = %test_id, error = %e, "speedtest failed"); + SpeedtestResult { + test_id, + status: TestStatus::Failed, + download: None, + upload: None, + ping: None, + jitter: None, + packet_loss: None, + server: None, + isp: None, + external_ip: None, + result_url: None, + raw_output: None, + error: Some(e.to_string()), + started_at, + completed_at: Some(Utc::now()), + metadata, + } + } + Err(_) => { + error!(test_id = %test_id, timeout_secs = timeout_duration.as_secs(), "speedtest timed out"); + SpeedtestResult { + test_id, + status: TestStatus::Failed, + download: None, + upload: None, + ping: None, + jitter: None, + packet_loss: None, + server: None, + isp: None, + external_ip: None, + result_url: None, + raw_output: None, + error: Some(format!("test timed out after {}s", timeout_duration.as_secs())), + started_at, + completed_at: Some(Utc::now()), + metadata, + } + } + } +} + +/// Execute the official Ookla CLI +async fn execute_ookla_cli(test_id: Uuid, server_id: Option) -> Result { + let started_at = Utc::now(); + let mut cmd = Command::new(SPEEDTEST_BINARY); + cmd.arg("--format=json") + .arg("--accept-license") + .arg("--accept-gdpr") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + if let Some(sid) = server_id { + cmd.arg(format!("--server-id={}", sid)); + } + + debug!(test_id = %test_id, "spawning ookla cli"); + + let mut child = cmd.spawn().map_err(|e| { + anyhow!("failed to spawn speedtest cli: {} - is it installed?", e) + })?; + + let stdout = child.stdout.take().ok_or_else(|| anyhow!("failed to capture stdout"))?; + let stderr = child.stderr.take().ok_or_else(|| anyhow!("failed to capture stderr"))?; + + let mut stdout_reader = BufReader::new(stdout).lines(); + let mut stderr_reader = BufReader::new(stderr).lines(); + + let mut final_output: Option = None; + let mut last_error = String::new(); + + // Ookla CLI outputs multiple JSON objects (progress updates), we want the final "result" type + tokio::select! { + _ = async { + while let Ok(Some(line)) = stdout_reader.next_line().await { + debug!(test_id = %test_id, line = %line, "ookla stdout"); + if let Ok(output) = serde_json::from_str::(&line) { + if output.result_type == "result" { + final_output = Some(output); + } + } + } + } => {} + _ = async { + while let Ok(Some(line)) = stderr_reader.next_line().await { + warn!(test_id = %test_id, line = %line, "ookla stderr"); + last_error = line; + } + } => {} + } + + let status = child.wait().await?; + + if !status.success() && final_output.is_none() { + return Err(anyhow!( + "speedtest cli exited with status {}: {}", + status.code().unwrap_or(-1), + last_error + )); + } + + let output = final_output.ok_or_else(|| anyhow!("no result output from speedtest cli"))?; + Ok(output.to_result(test_id, started_at, None)) +} + +/// Execute the speedtest-cli Python CLI +async fn execute_python_cli(test_id: Uuid, server_id: Option) -> Result { + let started_at = Utc::now(); + let mut cmd = Command::new(SPEEDTEST_BINARY); + cmd.arg("--json") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + if let Some(sid) = server_id { + cmd.arg("--server").arg(sid.to_string()); + } + + debug!(test_id = %test_id, "spawning speedtest-cli (python)"); + + let output = cmd.output().await.map_err(|e| { + anyhow!("failed to run speedtest-cli: {} - is it installed?", e) + })?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(anyhow!( + "speedtest-cli exited with status {}: {}", + output.status.code().unwrap_or(-1), + stderr + )); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + debug!(test_id = %test_id, output = %stdout, "speedtest-cli output"); + + let cli_output: SpeedtestCliOutput = serde_json::from_str(&stdout) + .map_err(|e| anyhow!("failed to parse speedtest-cli output: {}", e))?; + + Ok(cli_output.to_result(test_id, started_at, None)) +} + +/// Check if any speedtest CLI is available +pub async fn check_ookla_available() -> bool { + Command::new(SPEEDTEST_BINARY) + .arg("--version") + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .await + .map(|s| s.success()) + .unwrap_or(false) +} diff --git a/todo.md b/todo.md new file mode 100644 index 000000000..51c8688af --- /dev/null +++ b/todo.md @@ -0,0 +1,75 @@ +# Speedtest Tracker - Changelog + +## 2026-01-18: Rust Concurrent Speedtest Runner + +### Added + +#### Rust Service (`rust-speedtest/`) +- `Cargo.toml` - project manifest with axum, tokio, serde, reqwest dependencies +- `src/main.rs` - entry point with server setup, graceful shutdown, cleanup task +- `src/routes.rs` - API endpoints: POST /speedtest, GET /speedtest/:id, GET /health, GET /stats +- `src/speedtest.rs` - Ookla CLI wrapper with JSON parsing, timeout handling +- `src/runner.rs` - concurrent execution manager with semaphore rate limiting +- `src/models.rs` - request/response types, Ookla JSON output structures +- `Dockerfile` - multi-stage build with Ookla CLI installation +- `.gitignore` - ignore target/ and Cargo.lock + +#### Laravel Integration +- `app/Actions/Ookla/TriggerRustSpeedtest.php` - new action that triggers speedtest via Rust service with fallback to PHP +- `app/Http/Controllers/Api/V1/SpeedtestCallbackController.php` - handles callbacks from Rust service + +### Modified +- `config/speedtest.php` - added `rust_service_url` config option (env: RUST_SPEEDTEST_URL) +- `routes/api/v1/routes.php` - added POST /speedtest/callback route +- `compose.yaml` - added rust-speedtest service with healthcheck + +### Configuration +- `RUST_SPEEDTEST_URL` - URL of Rust service (default: http://rust-speedtest:3000 in Docker) +- `RUST_MAX_CONCURRENT` - max concurrent tests (default: 3) +- `RUST_LOG` - log level (default: rust_speedtest=info,tower_http=info) +- `FORWARD_RUST_SPEEDTEST_PORT` - exposed port for Rust service (default: 3000) + +### Architecture +``` +Laravel → HTTP POST /speedtest → Rust Service + ↓ + Queue + Semaphore + ↓ + Ookla CLI (concurrent) + ↓ + HTTP POST callback → Laravel +``` + +### CLI Support +Automatically detects and supports: +- **Ookla CLI** (official) - `--format=json --accept-license --accept-gdpr` +- **speedtest-cli** (Python) - `--json` + +### Usage +```bash +# Build release +cd rust-speedtest && cargo build --release + +# Start service +./target/release/rust-speedtest + +# Run a speedtest +curl -X POST localhost:3000/speedtest -H "Content-Type: application/json" -d '{}' + +# Check status +curl localhost:3000/speedtest/{test_id} + +# Health check +curl localhost:3000/health + +# Stats +curl localhost:3000/stats + +# Docker compose +docker compose up -d +``` + +### Test Results (2026-01-18) +- Single test: 303 Mbps down, 56 Mbps up, 7.7ms ping +- Concurrent (3x): All completed, semaphore rate limiting working +- 9 unit tests passing