feat(host-agent): Rust rewrite Phase 0 — multi-instance foundation, v2 wire protocol, real telemetry
All checks were successful
Test Asgard Runner / test (push) Successful in 3s
All checks were successful
Test Asgard Runner / test (push) Successful in 3s
New corrosion-host-agent/ crate (Go companion-agent stays as behavior
reference until parity). Wire protocol v2 per COA-B: instance-scoped
subjects corrosion.{license}.{instance}.* + host-level .host.* — spec
in PROTOCOL.md, designed for the license->host->instance fleet model.
- Multi-instance TOML config in the foundation, not retrofitted
- NATS layer on the Vigilance production profile (infinite reconnect,
capped backoff, 30s ping, 8192-msg offline buffer)
- Heartbeat with real sysinfo telemetry — Go agent shipped hardcoded
disk/cpu placeholders; this is the panel's first true Resources data
- Connectivity prober (outbound TCP, periodic + on-demand)
- Host cmd channel (ping/probe/sysinfo), going-offline beacon,
CancellationToken shutdown
- Live-fire verified against production NATS; artifacts: 3.7MB static
linux-musl, 3.8MB windows .exe (static CRT)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
16
corrosion-host-agent/src/agent.rs
Normal file
16
corrosion-host-agent/src/agent.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
//! Shared agent handle: every subsystem task holds an `Arc<Agent>`.
|
||||
|
||||
use std::time::Instant;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::config::Settings;
|
||||
use crate::prober::ProbeReport;
|
||||
|
||||
pub struct Agent {
|
||||
pub cfg: Settings,
|
||||
pub nats: async_nats::Client,
|
||||
pub started: Instant,
|
||||
pub last_probe: RwLock<Option<ProbeReport>>,
|
||||
pub shutdown: CancellationToken,
|
||||
}
|
||||
58
corrosion-host-agent/src/bus.rs
Normal file
58
corrosion-host-agent/src/bus.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
//! NATS connection layer.
|
||||
//!
|
||||
//! Connection parameters follow the production-proven Vigilance profile:
|
||||
//! infinite reconnects with capped exponential backoff, 30s pings to detect
|
||||
//! zombie TCP in ~60s, and a deep client-side send queue so telemetry buffers
|
||||
//! through broker outages instead of erroring.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::config::Settings;
|
||||
|
||||
pub async fn connect(cfg: &Settings) -> Result<async_nats::Client> {
|
||||
let (url, force_tls) = normalize_url(&cfg.nats_url);
|
||||
|
||||
let mut opts = async_nats::ConnectOptions::new()
|
||||
.name("corrosion-host-agent")
|
||||
.retry_on_initial_connect()
|
||||
.max_reconnects(None)
|
||||
.ping_interval(Duration::from_secs(30))
|
||||
.client_capacity(8192)
|
||||
.reconnect_delay_callback(|attempts| {
|
||||
Duration::from_millis(std::cmp::min(attempts as u64 * 100, 8_000))
|
||||
})
|
||||
.event_callback(|event| async move {
|
||||
match event {
|
||||
async_nats::Event::Disconnected => tracing::warn!("nats disconnected"),
|
||||
async_nats::Event::Connected => tracing::info!("nats connected"),
|
||||
other => tracing::debug!("nats event: {other}"),
|
||||
}
|
||||
});
|
||||
|
||||
if force_tls {
|
||||
opts = opts.require_tls(true);
|
||||
}
|
||||
if let Some(token) = &cfg.nats_token {
|
||||
opts = opts.token(token.clone());
|
||||
}
|
||||
|
||||
let client = opts
|
||||
.connect(&url)
|
||||
.await
|
||||
.with_context(|| format!("connecting to NATS at {url}"))?;
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
/// Accept `tls://` / `nats+tls://` URL schemes by translating to `nats://` +
|
||||
/// an explicit TLS requirement.
|
||||
fn normalize_url(raw: &str) -> (String, bool) {
|
||||
if let Some(rest) = raw.strip_prefix("tls://") {
|
||||
(format!("nats://{rest}"), true)
|
||||
} else if let Some(rest) = raw.strip_prefix("nats+tls://") {
|
||||
(format!("nats://{rest}"), true)
|
||||
} else {
|
||||
(raw.to_string(), false)
|
||||
}
|
||||
}
|
||||
186
corrosion-host-agent/src/config.rs
Normal file
186
corrosion-host-agent/src/config.rs
Normal file
@@ -0,0 +1,186 @@
|
||||
//! Agent configuration: TOML file + environment overrides.
|
||||
//!
|
||||
//! Multi-instance is foundational, not bolted on: one agent supervises N game
|
||||
//! instances on the host, each declared as an `[[instance]]` block. Connection
|
||||
//! secrets may come from env so the config file can be world-readable-ish
|
||||
//! while the token is not.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashSet;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// Instance ids share the NATS subject namespace with host-level segments.
|
||||
const RESERVED_INSTANCE_IDS: &[&str] = &["host", "cmd", "files", "update", "agent"];
|
||||
|
||||
pub const SUPPORTED_GAMES: &[&str] = &["rust", "conan", "soulmask", "dune"];
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct ConfigFile {
|
||||
pub agent: AgentSection,
|
||||
#[serde(default, rename = "instance")]
|
||||
pub instances: Vec<InstanceConfig>,
|
||||
#[serde(default)]
|
||||
pub prober: ProberSection,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct AgentSection {
|
||||
pub license_id: Option<String>,
|
||||
pub nats_url: Option<String>,
|
||||
pub nats_token: Option<String>,
|
||||
#[serde(default = "default_heartbeat_seconds")]
|
||||
pub heartbeat_seconds: u64,
|
||||
#[serde(default = "default_log_level")]
|
||||
pub log_level: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct InstanceConfig {
|
||||
/// Short slug, unique per license: becomes a NATS subject segment.
|
||||
pub id: String,
|
||||
/// One of SUPPORTED_GAMES.
|
||||
pub game: String,
|
||||
/// Install root for this instance on the host.
|
||||
pub root: PathBuf,
|
||||
/// Optional human label shown in the panel.
|
||||
#[serde(default)]
|
||||
pub label: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct ProberSection {
|
||||
#[serde(default = "default_probe_interval")]
|
||||
pub interval_seconds: u64,
|
||||
/// Extra TCP targets beyond the built-in defaults.
|
||||
#[serde(default, rename = "target")]
|
||||
pub targets: Vec<ProbeTargetConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct ProbeTargetConfig {
|
||||
pub name: String,
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
fn default_heartbeat_seconds() -> u64 {
|
||||
60
|
||||
}
|
||||
|
||||
fn default_probe_interval() -> u64 {
|
||||
300
|
||||
}
|
||||
|
||||
fn default_log_level() -> String {
|
||||
"info".to_string()
|
||||
}
|
||||
|
||||
/// Fully-resolved settings after merging file + env. Everything required is
|
||||
/// present and validated.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Settings {
|
||||
pub license_id: String,
|
||||
pub nats_url: String,
|
||||
pub nats_token: Option<String>,
|
||||
pub heartbeat_seconds: u64,
|
||||
pub log_level: String,
|
||||
pub instances: Vec<InstanceConfig>,
|
||||
pub probe_interval_seconds: u64,
|
||||
pub probe_targets: Vec<ProbeTargetConfig>,
|
||||
}
|
||||
|
||||
pub fn default_config_path() -> PathBuf {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
PathBuf::from(r"C:\ProgramData\Corrosion\agent.toml")
|
||||
}
|
||||
#[cfg(not(windows))]
|
||||
{
|
||||
PathBuf::from("/etc/corrosion/agent.toml")
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load(path: &Path) -> Result<Settings> {
|
||||
let raw = std::fs::read_to_string(path)
|
||||
.with_context(|| format!("reading config file {}", path.display()))?;
|
||||
let file: ConfigFile = toml::from_str(&raw)
|
||||
.with_context(|| format!("parsing config file {}", path.display()))?;
|
||||
resolve(file)
|
||||
}
|
||||
|
||||
/// Merge env overrides (env wins) and validate.
|
||||
fn resolve(file: ConfigFile) -> Result<Settings> {
|
||||
let license_id = std::env::var("CORROSION_LICENSE_ID")
|
||||
.ok()
|
||||
.filter(|v| !v.is_empty())
|
||||
.or(file.agent.license_id)
|
||||
.context("license_id missing: set [agent].license_id or CORROSION_LICENSE_ID")?;
|
||||
|
||||
let nats_url = std::env::var("CORROSION_NATS_URL")
|
||||
.ok()
|
||||
.filter(|v| !v.is_empty())
|
||||
.or(file.agent.nats_url)
|
||||
.context("nats_url missing: set [agent].nats_url or CORROSION_NATS_URL")?;
|
||||
|
||||
let nats_token = std::env::var("CORROSION_NATS_TOKEN")
|
||||
.ok()
|
||||
.filter(|v| !v.is_empty())
|
||||
.or(file.agent.nats_token);
|
||||
|
||||
validate_subject_segment("license_id", &license_id)?;
|
||||
|
||||
let mut seen: HashSet<&str> = HashSet::new();
|
||||
for inst in &file.instances {
|
||||
validate_subject_segment("instance id", &inst.id)?;
|
||||
if RESERVED_INSTANCE_IDS.contains(&inst.id.as_str()) {
|
||||
bail!("instance id '{}' is reserved", inst.id);
|
||||
}
|
||||
if !seen.insert(inst.id.as_str()) {
|
||||
bail!("duplicate instance id '{}'", inst.id);
|
||||
}
|
||||
if !SUPPORTED_GAMES.contains(&inst.game.as_str()) {
|
||||
bail!(
|
||||
"instance '{}': unsupported game '{}' (supported: {})",
|
||||
inst.id,
|
||||
inst.game,
|
||||
SUPPORTED_GAMES.join(", ")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if file.agent.heartbeat_seconds < 10 {
|
||||
bail!("[agent].heartbeat_seconds must be >= 10");
|
||||
}
|
||||
|
||||
Ok(Settings {
|
||||
license_id,
|
||||
nats_url,
|
||||
nats_token,
|
||||
heartbeat_seconds: file.agent.heartbeat_seconds,
|
||||
log_level: file.agent.log_level,
|
||||
instances: file.instances,
|
||||
probe_interval_seconds: file.prober.interval_seconds.max(30),
|
||||
probe_targets: file.prober.targets,
|
||||
})
|
||||
}
|
||||
|
||||
/// NATS subject segments must not contain '.', '*', '>', whitespace, etc.
|
||||
/// Keep it strict: lowercase alphanumerics plus '-' and '_', max 64 chars.
|
||||
fn validate_subject_segment(what: &str, value: &str) -> Result<()> {
|
||||
if value.is_empty() || value.len() > 64 {
|
||||
bail!("{what} '{value}' must be 1-64 characters");
|
||||
}
|
||||
if !value
|
||||
.chars()
|
||||
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_')
|
||||
{
|
||||
bail!("{what} '{value}' may only contain lowercase letters, digits, '-' and '_'");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
115
corrosion-host-agent/src/hostcmd.rs
Normal file
115
corrosion-host-agent/src/hostcmd.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
//! Host-level command handler: request-reply on `corrosion.{license}.host.cmd`.
|
||||
//!
|
||||
//! One subscriber; each message handled in its own task so a slow command
|
||||
//! never blocks the dispatch loop. Phase 0 commands: ping, probe, sysinfo.
|
||||
|
||||
use futures::StreamExt;
|
||||
use serde::Deserialize;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use sysinfo::System;
|
||||
|
||||
use crate::agent::Agent;
|
||||
use crate::prober;
|
||||
use crate::subjects;
|
||||
use crate::telemetry;
|
||||
use crate::version;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct HostCommand {
|
||||
func: String,
|
||||
}
|
||||
|
||||
pub async fn run(agent: Arc<Agent>) -> anyhow::Result<()> {
|
||||
let subject = subjects::host_cmd(&agent.cfg.license_id);
|
||||
let mut sub = agent.nats.subscribe(subject.clone()).await?;
|
||||
tracing::info!("host command handler listening on {subject}");
|
||||
|
||||
let cancel = agent.shutdown.clone();
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = sub.next() => {
|
||||
match msg {
|
||||
Some(msg) => {
|
||||
let agent = agent.clone();
|
||||
tokio::spawn(async move { handle(agent, msg).await });
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("host command subscription ended");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("host command handler stopping");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle(agent: Arc<Agent>, msg: async_nats::Message) {
|
||||
let Some(reply) = msg.reply.clone() else {
|
||||
tracing::warn!("host command without reply subject ignored");
|
||||
return;
|
||||
};
|
||||
|
||||
let response = match serde_json::from_slice::<HostCommand>(&msg.payload) {
|
||||
Ok(cmd) => dispatch(&agent, &cmd.func).await,
|
||||
Err(e) => json!({ "status": "error", "message": format!("invalid command payload: {e}") }),
|
||||
};
|
||||
|
||||
let bytes = match serde_json::to_vec(&response) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
tracing::error!("response serialize failed: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(e) = agent.nats.publish(reply, bytes.into()).await {
|
||||
tracing::warn!("response publish failed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
async fn dispatch(agent: &Arc<Agent>, func: &str) -> serde_json::Value {
|
||||
match func {
|
||||
"ping" => json!({
|
||||
"status": "success",
|
||||
"func": "ping",
|
||||
"version": version::VERSION,
|
||||
"commit": version::GIT_HASH,
|
||||
"uptime_seconds": agent.started.elapsed().as_secs(),
|
||||
}),
|
||||
"probe" => {
|
||||
let report = prober::run_probe(&agent.cfg.probe_targets).await;
|
||||
*agent.last_probe.write().await = Some(report.clone());
|
||||
match serde_json::to_value(&report) {
|
||||
Ok(report_json) => json!({
|
||||
"status": "success",
|
||||
"func": "probe",
|
||||
"report": report_json,
|
||||
}),
|
||||
Err(e) => json!({ "status": "error", "message": format!("probe serialize: {e}") }),
|
||||
}
|
||||
}
|
||||
"sysinfo" => {
|
||||
let mut sys = System::new();
|
||||
sys.refresh_cpu_usage();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
|
||||
let payload = telemetry::collect(agent, &mut sys).await;
|
||||
match serde_json::to_value(&payload) {
|
||||
Ok(snapshot) => json!({
|
||||
"status": "success",
|
||||
"func": "sysinfo",
|
||||
"snapshot": snapshot,
|
||||
}),
|
||||
Err(e) => json!({ "status": "error", "message": format!("sysinfo serialize: {e}") }),
|
||||
}
|
||||
}
|
||||
other => json!({
|
||||
"status": "error",
|
||||
"message": format!("unknown func '{other}' (supported: ping, probe, sysinfo)"),
|
||||
}),
|
||||
}
|
||||
}
|
||||
168
corrosion-host-agent/src/main.rs
Normal file
168
corrosion-host-agent/src/main.rs
Normal file
@@ -0,0 +1,168 @@
|
||||
//! Corrosion Host Agent — multi-game ops runtime.
|
||||
//!
|
||||
//! Phase 0: NATS connectivity, real host telemetry, multi-instance config,
|
||||
//! connectivity prober, host command channel. Process control, file ops, and
|
||||
//! game adapters arrive in Phase 1+ (see PROTOCOL.md).
|
||||
|
||||
mod agent;
|
||||
mod bus;
|
||||
mod config;
|
||||
mod hostcmd;
|
||||
mod prober;
|
||||
mod subjects;
|
||||
mod telemetry;
|
||||
mod version;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::{Parser, Subcommand};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::agent::Agent;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(name = "corrosion-host-agent", version = version::VERSION, about)]
|
||||
struct Cli {
|
||||
/// Path to agent.toml (default: /etc/corrosion/agent.toml on Linux,
|
||||
/// C:\ProgramData\Corrosion\agent.toml on Windows)
|
||||
#[arg(long, short = 'c')]
|
||||
config: Option<PathBuf>,
|
||||
|
||||
#[command(subcommand)]
|
||||
command: Option<Command>,
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum Command {
|
||||
/// Validate the config file and exit.
|
||||
Check,
|
||||
/// Print full version (semver, git hash, build timestamp) and exit.
|
||||
Version,
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
let config_path = cli.config.unwrap_or_else(config::default_config_path);
|
||||
|
||||
match cli.command {
|
||||
Some(Command::Version) => {
|
||||
println!("corrosion-host-agent {}", version::long());
|
||||
Ok(())
|
||||
}
|
||||
Some(Command::Check) => {
|
||||
let settings = config::load(&config_path)?;
|
||||
println!(
|
||||
"config ok: license {}, {} instance(s), nats {}",
|
||||
settings.license_id,
|
||||
settings.instances.len(),
|
||||
settings.nats_url
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
None => {
|
||||
let settings = config::load(&config_path)?;
|
||||
init_logging(&settings.log_level);
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.context("building tokio runtime")?
|
||||
.block_on(run(settings))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn init_logging(level: &str) {
|
||||
let filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level));
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(filter)
|
||||
.with_target(false)
|
||||
.init();
|
||||
}
|
||||
|
||||
async fn run(settings: config::Settings) -> Result<()> {
|
||||
tracing::info!(
|
||||
"corrosion-host-agent {} starting: license {}, {} instance(s)",
|
||||
version::long(),
|
||||
settings.license_id,
|
||||
settings.instances.len()
|
||||
);
|
||||
for inst in &settings.instances {
|
||||
tracing::info!(" instance '{}' ({}) at {}", inst.id, inst.game, inst.root.display());
|
||||
}
|
||||
|
||||
let nats = bus::connect(&settings).await?;
|
||||
|
||||
let agent = Arc::new(Agent {
|
||||
cfg: settings,
|
||||
nats,
|
||||
started: Instant::now(),
|
||||
last_probe: RwLock::new(None),
|
||||
shutdown: CancellationToken::new(),
|
||||
});
|
||||
|
||||
let mut handles = Vec::new();
|
||||
handles.push(tokio::spawn(telemetry::run(agent.clone())));
|
||||
handles.push(tokio::spawn(prober::run_loop(agent.clone())));
|
||||
{
|
||||
let agent = agent.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
if let Err(e) = hostcmd::run(agent).await {
|
||||
tracing::error!("host command handler failed: {e:#}");
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
wait_for_shutdown_signal().await;
|
||||
tracing::info!("shutdown signal received");
|
||||
agent.shutdown.cancel();
|
||||
|
||||
// Best-effort offline beacon so the panel flips to offline immediately
|
||||
// instead of waiting out the heartbeat staleness window.
|
||||
let beacon = subjects::host_going_offline(&agent.cfg.license_id);
|
||||
let _ = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
agent.nats.publish(beacon, "{}".into()),
|
||||
)
|
||||
.await;
|
||||
|
||||
match tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
futures::future::join_all(handles),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => tracing::info!("all subsystems stopped cleanly"),
|
||||
Err(_) => tracing::warn!("shutdown timeout: some subsystems did not stop within 10s"),
|
||||
}
|
||||
|
||||
let _ = agent.nats.flush().await;
|
||||
tracing::info!("corrosion-host-agent stopped");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_shutdown_signal() {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
let mut sigterm = match signal(SignalKind::terminate()) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
tracing::error!("SIGTERM handler failed: {e}; falling back to ctrl-c only");
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
tokio::select! {
|
||||
_ = tokio::signal::ctrl_c() => {}
|
||||
_ = sigterm.recv() => {}
|
||||
}
|
||||
}
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
}
|
||||
}
|
||||
121
corrosion-host-agent/src/prober.rs
Normal file
121
corrosion-host-agent/src/prober.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
//! Connectivity prober.
|
||||
//!
|
||||
//! Answers "is it the box or is it the network?" before a support ticket gets
|
||||
//! written. Phase 0 scope is OUTBOUND reachability: TCP connect timing from
|
||||
//! the host to known endpoints. Inbound port-forward verification (the thing
|
||||
//! panel users actually struggle with) requires a backend-side reverse probe
|
||||
//! and is specified in PROTOCOL.md as a later phase.
|
||||
|
||||
use chrono::{SecondsFormat, Utc};
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use crate::agent::Agent;
|
||||
use crate::config::ProbeTargetConfig;
|
||||
|
||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ProbeResult {
|
||||
pub name: String,
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub ok: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub latency_ms: Option<u64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ProbeReport {
|
||||
pub timestamp: String,
|
||||
pub results: Vec<ProbeResult>,
|
||||
}
|
||||
|
||||
/// Built-in targets every agent checks, before config extras.
|
||||
fn default_targets() -> Vec<ProbeTargetConfig> {
|
||||
vec![ProbeTargetConfig {
|
||||
name: "corrosion-cdn".to_string(),
|
||||
host: "cdn.corrosionmgmt.com".to_string(),
|
||||
port: 443,
|
||||
}]
|
||||
}
|
||||
|
||||
pub async fn run_probe(extra_targets: &[ProbeTargetConfig]) -> ProbeReport {
|
||||
let mut targets = default_targets();
|
||||
targets.extend(extra_targets.iter().cloned());
|
||||
|
||||
let checks = targets.into_iter().map(|t| async move {
|
||||
let started = Instant::now();
|
||||
let addr = format!("{}:{}", t.host, t.port);
|
||||
let outcome = tokio::time::timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr)).await;
|
||||
match outcome {
|
||||
Ok(Ok(_stream)) => ProbeResult {
|
||||
name: t.name,
|
||||
host: t.host,
|
||||
port: t.port,
|
||||
ok: true,
|
||||
latency_ms: Some(started.elapsed().as_millis() as u64),
|
||||
error: None,
|
||||
},
|
||||
Ok(Err(e)) => ProbeResult {
|
||||
name: t.name,
|
||||
host: t.host,
|
||||
port: t.port,
|
||||
ok: false,
|
||||
latency_ms: None,
|
||||
error: Some(e.to_string()),
|
||||
},
|
||||
Err(_) => ProbeResult {
|
||||
name: t.name,
|
||||
host: t.host,
|
||||
port: t.port,
|
||||
ok: false,
|
||||
latency_ms: None,
|
||||
error: Some(format!("timeout after {}s", CONNECT_TIMEOUT.as_secs())),
|
||||
},
|
||||
}
|
||||
});
|
||||
|
||||
let results = futures::future::join_all(checks).await;
|
||||
|
||||
ProbeReport {
|
||||
timestamp: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
|
||||
results,
|
||||
}
|
||||
}
|
||||
|
||||
/// Periodic probe loop; results land in shared state and ride the next
|
||||
/// heartbeat. Jittered interval to avoid fleet-wide synchronization.
|
||||
pub async fn run_loop(agent: Arc<Agent>) {
|
||||
let cancel = agent.shutdown.clone();
|
||||
loop {
|
||||
let report = run_probe(&agent.cfg.probe_targets).await;
|
||||
let failed: Vec<&str> = report
|
||||
.results
|
||||
.iter()
|
||||
.filter(|r| !r.ok)
|
||||
.map(|r| r.name.as_str())
|
||||
.collect();
|
||||
if failed.is_empty() {
|
||||
tracing::debug!("probe ok ({} targets)", report.results.len());
|
||||
} else {
|
||||
tracing::warn!("probe failures: {}", failed.join(", "));
|
||||
}
|
||||
*agent.last_probe.write().await = Some(report);
|
||||
|
||||
let jitter = rand::Rng::gen_range(&mut rand::thread_rng(), 0.8..1.2);
|
||||
let interval =
|
||||
Duration::from_secs_f64(agent.cfg.probe_interval_seconds as f64 * jitter);
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(interval) => {}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("prober stopping");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
30
corrosion-host-agent/src/subjects.rs
Normal file
30
corrosion-host-agent/src/subjects.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
//! Corrosion wire protocol v2 subject scheme (see PROTOCOL.md).
|
||||
//!
|
||||
//! Host-level subjects live under `corrosion.{license}.host.*`; per-instance
|
||||
//! subjects under `corrosion.{license}.{instance_id}.*`. Instance ids are
|
||||
//! validated at config load so they can never collide with the reserved
|
||||
//! `host` segment or contain subject metacharacters.
|
||||
|
||||
pub fn host_heartbeat(license: &str) -> String {
|
||||
format!("corrosion.{license}.host.heartbeat")
|
||||
}
|
||||
|
||||
pub fn host_cmd(license: &str) -> String {
|
||||
format!("corrosion.{license}.host.cmd")
|
||||
}
|
||||
|
||||
pub fn host_going_offline(license: &str) -> String {
|
||||
format!("corrosion.{license}.host.going_offline")
|
||||
}
|
||||
|
||||
/// Phase 1: per-instance command channel (start/stop/restart/rcon/...).
|
||||
#[allow(dead_code)]
|
||||
pub fn instance_cmd(license: &str, instance: &str) -> String {
|
||||
format!("corrosion.{license}.{instance}.cmd")
|
||||
}
|
||||
|
||||
/// Phase 1: per-instance state-change events.
|
||||
#[allow(dead_code)]
|
||||
pub fn instance_status(license: &str, instance: &str) -> String {
|
||||
format!("corrosion.{license}.{instance}.status")
|
||||
}
|
||||
175
corrosion-host-agent/src/telemetry.rs
Normal file
175
corrosion-host-agent/src/telemetry.rs
Normal file
@@ -0,0 +1,175 @@
|
||||
//! Host heartbeat: real telemetry, never fabricated.
|
||||
//!
|
||||
//! The Go agent shipped `disk_free_mb: 50000` and `cpu_percent: 0.0` as
|
||||
//! hardcoded placeholders. This module is the first time the panel's
|
||||
//! Resources view receives the truth. Anything we cannot measure is omitted
|
||||
//! or null — never invented.
|
||||
|
||||
use chrono::{SecondsFormat, Utc};
|
||||
use rand::Rng;
|
||||
use serde::Serialize;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use sysinfo::{Disks, System};
|
||||
|
||||
use crate::agent::Agent;
|
||||
use crate::prober::ProbeReport;
|
||||
use crate::subjects;
|
||||
use crate::version;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct HeartbeatPayload {
|
||||
/// Wire schema version — lets the backend distinguish v2 host heartbeats
|
||||
/// from legacy Go companion heartbeats during any transition window.
|
||||
pub schema: u32,
|
||||
pub timestamp: String,
|
||||
pub agent: AgentInfo,
|
||||
pub host: HostInfo,
|
||||
pub instances: Vec<InstanceInfo>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub probe: Option<ProbeReport>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct AgentInfo {
|
||||
pub version: String,
|
||||
pub commit: String,
|
||||
pub os: String,
|
||||
pub arch: String,
|
||||
pub uptime_seconds: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct HostInfo {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub hostname: Option<String>,
|
||||
pub cpu_percent: f32,
|
||||
pub cpu_cores: usize,
|
||||
pub mem_total_mb: u64,
|
||||
pub mem_used_mb: u64,
|
||||
pub uptime_seconds: u64,
|
||||
pub disks: Vec<DiskInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct DiskInfo {
|
||||
pub mount: String,
|
||||
pub total_mb: u64,
|
||||
pub free_mb: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct InstanceInfo {
|
||||
pub id: String,
|
||||
pub game: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub label: Option<String>,
|
||||
/// Phase 0 states: `configured` (root exists) or `missing_root`.
|
||||
/// Phase 1 adds live process states (running/stopped/crashed).
|
||||
pub state: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub root_disk_free_mb: Option<u64>,
|
||||
}
|
||||
|
||||
pub async fn run(agent: Arc<Agent>) {
|
||||
let cancel = agent.shutdown.clone();
|
||||
let mut sys = System::new();
|
||||
|
||||
// CPU usage is a delta between refreshes; prime it once so the first
|
||||
// heartbeat carries a real figure instead of 0.
|
||||
sys.refresh_cpu_usage();
|
||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||
|
||||
loop {
|
||||
let payload = collect(&agent, &mut sys).await;
|
||||
match serde_json::to_vec(&payload) {
|
||||
Ok(bytes) => {
|
||||
let subject = subjects::host_heartbeat(&agent.cfg.license_id);
|
||||
if let Err(e) = agent.nats.publish(subject, bytes.into()).await {
|
||||
tracing::warn!("heartbeat publish failed: {e}");
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"heartbeat sent: cpu {:.1}%, {} instance(s)",
|
||||
payload.host.cpu_percent,
|
||||
payload.instances.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => tracing::error!("heartbeat serialize failed: {e}"),
|
||||
}
|
||||
|
||||
let jitter = rand::thread_rng().gen_range(0.8..1.2);
|
||||
let interval = Duration::from_secs_f64(agent.cfg.heartbeat_seconds as f64 * jitter);
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(interval) => {}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("telemetry stopping");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn collect(agent: &Agent, sys: &mut System) -> HeartbeatPayload {
|
||||
sys.refresh_cpu_usage();
|
||||
sys.refresh_memory();
|
||||
let disks = Disks::new_with_refreshed_list();
|
||||
|
||||
let disk_infos: Vec<DiskInfo> = disks
|
||||
.iter()
|
||||
.map(|d| DiskInfo {
|
||||
mount: d.mount_point().to_string_lossy().to_string(),
|
||||
total_mb: d.total_space() / 1_048_576,
|
||||
free_mb: d.available_space() / 1_048_576,
|
||||
})
|
||||
.collect();
|
||||
|
||||
let instances = agent
|
||||
.cfg
|
||||
.instances
|
||||
.iter()
|
||||
.map(|inst| {
|
||||
let exists = inst.root.exists();
|
||||
InstanceInfo {
|
||||
id: inst.id.clone(),
|
||||
game: inst.game.clone(),
|
||||
label: inst.label.clone(),
|
||||
state: if exists { "configured" } else { "missing_root" }.to_string(),
|
||||
root_disk_free_mb: disk_free_for_path(&disks, &inst.root),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
HeartbeatPayload {
|
||||
schema: 2,
|
||||
timestamp: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
|
||||
agent: AgentInfo {
|
||||
version: version::VERSION.to_string(),
|
||||
commit: version::GIT_HASH.to_string(),
|
||||
os: std::env::consts::OS.to_string(),
|
||||
arch: std::env::consts::ARCH.to_string(),
|
||||
uptime_seconds: agent.started.elapsed().as_secs(),
|
||||
},
|
||||
host: HostInfo {
|
||||
hostname: System::host_name(),
|
||||
cpu_percent: sys.global_cpu_usage(),
|
||||
cpu_cores: sys.cpus().len(),
|
||||
mem_total_mb: sys.total_memory() / 1_048_576,
|
||||
mem_used_mb: sys.used_memory() / 1_048_576,
|
||||
uptime_seconds: System::uptime(),
|
||||
disks: disk_infos,
|
||||
},
|
||||
instances,
|
||||
probe: agent.last_probe.read().await.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Free space on the disk whose mount point is the longest prefix of `path`.
|
||||
fn disk_free_for_path(disks: &Disks, path: &Path) -> Option<u64> {
|
||||
disks
|
||||
.iter()
|
||||
.filter(|d| path.starts_with(d.mount_point()))
|
||||
.max_by_key(|d| d.mount_point().as_os_str().len())
|
||||
.map(|d| d.available_space() / 1_048_576)
|
||||
}
|
||||
10
corrosion-host-agent/src/version.rs
Normal file
10
corrosion-host-agent/src/version.rs
Normal file
@@ -0,0 +1,10 @@
|
||||
//! Build-time identity, embedded so every heartbeat and `--version` can state
|
||||
//! exactly what is running.
|
||||
|
||||
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
pub const GIT_HASH: &str = env!("CORROSION_GIT_HASH");
|
||||
pub const BUILD_TS: &str = env!("CORROSION_BUILD_TS");
|
||||
|
||||
pub fn long() -> String {
|
||||
format!("{VERSION} ({GIT_HASH}, built {BUILD_TS})")
|
||||
}
|
||||
Reference in New Issue
Block a user