//! Host heartbeat: real telemetry, never fabricated. //! //! The Go agent shipped `disk_free_mb: 50000` and `cpu_percent: 0.0` as //! hardcoded placeholders. This module is the first time the panel's //! Resources view receives the truth. Anything we cannot measure is omitted //! or null — never invented. use chrono::{SecondsFormat, Utc}; use rand::Rng; use serde::Serialize; use std::path::Path; use std::sync::Arc; use std::time::Duration; use sysinfo::{Disks, System}; use crate::agent::Agent; use crate::prober::ProbeReport; use crate::subjects; use crate::version; #[derive(Debug, Serialize)] pub struct HeartbeatPayload { /// Wire schema version — lets the backend distinguish v2 host heartbeats /// from legacy Go companion heartbeats during any transition window. pub schema: u32, pub timestamp: String, pub agent: AgentInfo, pub host: HostInfo, pub instances: Vec, #[serde(skip_serializing_if = "Option::is_none")] pub probe: Option, } #[derive(Debug, Serialize)] pub struct AgentInfo { pub version: String, pub commit: String, pub os: String, pub arch: String, pub uptime_seconds: u64, } #[derive(Debug, Serialize)] pub struct HostInfo { #[serde(skip_serializing_if = "Option::is_none")] pub hostname: Option, pub cpu_percent: f32, pub cpu_cores: usize, pub mem_total_mb: u64, pub mem_used_mb: u64, pub uptime_seconds: u64, pub disks: Vec, } #[derive(Debug, Serialize)] pub struct DiskInfo { pub mount: String, pub total_mb: u64, pub free_mb: u64, } #[derive(Debug, Serialize)] pub struct InstanceInfo { pub id: String, pub game: String, #[serde(skip_serializing_if = "Option::is_none")] pub label: Option, /// Process-managed: running/stopped/starting/stopping/crashed. /// Unmanaged (no executable configured): configured/missing_root. pub state: String, pub uptime_seconds: u64, #[serde(skip_serializing_if = "Option::is_none")] pub root_disk_free_mb: Option, } pub async fn run(agent: Arc) { let cancel = agent.shutdown.clone(); let mut sys = System::new(); // CPU usage is a delta between refreshes; prime it once so the first // heartbeat carries a real figure instead of 0. sys.refresh_cpu_usage(); tokio::time::sleep(Duration::from_millis(250)).await; loop { let payload = collect(&agent, &mut sys).await; match serde_json::to_vec(&payload) { Ok(bytes) => { let subject = subjects::host_heartbeat(&agent.cfg.license_id); if let Err(e) = agent.nats.publish(subject, bytes.into()).await { tracing::warn!("heartbeat publish failed: {e}"); } else { tracing::debug!( "heartbeat sent: cpu {:.1}%, {} instance(s)", payload.host.cpu_percent, payload.instances.len() ); } } Err(e) => tracing::error!("heartbeat serialize failed: {e}"), } let jitter = rand::thread_rng().gen_range(0.8..1.2); let interval = Duration::from_secs_f64(agent.cfg.heartbeat_seconds as f64 * jitter); tokio::select! { _ = tokio::time::sleep(interval) => {} _ = cancel.cancelled() => { tracing::info!("telemetry stopping"); break; } } } } pub async fn collect(agent: &Agent, sys: &mut System) -> HeartbeatPayload { sys.refresh_cpu_usage(); sys.refresh_memory(); let disks = Disks::new_with_refreshed_list(); let disk_infos: Vec = disks .iter() .map(|d| DiskInfo { mount: d.mount_point().to_string_lossy().to_string(), total_mb: d.total_space() / 1_048_576, free_mb: d.available_space() / 1_048_576, }) .collect(); let mut instances = Vec::with_capacity(agent.cfg.instances.len()); for inst in &agent.cfg.instances { let (state, uptime_seconds) = match agent.supervisors.get(&inst.id) { Some(sup) if !matches!(sup.state(), crate::process::InstanceState::Unmanaged) => { (sup.state().as_label().to_string(), sup.uptime_seconds().await) } _ => { let exists = inst.root.exists(); ( if exists { "configured" } else { "missing_root" }.to_string(), 0, ) } }; instances.push(InstanceInfo { id: inst.id.clone(), game: inst.game.clone(), label: inst.label.clone(), state, uptime_seconds, root_disk_free_mb: disk_free_for_path(&disks, &inst.root), }); } let instances = instances; HeartbeatPayload { schema: 2, timestamp: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), agent: AgentInfo { version: version::VERSION.to_string(), commit: version::GIT_HASH.to_string(), os: std::env::consts::OS.to_string(), arch: std::env::consts::ARCH.to_string(), uptime_seconds: agent.started.elapsed().as_secs(), }, host: HostInfo { hostname: System::host_name(), cpu_percent: sys.global_cpu_usage(), cpu_cores: sys.cpus().len(), mem_total_mb: sys.total_memory() / 1_048_576, mem_used_mb: sys.used_memory() / 1_048_576, uptime_seconds: System::uptime(), disks: disk_infos, }, instances, probe: agent.last_probe.read().await.clone(), } } /// Free space on the disk whose mount point is the longest prefix of `path`. fn disk_free_for_path(disks: &Disks, path: &Path) -> Option { disks .iter() .filter(|d| path.starts_with(d.mount_point())) .max_by_key(|d| d.mount_point().as_os_str().len()) .map(|d| d.available_space() / 1_048_576) }