feat: Phase 2 data aggregation pipeline (Strike 4A)
Backend:
- Stats ingestion consumer subscribing to the corrosion.*.stats NATS subject
- Hourly aggregation scheduler (runs at :05 past every hour)
- Daily cleanup job (03:00 UTC) with 7-day raw / 90-day hourly retention
- Analytics API (summary, timeseries, CSV export)
- Complete stats DB queries with aggregation and cleanup

Frontend:
- Analytics dashboard with ECharts integration
- Player count and server performance charts
- Time range selector (24h/7d/30d)
- CSV export functionality
- Real-time data loading

Infrastructure:
- Exposed NatsBridge.jetstream for consumer access
- Background service initialization in main.rs

Data flow: Plugin → NATS → Consumer → DB → Aggregation → API → Charts

Unblocks Strike 4B (dashboards) and 4C (alerting).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -15,3 +15,4 @@ pub mod nats_bridge;
|
||||
pub mod license;
|
||||
pub mod cloudflare;
|
||||
pub mod encryption;
|
||||
pub mod stats_consumer;
|
||||
|
||||
@@ -31,7 +31,7 @@ pub const STREAM_LICENSE_EVENTS: &str = "CORROSION_LICENSE";
|
||||
/// consistent subject naming and stream configuration.
|
||||
pub struct NatsBridge {
|
||||
// Handle to the NATS connection (an `async_nats::Client`), publicly accessible.
pub client: async_nats::Client,
|
||||
// NOTE(review): the two `jetstream` lines below look like the before/after
// pair of this diff hunk rather than two real fields. Presumably only the
// `pub` version exists in the committed file, since the commit message says
// the field was exposed for consumer access — confirm against the repository.
jetstream: jetstream::Context,
|
||||
pub jetstream: jetstream::Context,
|
||||
}
|
||||
|
||||
impl NatsBridge {
|
||||
|
||||
@@ -242,4 +242,99 @@ impl SchedulerService {
|
||||
|
||||
Ok(next_times)
|
||||
}
|
||||
|
||||
/// Register hourly stats aggregation job (runs at :05 past every hour).
|
||||
pub async fn register_stats_aggregation(&self) -> Result<()> {
|
||||
let db = self.db.clone();
|
||||
|
||||
let job = Job::new_async("0 5 * * * *", move |_uuid, _l| {
|
||||
let db = db.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
tracing::info!("Running hourly stats aggregation");
|
||||
|
||||
// Get all active licenses
|
||||
let licenses: Vec<(Uuid,)> = match sqlx::query_as(
|
||||
"SELECT id FROM licenses WHERE status = 'active'",
|
||||
)
|
||||
.fetch_all(&db)
|
||||
.await
|
||||
{
|
||||
Ok(l) => l,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to query active licenses: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
tracing::info!("Aggregating stats for {} licenses", licenses.len());
|
||||
|
||||
for (license_id,) in licenses {
|
||||
if let Err(e) = crate::db::stats::aggregate_hourly_stats(&db, license_id).await
|
||||
{
|
||||
tracing::error!(
|
||||
"Failed to aggregate stats for license {}: {}",
|
||||
license_id,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Hourly stats aggregation complete");
|
||||
})
|
||||
})
|
||||
.context("Failed to create stats aggregation job")?;
|
||||
|
||||
self.scheduler
|
||||
.add(job)
|
||||
.await
|
||||
.context("Failed to add stats aggregation job to scheduler")?;
|
||||
|
||||
tracing::info!("Registered hourly stats aggregation job (cron: 0 5 * * * *)");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Register daily stats cleanup job (runs at 03:00 UTC).
|
||||
pub async fn register_stats_cleanup(&self) -> Result<()> {
|
||||
let db = self.db.clone();
|
||||
|
||||
let job = Job::new_async("0 0 3 * * *", move |_uuid, _l| {
|
||||
let db = db.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
tracing::info!("Running daily stats cleanup");
|
||||
|
||||
// Delete raw stats older than 7 days
|
||||
match crate::db::stats::cleanup_old_stats(&db, 7).await {
|
||||
Ok(deleted) => {
|
||||
tracing::info!("Deleted {} old raw stats records", deleted);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to cleanup old raw stats: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Delete hourly stats older than 90 days
|
||||
match crate::db::stats::cleanup_old_hourly_stats(&db, 90).await {
|
||||
Ok(deleted) => {
|
||||
tracing::info!("Deleted {} old hourly stats records", deleted);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to cleanup old hourly stats: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Daily stats cleanup complete");
|
||||
})
|
||||
})
|
||||
.context("Failed to create stats cleanup job")?;
|
||||
|
||||
self.scheduler
|
||||
.add(job)
|
||||
.await
|
||||
.context("Failed to add stats cleanup job to scheduler")?;
|
||||
|
||||
tracing::info!("Registered daily stats cleanup job (cron: 0 0 3 * * *)");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
151
backend/src/services/stats_consumer.rs
Normal file
151
backend/src/services/stats_consumer.rs
Normal file
@@ -0,0 +1,151 @@
|
||||
use anyhow::{Context, Result};
|
||||
use futures::StreamExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::nats_bridge::{NatsBridge, STREAM_SERVER_TELEMETRY};
|
||||
use crate::db::stats;
|
||||
|
||||
/// Stats payload from plugin/companion agent (published every 60s).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct StatsPayload {
|
||||
pub license_id: Uuid,
|
||||
pub players: i32,
|
||||
pub max_players: i32,
|
||||
pub fps: f64,
|
||||
pub entities: i32,
|
||||
pub uptime: i32,
|
||||
pub memory: i32,
|
||||
#[serde(default)]
|
||||
pub timestamp: Option<String>,
|
||||
}
|
||||
|
||||
/// Stats consumer service — subscribes to NATS telemetry and persists to DB.
|
||||
pub struct StatsConsumerService {
|
||||
db: sqlx::PgPool,
|
||||
nats: Arc<NatsBridge>,
|
||||
}
|
||||
|
||||
impl StatsConsumerService {
|
||||
pub fn new(db: sqlx::PgPool, nats: Arc<NatsBridge>) -> Self {
|
||||
Self { db, nats }
|
||||
}
|
||||
|
||||
/// Start consuming stats from NATS subject: corrosion.*.stats
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
tracing::info!("Starting stats consumer service");
|
||||
|
||||
// Create durable consumer on CORROSION_TELEMETRY stream
|
||||
let stream = self
|
||||
.nats
|
||||
.jetstream
|
||||
.get_stream(STREAM_SERVER_TELEMETRY)
|
||||
.await
|
||||
.context("Failed to get CORROSION_TELEMETRY stream")?;
|
||||
|
||||
let consumer = stream
|
||||
.get_or_create_consumer(
|
||||
"stats_consumer",
|
||||
async_nats::jetstream::consumer::pull::Config {
|
||||
durable_name: Some("stats_consumer".to_string()),
|
||||
filter_subject: "corrosion.*.stats".to_string(),
|
||||
ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit,
|
||||
max_ack_pending: 1000,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context("Failed to create stats consumer")?;
|
||||
|
||||
let db = self.db.clone();
|
||||
|
||||
// Spawn background task to process messages
|
||||
tokio::spawn(async move {
|
||||
tracing::info!("Stats consumer listening on corrosion.*.stats");
|
||||
|
||||
loop {
|
||||
match consumer.messages().await {
|
||||
Ok(mut messages) => {
|
||||
while let Some(msg) = messages.next().await {
|
||||
match msg {
|
||||
Ok(msg) => {
|
||||
// Parse JSON payload
|
||||
match serde_json::from_slice::<StatsPayload>(&msg.payload) {
|
||||
Ok(stats_payload) => {
|
||||
// Persist to database
|
||||
match stats::insert_server_stats(
|
||||
&db,
|
||||
stats_payload.license_id,
|
||||
stats_payload.players,
|
||||
stats_payload.max_players,
|
||||
stats_payload.fps,
|
||||
stats_payload.entities,
|
||||
stats_payload.uptime,
|
||||
stats_payload.memory,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(stats_id) => {
|
||||
tracing::debug!(
|
||||
"Persisted stats for license {}: {} (players: {}, fps: {:.1})",
|
||||
stats_payload.license_id,
|
||||
stats_id,
|
||||
stats_payload.players,
|
||||
stats_payload.fps
|
||||
);
|
||||
|
||||
// Ack message
|
||||
if let Err(e) = msg.ack().await {
|
||||
tracing::error!(
|
||||
"Failed to ack stats message: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Failed to insert stats for license {}: {}",
|
||||
stats_payload.license_id,
|
||||
e
|
||||
);
|
||||
// Nack and requeue
|
||||
if let Err(e) = msg.ack_with(async_nats::jetstream::AckKind::Nak(None)).await {
|
||||
tracing::error!("Failed to nack message: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Failed to parse stats payload: {} (subject: {})",
|
||||
e,
|
||||
msg.subject
|
||||
);
|
||||
// Ack malformed messages to prevent infinite redelivery
|
||||
let _ = msg.ack().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error receiving stats message: {}", e);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get messages stream: {}", e);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Brief pause before reconnecting (if messages stream ended)
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
});
|
||||
|
||||
tracing::info!("Stats consumer started successfully");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user