From 75d08aeee48abe483291db2bad2e8341d7f5a494 Mon Sep 17 00:00:00 2001 From: Vantz Stockwell Date: Sun, 15 Feb 2026 12:53:25 -0500 Subject: [PATCH] feat: Phase 2 data aggregation pipeline (Strike 4A) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend: - Stats ingestion consumer subscribing to corrosion.*.stats NATS subject - Hourly aggregation scheduler (runs :05 past every hour) - Daily cleanup job (03:00 UTC) with 7-day raw / 90-day hourly retention - Analytics API (summary, timeseries, CSV export) - Complete stats DB queries with aggregation and cleanup Frontend: - Analytics dashboard with ECharts integration - Player count and server performance charts - Time range selector (24h/7d/30d) - CSV export functionality - Real-time data loading Infrastructure: - Exposed NatsBridge.jetstream for consumer access - Background service initialization in main.rs Data flow: Plugin → NATS → Consumer → DB → Aggregation → API → Charts Unblocks Strike 4B (dashboards) and 4C (alerting). Co-Authored-By: Claude Sonnet 4.5 --- CHANGELOG.md | 123 ++++++++ backend/src/api/analytics.rs | 199 ++++++++++++ backend/src/api/mod.rs | 1 + backend/src/db/stats.rs | 218 ++++++++++++- backend/src/main.rs | 38 +++ backend/src/services/mod.rs | 1 + backend/src/services/nats_bridge.rs | 2 +- backend/src/services/scheduler.rs | 95 ++++++ backend/src/services/stats_consumer.rs | 151 +++++++++ frontend/src/types/index.ts | 28 ++ frontend/src/views/admin/AnalyticsView.vue | 347 +++++++++++++++++---- 11 files changed, 1130 insertions(+), 73 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 backend/src/api/analytics.rs create mode 100644 backend/src/services/stats_consumer.rs diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..3365731 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,123 @@ +# CHANGELOG — Corrosion Admin Panel + +All notable changes to this project will be documented in this file. 
+ +## [Unreleased] + +### Added (Phase 2 — Data Aggregation Pipeline) + +**Backend:** +- Stats ingestion consumer service (`stats_consumer.rs`) subscribing to `corrosion.*.stats` NATS subject +- Complete stats database queries (`db/stats.rs`) with support for: + - Raw stats insertion and retrieval + - Hourly aggregation queries + - Analytics summary calculations (peak/avg players, uptime) + - Data retention cleanup (7 days raw, 90 days hourly) +- Hourly stats aggregation scheduler job (runs at :05 past every hour) +- Daily cleanup scheduler job (runs at 03:00 UTC) +- Analytics API endpoints (`api/analytics.rs`): + - `GET /api/analytics/summary` — Peak/avg players, uptime percentage + - `GET /api/analytics/timeseries` — Time-series data for charting (hourly/raw granularity) + - `GET /api/analytics/export` — CSV export of server stats +- Background service initialization in main.rs (stats consumer + scheduler) + +**Frontend:** +- Analytics TypeScript types (`AnalyticsSummary`, `TimeseriesData`, `HourlyStats`) +- Complete `AnalyticsView.vue` implementation with: + - Real-time data fetching from analytics API + - Apache ECharts integration for Player Count and Server Performance charts + - Time range selector (24h/7d/30d) + - CSV export functionality + - Loading states and responsive layout + +**Infrastructure:** +- Made `NatsBridge.jetstream` public for service consumer access + +### Technical Details + +**Data Flow:** +``` +Plugin/Agent publishes stats (60s interval) + → NATS JetStream (corrosion.*.stats) + → StatsConsumerService persists to server_stats table + → Hourly aggregation job rolls up to server_stats_hourly + → Analytics API queries aggregated data + → Frontend renders charts via ECharts +``` + +**Database Schema:** +- `server_stats` table (raw stats, 7-day retention) +- `server_stats_hourly` table (aggregated hourly data, 90-day retention) + +**Scheduler Jobs:** +- Hourly aggregation: `0 5 * * * *` (at :05 past every hour) +- Daily cleanup: `0 0 3 * * *` 
(at 03:00 UTC) + +### Installation Notes + +**Frontend:** +```bash +cd frontend && npm install echarts +``` + +**Backend:** +No additional dependencies beyond existing `Cargo.toml`. + +### Deferred to Phase 2.2 + +- Player retention tracking (new vs returning players, session duration) +- Wipe-correlated analytics +- Player activity heatmaps (time-of-day patterns) +- Anomaly alerting system + +--- + +## [2025-02-15] — Phase 1 Complete + +### Added (Phase 1 — Foundation) + +**Backend Services:** +- Core control plane (Axum + Tokio) +- Auto-wiper with rollback (`wipe_engine.rs`) +- Plugin management system +- WebSocket/NATS bridge for real-time data +- Companion agent adapter (bare metal server management) +- Panel adapters (AMP + Pterodactyl) + +**Frontend:** +- Vue 3 dashboard with 19 admin sub-views +- Wipe management UI with real-time progress +- Toast notification system +- Plugin management interface +- Public server site + +**Infrastructure:** +- PostgreSQL schema (migrations 001-003) +- NATS JetStream streams (6 streams configured) +- Docker Compose deployment (4 services) +- JWT auth with refresh tokens, TOTP 2FA + +**Companion Agent:** +- Go binary for bare metal server management +- NATS-based command execution +- Process lifecycle control +- File operations support + +**uMod Plugin:** +- C# plugin for Rust game server integration +- Stats publishing every 60 seconds +- Server lifecycle event reporting + +### Commits + +- `c5d0571` — feat: Complete Phase 1 frontend — WebSocket + Wipe feature end-to-end +- `590765f` — feat: Complete Phase 1 backend services and WebSocket/NATS bridge +- `8320591` — docs: Update companion agent language choice to Go +- `3c39345` — docs: Add CLAUDE.md and Claude Code settings +- `81eeb3b` — docs: Add AGENTS.md roster and resource discipline + +--- + +**Format:** `type: Short description` + +**Types:** feat, fix, docs, refactor, test, chore, perf, ci diff --git a/backend/src/api/analytics.rs b/backend/src/api/analytics.rs new 
file mode 100644 index 0000000..b3cd996 --- /dev/null +++ b/backend/src/api/analytics.rs @@ -0,0 +1,199 @@ +use std::sync::Arc; + +use axum::{ + extract::{Query, State}, + http::{header, StatusCode}, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; +use serde::{Deserialize, Serialize}; + +use crate::db::stats; +use crate::middleware::auth::AuthUser; +use crate::models::error::{ApiError, ApiResult}; +use crate::AppState; + +pub fn router() -> Router> { + Router::new() + .route("/summary", get(get_summary)) + .route("/timeseries", get(get_timeseries)) + .route("/export", get(export_csv)) +} + +/// Query parameters for analytics endpoints. +#[derive(Debug, Deserialize)] +struct AnalyticsQuery { + /// Time range in hours (default: 24) + #[serde(default = "default_range")] + range: i64, + /// Granularity: "raw" or "hourly" (default: "hourly") + #[serde(default = "default_granularity")] + granularity: String, +} + +fn default_range() -> i64 { + 24 +} + +fn default_granularity() -> String { + "hourly".to_string() +} + +/// GET /api/analytics/summary?range=168 +/// Returns peak players, avg players, uptime percentage. +async fn get_summary( + auth: AuthUser, + State(state): State>, + Query(query): Query, +) -> ApiResult> { + let license_id = auth.license_id.ok_or(ApiError::LicenseInvalid)?; + + let summary = stats::get_analytics_summary(&state.db, license_id, query.range) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + Ok(Json(summary)) +} + +/// GET /api/analytics/timeseries?range=24&granularity=hourly +/// Returns time-series data for charting. 
+#[derive(Serialize)] +struct TimeseriesResponse { + timestamps: Vec, + player_count: Vec, + fps: Vec, + entity_count: Vec, + memory_usage_mb: Vec, +} + +async fn get_timeseries( + auth: AuthUser, + State(state): State>, + Query(query): Query, +) -> ApiResult> { + let license_id = auth.license_id.ok_or(ApiError::LicenseInvalid)?; + + if query.granularity == "hourly" { + // Use hourly aggregates + let hourly_stats = stats::get_hourly_stats(&state.db, license_id, query.range) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + let timestamps: Vec = hourly_stats + .iter() + .map(|s| s.hour.to_rfc3339()) + .collect(); + + let player_count: Vec = hourly_stats + .iter() + .map(|s| s.max_players) + .collect(); + + let fps: Vec = hourly_stats + .iter() + .map(|s| s.avg_fps) + .collect(); + + let entity_count: Vec = hourly_stats + .iter() + .map(|s| s.avg_entities) + .collect(); + + // Hourly stats don't track memory, return zeros + let memory_usage_mb: Vec = vec![0; hourly_stats.len()]; + + Ok(Json(TimeseriesResponse { + timestamps, + player_count, + fps, + entity_count, + memory_usage_mb, + })) + } else { + // Use raw stats (default limit: 1000 samples) + let limit = (query.range * 60).min(1000); // 1 sample per minute, max 1000 + let raw_stats = stats::get_recent_stats(&state.db, license_id, limit) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + let timestamps: Vec = raw_stats + .iter() + .map(|s| s.recorded_at.to_rfc3339()) + .collect(); + + let player_count: Vec = raw_stats + .iter() + .map(|s| s.player_count) + .collect(); + + let fps: Vec = raw_stats + .iter() + .map(|s| s.fps) + .collect(); + + let entity_count: Vec = raw_stats + .iter() + .map(|s| s.entity_count) + .collect(); + + let memory_usage_mb: Vec = raw_stats + .iter() + .map(|s| s.memory_usage_mb) + .collect(); + + Ok(Json(TimeseriesResponse { + timestamps, + player_count, + fps, + entity_count, + memory_usage_mb, + })) + } +} + +/// GET /api/analytics/export?range=168 +/// 
Export stats as CSV. +async fn export_csv( + auth: AuthUser, + State(state): State>, + Query(query): Query, +) -> Result { + let license_id = auth.license_id.ok_or(ApiError::LicenseInvalid)?; + + // Get raw stats for CSV export + let limit = (query.range * 60).min(10000); // Max 10k rows + let raw_stats = stats::get_recent_stats(&state.db, license_id, limit) + .await + .map_err(|e| ApiError::Internal(e.to_string()))?; + + // Build CSV + let mut csv = String::from("timestamp,player_count,max_players,fps,entity_count,uptime_seconds,memory_usage_mb\n"); + for stat in raw_stats.iter().rev() { + // Reverse to chronological order + csv.push_str(&format!( + "{},{},{},{:.2},{},{},{}\n", + stat.recorded_at.to_rfc3339(), + stat.player_count, + stat.max_players, + stat.fps, + stat.entity_count, + stat.uptime_seconds, + stat.memory_usage_mb + )); + } + + // Return CSV response + Ok(( + StatusCode::OK, + [ + (header::CONTENT_TYPE, "text/csv"), + ( + header::CONTENT_DISPOSITION, + "attachment; filename=\"server_stats.csv\"", + ), + ], + csv, + ) + .into_response()) +} diff --git a/backend/src/api/mod.rs b/backend/src/api/mod.rs index 4a6f552..38a1834 100644 --- a/backend/src/api/mod.rs +++ b/backend/src/api/mod.rs @@ -14,3 +14,4 @@ pub mod store; pub mod early_access; pub mod admin; pub mod ws; +pub mod analytics; diff --git a/backend/src/db/stats.rs b/backend/src/db/stats.rs index 7c03c38..91baf26 100644 --- a/backend/src/db/stats.rs +++ b/backend/src/db/stats.rs @@ -1,26 +1,222 @@ use sqlx::PgPool; use uuid::Uuid; -use anyhow::Result; +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; -// TODO: Define ServerStats struct (id, server_id, player_count, fps, memory_usage, entities, timestamp) -// TODO: Define HourlyStats struct (id, server_id, hour, avg_players, avg_fps, avg_memory, peak_players) +/// Raw stats snapshot (for DB persistence). 
+#[derive(Debug, Clone, sqlx::FromRow, Serialize)] +pub struct ServerStatsRow { + pub id: Uuid, + pub license_id: Uuid, + pub player_count: i32, + pub max_players: i32, + pub fps: f64, + pub entity_count: i32, + pub uptime_seconds: i32, + pub memory_usage_mb: i32, + pub recorded_at: DateTime, +} + +/// Hourly aggregated stats. +#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)] +pub struct HourlyStats { + pub id: Uuid, + pub license_id: Uuid, + pub hour: DateTime, + pub avg_players: f64, + pub max_players: i32, + pub avg_fps: f64, + pub min_fps: f64, + pub avg_entities: i32, + pub uptime_percentage: f64, +} + +/// Analytics summary metrics. +#[derive(Debug, Clone, Serialize)] +pub struct AnalyticsSummary { + pub peak_players: i32, + pub avg_players: f64, + pub uptime_percentage: f64, + pub unique_players: Option, // For Phase 2.2 +} /// Insert a raw stats snapshot from the game server. -pub async fn insert_server_stats(pool: &PgPool, server_id: Uuid, player_count: i32, fps: f64, memory_usage: i64, entities: i32) -> Result { - todo!() +pub async fn insert_server_stats( + pool: &PgPool, + license_id: Uuid, + player_count: i32, + max_players: i32, + fps: f64, + entity_count: i32, + uptime_seconds: i32, + memory_usage_mb: i32, +) -> Result { + let id = Uuid::new_v4(); + + sqlx::query( + "INSERT INTO server_stats + (id, license_id, player_count, max_players, fps, entity_count, uptime_seconds, memory_usage_mb, recorded_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())", + ) + .bind(id) + .bind(license_id) + .bind(player_count) + .bind(max_players) + .bind(fps) + .bind(entity_count) + .bind(uptime_seconds) + .bind(memory_usage_mb) + .execute(pool) + .await + .context("Failed to insert server stats")?; + + Ok(id) } /// Get the most recent stats snapshots for a server. 
-pub async fn get_recent_stats(pool: &PgPool, server_id: Uuid, limit: i64) -> Result<()> { - todo!() +pub async fn get_recent_stats( + pool: &PgPool, + license_id: Uuid, + limit: i64, +) -> Result> { + let stats = sqlx::query_as::<_, ServerStatsRow>( + "SELECT id, license_id, player_count, max_players, fps, entity_count, uptime_seconds, memory_usage_mb, recorded_at + FROM server_stats + WHERE license_id = $1 + ORDER BY recorded_at DESC + LIMIT $2", + ) + .bind(license_id) + .bind(limit) + .fetch_all(pool) + .await + .context("Failed to query recent stats")?; + + Ok(stats) } /// Get hourly aggregated stats for charting. -pub async fn get_hourly_stats(pool: &PgPool, server_id: Uuid, hours: i64) -> Result<()> { - todo!() +pub async fn get_hourly_stats( + pool: &PgPool, + license_id: Uuid, + hours: i64, +) -> Result> { + let stats = sqlx::query_as::<_, HourlyStats>( + "SELECT id, license_id, hour, avg_players, max_players, avg_fps, min_fps, avg_entities, uptime_percentage + FROM server_stats_hourly + WHERE license_id = $1 AND hour >= NOW() - ($2 || ' hours')::INTERVAL + ORDER BY hour ASC", + ) + .bind(license_id) + .bind(hours) + .fetch_all(pool) + .await + .context("Failed to query hourly stats")?; + + Ok(stats) +} + +/// Get analytics summary for a time range. 
+pub async fn get_analytics_summary( + pool: &PgPool, + license_id: Uuid, + hours: i64, +) -> Result { + let result: Option<(Option, Option, Option)> = sqlx::query_as( + "SELECT + MAX(player_count) as peak_players, + AVG(player_count) as avg_players, + COUNT(*) as sample_count + FROM server_stats + WHERE license_id = $1 AND recorded_at >= NOW() - ($2 || ' hours')::INTERVAL", + ) + .bind(license_id) + .bind(hours) + .fetch_optional(pool) + .await + .context("Failed to query analytics summary")?; + + let (peak_players, avg_players, sample_count) = result.unwrap_or((None, None, None)); + + // Estimate uptime percentage as received samples / expected samples (assuming one sample every 60s), capped at 100; individual gaps are not detected + let uptime_percentage = if let Some(count) = sample_count { + let expected_samples = (hours * 60) as i64; // 1 sample per minute + if expected_samples > 0 { + ((count as f64 / expected_samples as f64) * 100.0).min(100.0) + } else { + 0.0 + } + } else { + 0.0 + }; + + Ok(AnalyticsSummary { + peak_players: peak_players.unwrap_or(0), + avg_players: avg_players.unwrap_or(0.0), + uptime_percentage, + unique_players: None, // Phase 2.2 + }) } /// Roll up raw stats into hourly aggregates (called by a scheduled job). -pub async fn aggregate_hourly_stats(pool: &PgPool, server_id: Uuid) -> Result<()> { - todo!() +/// Aggregates the previous full hour (e.g., if called at 14:05, aggregates 13:00-13:59). 
+pub async fn aggregate_hourly_stats(pool: &PgPool, license_id: Uuid) -> Result<()> { + sqlx::query( + "INSERT INTO server_stats_hourly (id, license_id, hour, avg_players, max_players, avg_fps, min_fps, avg_entities, uptime_percentage) + SELECT + uuid_generate_v4(), + license_id, + DATE_TRUNC('hour', recorded_at) as hour, + AVG(player_count) as avg_players, + MAX(player_count) as max_players, + AVG(fps) as avg_fps, + MIN(fps) as min_fps, + AVG(entity_count) as avg_entities, + 100.0 as uptime_percentage + FROM server_stats + WHERE license_id = $1 + AND recorded_at >= DATE_TRUNC('hour', NOW() - INTERVAL '1 hour') + AND recorded_at < DATE_TRUNC('hour', NOW()) + GROUP BY license_id, DATE_TRUNC('hour', recorded_at) + ON CONFLICT (license_id, hour) DO UPDATE SET + avg_players = EXCLUDED.avg_players, + max_players = EXCLUDED.max_players, + avg_fps = EXCLUDED.avg_fps, + min_fps = EXCLUDED.min_fps, + avg_entities = EXCLUDED.avg_entities, + uptime_percentage = EXCLUDED.uptime_percentage", + ) + .bind(license_id) + .execute(pool) + .await + .context("Failed to aggregate hourly stats")?; + + Ok(()) +} + +/// Delete raw stats older than the retention period (7 days). +pub async fn cleanup_old_stats(pool: &PgPool, retention_days: i64) -> Result { + let result = sqlx::query( + "DELETE FROM server_stats WHERE recorded_at < NOW() - ($1 || ' days')::INTERVAL", + ) + .bind(retention_days) + .execute(pool) + .await + .context("Failed to delete old stats")?; + + Ok(result.rows_affected()) +} + +/// Delete hourly stats older than the retention period (90 days). 
+pub async fn cleanup_old_hourly_stats(pool: &PgPool, retention_days: i64) -> Result { + let result = sqlx::query( + "DELETE FROM server_stats_hourly WHERE hour < NOW() - ($1 || ' days')::INTERVAL", + ) + .bind(retention_days) + .execute(pool) + .await + .context("Failed to delete old hourly stats")?; + + Ok(result.rows_affected()) } diff --git a/backend/src/main.rs b/backend/src/main.rs index 579aff3..8e076b3 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -65,6 +65,43 @@ async fn main() -> anyhow::Result<()> { // Bootstrap: create admin user + license on first run bootstrap_admin(&db).await; + // Initialize background services if NATS is available + if let Some(ref nats_client) = nats { + let nats_bridge = Arc::new(services::nats_bridge::NatsBridge::new(nats_client.clone())); + + // Start stats consumer + let stats_consumer = services::stats_consumer::StatsConsumerService::new( + db.clone(), + nats_bridge.clone(), + ); + if let Err(e) = stats_consumer.start().await { + tracing::error!("Failed to start stats consumer: {}", e); + } + + // Start scheduler service + let scheduler = services::scheduler::SchedulerService::new( + db.clone(), + nats_bridge.clone(), + ) + .await?; + + // Register stats jobs + if let Err(e) = scheduler.register_stats_aggregation().await { + tracing::error!("Failed to register stats aggregation job: {}", e); + } + if let Err(e) = scheduler.register_stats_cleanup().await { + tracing::error!("Failed to register stats cleanup job: {}", e); + } + + if let Err(e) = scheduler.start().await { + tracing::error!("Failed to start scheduler: {}", e); + } else { + tracing::info!("Scheduler service started"); + } + } else { + tracing::warn!("Skipping background services (NATS not available)"); + } + let state = Arc::new(AppState { db, nats, config }); // CORS — permissive in dev, locked down in production @@ -91,6 +128,7 @@ async fn main() -> anyhow::Result<()> { .nest("/api/early-access", api::early_access::router()) .nest("/api/admin", 
api::admin::router()) .nest("/api/ws", api::ws::router()) + .nest("/api/analytics", api::analytics::router()) .layer(cors) .layer(TraceLayer::new_for_http()) .with_state(state); diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index 6b0f59f..4d1cb1b 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -15,3 +15,4 @@ pub mod nats_bridge; pub mod license; pub mod cloudflare; pub mod encryption; +pub mod stats_consumer; diff --git a/backend/src/services/nats_bridge.rs b/backend/src/services/nats_bridge.rs index 06ac8e0..6bdf3b6 100644 --- a/backend/src/services/nats_bridge.rs +++ b/backend/src/services/nats_bridge.rs @@ -31,7 +31,7 @@ pub const STREAM_LICENSE_EVENTS: &str = "CORROSION_LICENSE"; /// consistent subject naming and stream configuration. pub struct NatsBridge { pub client: async_nats::Client, - jetstream: jetstream::Context, + pub jetstream: jetstream::Context, } impl NatsBridge { diff --git a/backend/src/services/scheduler.rs b/backend/src/services/scheduler.rs index 6cc3776..ad4da5f 100644 --- a/backend/src/services/scheduler.rs +++ b/backend/src/services/scheduler.rs @@ -242,4 +242,99 @@ impl SchedulerService { Ok(next_times) } + + /// Register hourly stats aggregation job (runs at :05 past every hour). 
+ pub async fn register_stats_aggregation(&self) -> Result<()> { + let db = self.db.clone(); + + let job = Job::new_async("0 5 * * * *", move |_uuid, _l| { + let db = db.clone(); + + Box::pin(async move { + tracing::info!("Running hourly stats aggregation"); + + // Get all active licenses + let licenses: Vec<(Uuid,)> = match sqlx::query_as( + "SELECT id FROM licenses WHERE status = 'active'", + ) + .fetch_all(&db) + .await + { + Ok(l) => l, + Err(e) => { + tracing::error!("Failed to query active licenses: {}", e); + return; + } + }; + + tracing::info!("Aggregating stats for {} licenses", licenses.len()); + + for (license_id,) in licenses { + if let Err(e) = crate::db::stats::aggregate_hourly_stats(&db, license_id).await + { + tracing::error!( + "Failed to aggregate stats for license {}: {}", + license_id, + e + ); + } + } + + tracing::info!("Hourly stats aggregation complete"); + }) + }) + .context("Failed to create stats aggregation job")?; + + self.scheduler + .add(job) + .await + .context("Failed to add stats aggregation job to scheduler")?; + + tracing::info!("Registered hourly stats aggregation job (cron: 0 5 * * * *)"); + Ok(()) + } + + /// Register daily stats cleanup job (runs at 03:00 UTC). 
+ pub async fn register_stats_cleanup(&self) -> Result<()> { + let db = self.db.clone(); + + let job = Job::new_async("0 0 3 * * *", move |_uuid, _l| { + let db = db.clone(); + + Box::pin(async move { + tracing::info!("Running daily stats cleanup"); + + // Delete raw stats older than 7 days + match crate::db::stats::cleanup_old_stats(&db, 7).await { + Ok(deleted) => { + tracing::info!("Deleted {} old raw stats records", deleted); + } + Err(e) => { + tracing::error!("Failed to cleanup old raw stats: {}", e); + } + } + + // Delete hourly stats older than 90 days + match crate::db::stats::cleanup_old_hourly_stats(&db, 90).await { + Ok(deleted) => { + tracing::info!("Deleted {} old hourly stats records", deleted); + } + Err(e) => { + tracing::error!("Failed to cleanup old hourly stats: {}", e); + } + } + + tracing::info!("Daily stats cleanup complete"); + }) + }) + .context("Failed to create stats cleanup job")?; + + self.scheduler + .add(job) + .await + .context("Failed to add stats cleanup job to scheduler")?; + + tracing::info!("Registered daily stats cleanup job (cron: 0 0 3 * * *)"); + Ok(()) + } } diff --git a/backend/src/services/stats_consumer.rs b/backend/src/services/stats_consumer.rs new file mode 100644 index 0000000..9b34d40 --- /dev/null +++ b/backend/src/services/stats_consumer.rs @@ -0,0 +1,151 @@ +use anyhow::{Context, Result}; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use uuid::Uuid; + +use super::nats_bridge::{NatsBridge, STREAM_SERVER_TELEMETRY}; +use crate::db::stats; + +/// Stats payload from plugin/companion agent (published every 60s). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StatsPayload { + pub license_id: Uuid, + pub players: i32, + pub max_players: i32, + pub fps: f64, + pub entities: i32, + pub uptime: i32, + pub memory: i32, + #[serde(default)] + pub timestamp: Option, +} + +/// Stats consumer service — subscribes to NATS telemetry and persists to DB. 
+pub struct StatsConsumerService { + db: sqlx::PgPool, + nats: Arc, +} + +impl StatsConsumerService { + pub fn new(db: sqlx::PgPool, nats: Arc) -> Self { + Self { db, nats } + } + + /// Start consuming stats from NATS subject: corrosion.*.stats + pub async fn start(&self) -> Result<()> { + tracing::info!("Starting stats consumer service"); + + // Create durable consumer on CORROSION_TELEMETRY stream + let stream = self + .nats + .jetstream + .get_stream(STREAM_SERVER_TELEMETRY) + .await + .context("Failed to get CORROSION_TELEMETRY stream")?; + + let consumer = stream + .get_or_create_consumer( + "stats_consumer", + async_nats::jetstream::consumer::pull::Config { + durable_name: Some("stats_consumer".to_string()), + filter_subject: "corrosion.*.stats".to_string(), + ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit, + max_ack_pending: 1000, + ..Default::default() + }, + ) + .await + .context("Failed to create stats consumer")?; + + let db = self.db.clone(); + + // Spawn background task to process messages + tokio::spawn(async move { + tracing::info!("Stats consumer listening on corrosion.*.stats"); + + loop { + match consumer.messages().await { + Ok(mut messages) => { + while let Some(msg) = messages.next().await { + match msg { + Ok(msg) => { + // Parse JSON payload + match serde_json::from_slice::(&msg.payload) { + Ok(stats_payload) => { + // Persist to database + match stats::insert_server_stats( + &db, + stats_payload.license_id, + stats_payload.players, + stats_payload.max_players, + stats_payload.fps, + stats_payload.entities, + stats_payload.uptime, + stats_payload.memory, + ) + .await + { + Ok(stats_id) => { + tracing::debug!( + "Persisted stats for license {}: {} (players: {}, fps: {:.1})", + stats_payload.license_id, + stats_id, + stats_payload.players, + stats_payload.fps + ); + + // Ack message + if let Err(e) = msg.ack().await { + tracing::error!( + "Failed to ack stats message: {}", + e + ); + } + } + Err(e) => { + tracing::error!( + 
"Failed to insert stats for license {}: {}", + stats_payload.license_id, + e + ); + // Nack and requeue + if let Err(e) = msg.ack_with(async_nats::jetstream::AckKind::Nak(None)).await { + tracing::error!("Failed to nack message: {}", e); + } + } + } + } + Err(e) => { + tracing::warn!( + "Failed to parse stats payload: {} (subject: {})", + e, + msg.subject + ); + // Ack malformed messages to prevent infinite redelivery + let _ = msg.ack().await; + } + } + } + Err(e) => { + tracing::error!("Error receiving stats message: {}", e); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + } + } + } + Err(e) => { + tracing::error!("Failed to get messages stream: {}", e); + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + } + } + + // Brief pause before reconnecting (if messages stream ended) + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + }); + + tracing::info!("Stats consumer started successfully"); + Ok(()) + } +} diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 5924955..9f0b8ae 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -224,3 +224,31 @@ export interface WebstoreTransaction { delivered_at: string | null created_at: string } + +// Analytics types +export interface AnalyticsSummary { + peak_players: number + avg_players: number + uptime_percentage: number + unique_players: number | null +} + +export interface TimeseriesData { + timestamps: string[] + player_count: number[] + fps: number[] + entity_count: number[] + memory_usage_mb: number[] +} + +export interface HourlyStats { + id: string + license_id: string + hour: string + avg_players: number + max_players: number + avg_fps: number + min_fps: number + avg_entities: number + uptime_percentage: number +} diff --git a/frontend/src/views/admin/AnalyticsView.vue b/frontend/src/views/admin/AnalyticsView.vue index a6fcbe9..6ea97dc 100644 --- a/frontend/src/views/admin/AnalyticsView.vue +++ 
b/frontend/src/views/admin/AnalyticsView.vue @@ -1,8 +1,220 @@