Files
corrosion-admin-panel/corrosion-host-agent/src/telemetry.rs
Vantz Stockwell 068a476f39 feat(host-agent): Phase 1a process supervision — instance start/stop/restart/status + push state events
Per-instance ProcessSupervisor: tokio child spawn with proper arg list
(fixes Go's naive space-splitting), graceful SIGTERM with 30s budget
then force kill, monitor task classifying ordered-stop vs crash (exit
code captured), watch-channel state observable everywhere. Instance cmd
channel live on corrosion.{license}.{instance}.cmd (start/stop/restart/
status) with state events pushed on {instance}.status (keep-latest
semantics, documented). Heartbeats now carry live process state +
uptime per instance. Crate restructured lib+bin for integration tests.

Verified: 5 integration tests with real OS processes (lifecycle, crash
exit-code, restart recovery, unmanaged rejection, clean spawn failure)
+ live-NATS contract test (request-reply roundtrips, double-start
rejection, push events, heartbeat state) — all green.

Known limitation (documented): no PID adoption yet — agent restart
orphans a running game process to 'stopped' until panel restart.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 10:44:24 -04:00

186 lines
6.0 KiB
Rust

//! Host heartbeat: real telemetry, never fabricated.
//!
//! The Go agent shipped `disk_free_mb: 50000` and `cpu_percent: 0.0` as
//! hardcoded placeholders. This module is the first time the panel's
//! Resources view receives the truth. Anything we cannot measure is omitted
//! or null — never invented.
use chrono::{SecondsFormat, Utc};
use rand::Rng;
use serde::Serialize;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::{Disks, System};
use crate::agent::Agent;
use crate::prober::ProbeReport;
use crate::subjects;
use crate::version;
#[derive(Debug, Serialize)]
pub struct HeartbeatPayload {
/// Wire schema version — lets the backend distinguish v2 host heartbeats
/// from legacy Go companion heartbeats during any transition window.
pub schema: u32,
pub timestamp: String,
pub agent: AgentInfo,
pub host: HostInfo,
pub instances: Vec<InstanceInfo>,
#[serde(skip_serializing_if = "Option::is_none")]
pub probe: Option<ProbeReport>,
}
#[derive(Debug, Serialize)]
pub struct AgentInfo {
pub version: String,
pub commit: String,
pub os: String,
pub arch: String,
pub uptime_seconds: u64,
}
#[derive(Debug, Serialize)]
pub struct HostInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub hostname: Option<String>,
pub cpu_percent: f32,
pub cpu_cores: usize,
pub mem_total_mb: u64,
pub mem_used_mb: u64,
pub uptime_seconds: u64,
pub disks: Vec<DiskInfo>,
}
#[derive(Debug, Serialize)]
pub struct DiskInfo {
pub mount: String,
pub total_mb: u64,
pub free_mb: u64,
}
#[derive(Debug, Serialize)]
pub struct InstanceInfo {
pub id: String,
pub game: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub label: Option<String>,
/// Process-managed: running/stopped/starting/stopping/crashed.
/// Unmanaged (no executable configured): configured/missing_root.
pub state: String,
pub uptime_seconds: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub root_disk_free_mb: Option<u64>,
}
pub async fn run(agent: Arc<Agent>) {
let cancel = agent.shutdown.clone();
let mut sys = System::new();
// CPU usage is a delta between refreshes; prime it once so the first
// heartbeat carries a real figure instead of 0.
sys.refresh_cpu_usage();
tokio::time::sleep(Duration::from_millis(250)).await;
loop {
let payload = collect(&agent, &mut sys).await;
match serde_json::to_vec(&payload) {
Ok(bytes) => {
let subject = subjects::host_heartbeat(&agent.cfg.license_id);
if let Err(e) = agent.nats.publish(subject, bytes.into()).await {
tracing::warn!("heartbeat publish failed: {e}");
} else {
tracing::debug!(
"heartbeat sent: cpu {:.1}%, {} instance(s)",
payload.host.cpu_percent,
payload.instances.len()
);
}
}
Err(e) => tracing::error!("heartbeat serialize failed: {e}"),
}
let jitter = rand::thread_rng().gen_range(0.8..1.2);
let interval = Duration::from_secs_f64(agent.cfg.heartbeat_seconds as f64 * jitter);
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = cancel.cancelled() => {
tracing::info!("telemetry stopping");
break;
}
}
}
}
pub async fn collect(agent: &Agent, sys: &mut System) -> HeartbeatPayload {
sys.refresh_cpu_usage();
sys.refresh_memory();
let disks = Disks::new_with_refreshed_list();
let disk_infos: Vec<DiskInfo> = disks
.iter()
.map(|d| DiskInfo {
mount: d.mount_point().to_string_lossy().to_string(),
total_mb: d.total_space() / 1_048_576,
free_mb: d.available_space() / 1_048_576,
})
.collect();
let mut instances = Vec::with_capacity(agent.cfg.instances.len());
for inst in &agent.cfg.instances {
let (state, uptime_seconds) = match agent.supervisors.get(&inst.id) {
Some(sup) if !matches!(sup.state(), crate::process::InstanceState::Unmanaged) => {
(sup.state().as_label().to_string(), sup.uptime_seconds().await)
}
_ => {
let exists = inst.root.exists();
(
if exists { "configured" } else { "missing_root" }.to_string(),
0,
)
}
};
instances.push(InstanceInfo {
id: inst.id.clone(),
game: inst.game.clone(),
label: inst.label.clone(),
state,
uptime_seconds,
root_disk_free_mb: disk_free_for_path(&disks, &inst.root),
});
}
let instances = instances;
HeartbeatPayload {
schema: 2,
timestamp: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
agent: AgentInfo {
version: version::VERSION.to_string(),
commit: version::GIT_HASH.to_string(),
os: std::env::consts::OS.to_string(),
arch: std::env::consts::ARCH.to_string(),
uptime_seconds: agent.started.elapsed().as_secs(),
},
host: HostInfo {
hostname: System::host_name(),
cpu_percent: sys.global_cpu_usage(),
cpu_cores: sys.cpus().len(),
mem_total_mb: sys.total_memory() / 1_048_576,
mem_used_mb: sys.used_memory() / 1_048_576,
uptime_seconds: System::uptime(),
disks: disk_infos,
},
instances,
probe: agent.last_probe.read().await.clone(),
}
}
/// Free space on the disk whose mount point is the longest prefix of `path`.
fn disk_free_for_path(disks: &Disks, path: &Path) -> Option<u64> {
disks
.iter()
.filter(|d| path.starts_with(d.mount_point()))
.max_by_key(|d| d.mount_point().as_os_str().len())
.map(|d| d.available_space() / 1_048_576)
}