use anyhow::{Context, Result}; use futures::StreamExt; use serde::{Deserialize, Serialize}; use std::sync::Arc; use uuid::Uuid; use super::nats_bridge::{NatsBridge, STREAM_SERVER_TELEMETRY}; use crate::db::stats; /// Stats payload from plugin/companion agent (published every 60s). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StatsPayload { pub license_id: Uuid, pub players: i32, pub max_players: i32, pub fps: f64, pub entities: i32, pub uptime: i32, pub memory: i32, #[serde(default)] pub timestamp: Option, } /// Stats consumer service — subscribes to NATS telemetry and persists to DB. pub struct StatsConsumerService { db: sqlx::PgPool, nats: Arc, } impl StatsConsumerService { pub fn new(db: sqlx::PgPool, nats: Arc) -> Self { Self { db, nats } } /// Start consuming stats from NATS subject: corrosion.*.stats pub async fn start(&self) -> Result<()> { tracing::info!("Starting stats consumer service"); // Create durable consumer on CORROSION_TELEMETRY stream let stream = self .nats .jetstream .get_stream(STREAM_SERVER_TELEMETRY) .await .context("Failed to get CORROSION_TELEMETRY stream")?; let consumer = stream .get_or_create_consumer( "stats_consumer", async_nats::jetstream::consumer::pull::Config { durable_name: Some("stats_consumer".to_string()), filter_subject: "corrosion.*.stats".to_string(), ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit, max_ack_pending: 1000, ..Default::default() }, ) .await .context("Failed to create stats consumer")?; let db = self.db.clone(); // Spawn background task to process messages tokio::spawn(async move { tracing::info!("Stats consumer listening on corrosion.*.stats"); loop { match consumer.messages().await { Ok(mut messages) => { while let Some(msg) = messages.next().await { match msg { Ok(msg) => { // Parse JSON payload match serde_json::from_slice::(&msg.payload) { Ok(stats_payload) => { // Fetch current map_id for map analytics tracking let map_id = stats::get_current_map_id(&db, stats_payload.license_id) .await .unwrap_or(None); // Persist to database match stats::insert_server_stats( &db, stats_payload.license_id, stats_payload.players, stats_payload.max_players, stats_payload.fps, stats_payload.entities, stats_payload.uptime, stats_payload.memory, map_id, ) .await { Ok(stats_id) => { tracing::debug!( "Persisted stats for license {}: {} (players: {}, fps: {:.1})", stats_payload.license_id, stats_id, stats_payload.players, stats_payload.fps ); // Ack message if let Err(e) = msg.ack().await { tracing::error!( "Failed to ack stats message: {}", e ); } } Err(e) => { tracing::error!( "Failed to insert stats for license {}: {}", stats_payload.license_id, e ); // Nack and requeue if let Err(e) = msg.ack_with(async_nats::jetstream::AckKind::Nak(None)).await { tracing::error!("Failed to nack message: {}", e); } } } } Err(e) => { tracing::warn!( "Failed to parse stats payload: {} (subject: {})", e, msg.subject ); // Ack malformed messages to prevent infinite redelivery let _ = msg.ack().await; } } } Err(e) => { tracing::error!("Error receiving stats message: {}", e); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } } } } Err(e) => { tracing::error!("Failed to get messages stream: {}", e); tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; } } // Brief pause before reconnecting (if messages stream ended) tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } }); tracing::info!("Stats consumer started successfully"); Ok(()) } }