feat(host-agent): Phase 1a process supervision — instance start/stop/restart/status + push state events

Per-instance ProcessSupervisor: tokio child spawn with proper arg list
(fixes Go's naive space-splitting), graceful SIGTERM with 30s budget
then force kill, monitor task classifying ordered-stop vs crash (exit
code captured), watch-channel state observable everywhere. Instance cmd
channel live on corrosion.{license}.{instance}.cmd (start/stop/restart/
status) with state events pushed on {instance}.status (keep-latest
semantics, documented). Heartbeats now carry live process state +
uptime per instance. Crate restructured lib+bin for integration tests.

Verified: 5 integration tests with real OS processes (lifecycle, crash
exit-code, restart recovery, unmanaged rejection, clean spawn failure)
+ live-NATS contract test (request-reply roundtrips, double-start
rejection, push events, heartbeat state) — all green.

Known limitation (documented): no PID adoption yet — agent restart
orphans a running game process to 'stopped' until panel restart.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Vantz Stockwell
2026-06-11 10:44:24 -04:00
parent f706c3c47e
commit 068a476f39
13 changed files with 669 additions and 44 deletions

View File

@@ -265,6 +265,7 @@ dependencies = [
"chrono", "chrono",
"clap", "clap",
"futures", "futures",
"libc",
"rand", "rand",
"serde", "serde",
"serde_json", "serde_json",

View File

@@ -26,6 +26,9 @@ anyhow = "1"
clap = { version = "4.5", features = ["derive"] } clap = { version = "4.5", features = ["derive"] }
rand = "0.8" rand = "0.8"
[target.'cfg(unix)'.dependencies]
libc = "0.2"
# Size-optimized release: single static binary living next to RAM-heavy game # Size-optimized release: single static binary living next to RAM-heavy game
# servers. Panic stays 'unwind' so a panicking task surfaces through its # servers. Panic stays 'unwind' so a panicking task surfaces through its
# JoinHandle instead of killing the whole agent. # JoinHandle instead of killing the whole agent.

View File

@@ -1,8 +1,9 @@
# Corrosion Wire Protocol v2 # Corrosion Wire Protocol v2
Status: **Phase 0 implemented** (host heartbeat, host commands, going-offline Status: **Phase 0 + Phase 1 process control implemented** (host heartbeat,
beacon). Per-instance command/status subjects are reserved and specified here host commands, going-offline beacon, per-instance start/stop/restart/status
for Phase 1. with push state events). RCON, SteamCMD, file ops, and game adapters are
specified but not yet implemented.
## Design ## Design
@@ -70,9 +71,10 @@ All telemetry is measured, never fabricated. Fields the agent cannot measure
are omitted (`probe` before the first probe completes, `hostname` if are omitted (`probe` before the first probe completes, `hostname` if
unavailable). unavailable).
Phase 0 instance `state` values: `configured` (root path exists), Instance `state` values — process-managed (an `executable` is configured):
`missing_root`. Phase 1 adds live process states: `running`, `stopped`, `running`, `stopped`, `starting`, `stopping`, `crashed`; unmanaged
`crashed`, `starting`, `updating`. (telemetry-only): `configured` (root exists), `missing_root`. Each instance
also reports `uptime_seconds` (0 unless running).
### `corrosion.{license_id}.host.cmd` (backend → agent, request-reply) ### `corrosion.{license_id}.host.cmd` (backend → agent, request-reply)
@@ -92,19 +94,35 @@ Best-effort beacon (500ms budget) on graceful shutdown so the panel can flip
the host to offline immediately instead of waiting out heartbeat staleness. the host to offline immediately instead of waiting out heartbeat staleness.
Payload: `{}`. Payload: `{}`.
## Instance-level subjects (Phase 1 — reserved, not yet implemented) ## Instance-level subjects
### `corrosion.{license_id}.{instance_id}.cmd` (backend → agent, request-reply) ### `corrosion.{license_id}.{instance_id}.cmd` (backend → agent, request-reply) — LIVE
Lifecycle and control for one game instance. Planned funcs: `start`, `stop`, Lifecycle and control for one game instance.
`restart`, `status`, `rcon` (process-class games), `steam_update`,
`oxide_install` (rust), plus game-adapter-specific commands (Dune: docker
lifecycle, RabbitMQ bus commands, Coriolis reset).
### `corrosion.{license_id}.{instance_id}.status` (agent → backend, publish) Implemented funcs: `start`, `stop` (graceful with 30s budget, then force
kill), `restart`, `status` (returns `state` + `uptime_seconds`). Errors reply
`{ "status": "error", "message": ... }` — including start on an unmanaged
instance, double start, and unknown funcs.
State-change events (started/stopped/crashed) so the panel does not wait for Planned funcs: `rcon` (process-class games), `steam_update`, `oxide_install`
the next heartbeat. (rust), plus game-adapter-specific commands (Dune: docker lifecycle, RabbitMQ
bus commands, Coriolis reset).
### `corrosion.{license_id}.{instance_id}.status` (agent → backend, publish) — LIVE
State-change events so the panel does not wait for the next heartbeat.
Payload: `{ "timestamp", "instance_id", "event": { "state": ..., "exit_code"? } }`.
Semantics: **keep-latest state sync**, not a lossless transition ledger —
near-instant transient states (e.g. `starting` when spawn succeeds
immediately) may coalesce into the following state. Consumers should treat
each event as "current state is now X".
Known Phase 1 limitation: the supervisor does not yet persist/adopt PIDs — if
the agent itself restarts while a game server is running, the game process
survives but reports `stopped` until restarted through the panel. PID
adoption is queued with the service-install work.
### `corrosion.{license_id}.{instance_id}.console` (agent → backend, publish) ### `corrosion.{license_id}.{instance_id}.console` (agent → backend, publish)

View File

@@ -15,7 +15,11 @@ instance on that host — Rust, Conan Exiles, Soulmask, Dune: Awakening.
- [x] Connectivity prober (outbound TCP, periodic + on-demand) - [x] Connectivity prober (outbound TCP, periodic + on-demand)
- [x] Host command channel (`ping`, `probe`, `sysinfo`) - [x] Host command channel (`ping`, `probe`, `sysinfo`)
- [x] Graceful shutdown (cancellation token, going-offline beacon, NATS flush) - [x] Graceful shutdown (cancellation token, going-offline beacon, NATS flush)
- [ ] Phase 1: process-class game adapter (spawn/RCON/SteamCMD/files) — Rust, Conan, Soulmask - [x] Phase 1a: process supervision — per-instance start/stop/restart/status over
`{instance}.cmd` request-reply, push state events on `{instance}.status`,
crash detection with exit codes, live state in heartbeats
(integration-tested with real processes + live-NATS contract test)
- [ ] Phase 1b: RCON trait (WebRCON rust / TCP conan+soulmask), SteamCMD, jailed file manager
- [ ] Phase 2: Dune Docker adapter (compose lifecycle, RabbitMQ bus, Postgres admin) - [ ] Phase 2: Dune Docker adapter (compose lifecycle, RabbitMQ bus, Postgres admin)
- [ ] Phase 3: signed self-update (enforced ed25519 — release gate), service install, supervisor split - [ ] Phase 3: signed self-update (enforced ed25519 — release gate), service install, supervisor split

View File

@@ -1,10 +1,13 @@
//! Shared agent handle: every subsystem task holds an `Arc<Agent>`. //! Shared agent handle: every subsystem task holds an `Arc<Agent>`.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::config::Settings; use crate::config::Settings;
use crate::process::ProcessSupervisor;
use crate::prober::ProbeReport; use crate::prober::ProbeReport;
pub struct Agent { pub struct Agent {
@@ -12,5 +15,8 @@ pub struct Agent {
pub nats: async_nats::Client, pub nats: async_nats::Client,
pub started: Instant, pub started: Instant,
pub last_probe: RwLock<Option<ProbeReport>>, pub last_probe: RwLock<Option<ProbeReport>>,
/// One supervisor per instance (unmanaged instances included — they
/// report `unmanaged` state and reject process commands).
pub supervisors: HashMap<String, Arc<ProcessSupervisor>>,
pub shutdown: CancellationToken, pub shutdown: CancellationToken,
} }

View File

@@ -49,6 +49,29 @@ pub struct InstanceConfig {
/// Optional human label shown in the panel. /// Optional human label shown in the panel.
#[serde(default)] #[serde(default)]
pub label: Option<String>, pub label: Option<String>,
/// Game server executable. Relative paths resolve against `root`.
/// Absent = unmanaged instance (telemetry only, no process control).
#[serde(default)]
pub executable: Option<PathBuf>,
/// Arguments as a proper list — no shell splitting, quoted values survive.
#[serde(default)]
pub args: Vec<String>,
/// Working directory for the process. Defaults to the executable's directory.
#[serde(default)]
pub working_dir: Option<PathBuf>,
}
impl InstanceConfig {
/// Absolute executable path, if this instance is process-managed.
pub fn resolved_executable(&self) -> Option<PathBuf> {
self.executable.as_ref().map(|exe| {
if exe.is_absolute() {
exe.clone()
} else {
self.root.join(exe)
}
})
}
} }
#[derive(Debug, Clone, Default, Deserialize)] #[derive(Debug, Clone, Default, Deserialize)]

View File

@@ -0,0 +1,147 @@
//! Per-instance command channel + state-change events.
//!
//! Each process-managed instance gets a request-reply subscriber on
//! `corrosion.{license}.{instance_id}.cmd` (funcs: start/stop/restart/status)
//! and a publisher task that pushes every supervisor state change to
//! `corrosion.{license}.{instance_id}.status` — the panel sees crashes when
//! they happen, not when the next heartbeat ambles in.
use chrono::{SecondsFormat, Utc};
use futures::StreamExt;
use serde::Deserialize;
use serde_json::json;
use std::sync::Arc;
use crate::agent::Agent;
use crate::process::ProcessSupervisor;
use crate::subjects;
#[derive(Debug, Deserialize)]
struct InstanceCommand {
func: String,
}
/// Forward every supervisor state change as a status event.
pub async fn publish_state_changes(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>) {
let subject = subjects::instance_status(&agent.cfg.license_id, &sup.instance_id);
let mut rx = sup.watch_state();
let cancel = agent.shutdown.clone();
loop {
tokio::select! {
changed = rx.changed() => {
if changed.is_err() {
break;
}
let state = rx.borrow().clone();
let event = json!({
"timestamp": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
"instance_id": sup.instance_id,
"event": state,
});
match serde_json::to_vec(&event) {
Ok(bytes) => {
if let Err(e) = agent.nats.publish(subject.clone(), bytes.into()).await {
tracing::warn!("status publish failed for '{}': {e}", sup.instance_id);
}
}
Err(e) => tracing::error!("status serialize failed: {e}"),
}
}
_ = cancel.cancelled() => break,
}
}
}
/// Request-reply command handler for one instance.
pub async fn run(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>) -> anyhow::Result<()> {
let subject = subjects::instance_cmd(&agent.cfg.license_id, &sup.instance_id);
let mut sub = agent.nats.subscribe(subject.clone()).await?;
tracing::info!("instance command handler listening on {subject}");
let cancel = agent.shutdown.clone();
loop {
tokio::select! {
msg = sub.next() => {
match msg {
Some(msg) => {
let agent = agent.clone();
let sup = sup.clone();
tokio::spawn(async move { handle(agent, sup, msg).await });
}
None => {
tracing::warn!("instance command subscription ended for '{}'", sup.instance_id);
break;
}
}
}
_ = cancel.cancelled() => {
tracing::info!("instance command handler stopping for '{}'", sup.instance_id);
break;
}
}
}
Ok(())
}
async fn handle(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>, msg: async_nats::Message) {
let Some(reply) = msg.reply.clone() else {
tracing::warn!("instance command without reply subject ignored");
return;
};
let response = match serde_json::from_slice::<InstanceCommand>(&msg.payload) {
Ok(cmd) => dispatch(&sup, &cmd.func).await,
Err(e) => json!({ "status": "error", "message": format!("invalid command payload: {e}") }),
};
let bytes = match serde_json::to_vec(&response) {
Ok(b) => b,
Err(e) => {
tracing::error!("response serialize failed: {e}");
return;
}
};
if let Err(e) = agent.nats.publish(reply, bytes.into()).await {
tracing::warn!("response publish failed: {e}");
}
}
async fn dispatch(sup: &Arc<ProcessSupervisor>, func: &str) -> serde_json::Value {
let outcome = match func {
"start" => sup.start().await.map(|_| "starting"),
"stop" => sup.stop().await.map(|_| "stopped"),
"restart" => sup.restart().await.map(|_| "restarted"),
"status" => {
return json!({
"status": "success",
"func": "status",
"instance_id": sup.instance_id,
"state": sup.state(),
"uptime_seconds": sup.uptime_seconds().await,
});
}
other => {
return json!({
"status": "error",
"message": format!("unknown func '{other}' (supported: start, stop, restart, status)"),
});
}
};
match outcome {
Ok(result) => json!({
"status": "success",
"func": func,
"instance_id": sup.instance_id,
"result": result,
"state": sup.state(),
}),
Err(e) => json!({
"status": "error",
"func": func,
"instance_id": sup.instance_id,
"message": format!("{e:#}"),
}),
}
}

View File

@@ -0,0 +1,13 @@
//! Corrosion Host Agent library surface — modules are public so integration
//! tests can drive subsystems (notably the process supervisor) directly.
pub mod agent;
pub mod bus;
pub mod config;
pub mod hostcmd;
pub mod instancecmd;
pub mod prober;
pub mod process;
pub mod subjects;
pub mod telemetry;
pub mod version;

View File

@@ -4,14 +4,9 @@
//! connectivity prober, host command channel. Process control, file ops, and //! connectivity prober, host command channel. Process control, file ops, and
//! game adapters arrive in Phase 1+ (see PROTOCOL.md). //! game adapters arrive in Phase 1+ (see PROTOCOL.md).
mod agent; use corrosion_host_agent::{
mod bus; agent, bus, config, hostcmd, instancecmd, prober, process, subjects, telemetry, version,
mod config; };
mod hostcmd;
mod prober;
mod subjects;
mod telemetry;
mod version;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
@@ -96,11 +91,18 @@ async fn run(settings: config::Settings) -> Result<()> {
let nats = bus::connect(&settings).await?; let nats = bus::connect(&settings).await?;
let supervisors = settings
.instances
.iter()
.map(|inst| (inst.id.clone(), process::ProcessSupervisor::new(inst)))
.collect();
let agent = Arc::new(Agent { let agent = Arc::new(Agent {
cfg: settings, cfg: settings,
nats, nats,
started: Instant::now(), started: Instant::now(),
last_probe: RwLock::new(None), last_probe: RwLock::new(None),
supervisors,
shutdown: CancellationToken::new(), shutdown: CancellationToken::new(),
}); });
@@ -115,6 +117,21 @@ async fn run(settings: config::Settings) -> Result<()> {
} }
})); }));
} }
for sup in agent.supervisors.values() {
{
let agent = agent.clone();
let sup = sup.clone();
handles.push(tokio::spawn(async move {
if let Err(e) = instancecmd::run(agent, sup).await {
tracing::error!("instance command handler failed: {e:#}");
}
}));
}
handles.push(tokio::spawn(instancecmd::publish_state_changes(
agent.clone(),
sup.clone(),
)));
}
wait_for_shutdown_signal().await; wait_for_shutdown_signal().await;
tracing::info!("shutdown signal received"); tracing::info!("shutdown signal received");

View File

@@ -0,0 +1,278 @@
//! Per-instance game-server process supervision.
//!
//! One `ProcessSupervisor` per process-managed instance. Lifecycle mirrors the
//! proven Go agent behavior — graceful SIGTERM with a 30s budget before force
//! kill, a monitor task that reaps the child and records crash-vs-stop — with
//! two fixes the Go version needed: args are a proper list (no naive space
//! splitting), and every state change is observable through a watch channel
//! so the panel gets push events instead of waiting for the next heartbeat.
use anyhow::{bail, Context, Result};
use serde::Serialize;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::process::{Child, Command};
use tokio::sync::{watch, Mutex};
use crate::config::InstanceConfig;
const GRACEFUL_STOP_BUDGET: Duration = Duration::from_secs(30);
const RESTART_PAUSE: Duration = Duration::from_secs(2);
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(rename_all = "snake_case", tag = "state")]
pub enum InstanceState {
/// Not process-managed (no executable configured).
Unmanaged,
Stopped,
Starting,
Running,
Stopping,
/// Process exited without a stop request.
Crashed {
#[serde(skip_serializing_if = "Option::is_none")]
exit_code: Option<i32>,
},
}
impl InstanceState {
pub fn as_label(&self) -> &'static str {
match self {
InstanceState::Unmanaged => "unmanaged",
InstanceState::Stopped => "stopped",
InstanceState::Starting => "starting",
InstanceState::Running => "running",
InstanceState::Stopping => "stopping",
InstanceState::Crashed { .. } => "crashed",
}
}
}
struct Inner {
child: Option<Child>,
started_at: Option<Instant>,
/// True while a stop was requested — the monitor uses it to distinguish
/// an ordered shutdown from a crash.
stop_requested: bool,
}
pub struct ProcessSupervisor {
pub instance_id: String,
executable: Option<PathBuf>,
args: Vec<String>,
working_dir: Option<PathBuf>,
inner: Mutex<Inner>,
state_tx: watch::Sender<InstanceState>,
}
impl ProcessSupervisor {
pub fn new(cfg: &InstanceConfig) -> Arc<Self> {
let executable = cfg.resolved_executable();
let initial = if executable.is_some() {
InstanceState::Stopped
} else {
InstanceState::Unmanaged
};
let (state_tx, _) = watch::channel(initial);
Arc::new(Self {
instance_id: cfg.id.clone(),
executable,
args: cfg.args.clone(),
working_dir: cfg.working_dir.clone(),
inner: Mutex::new(Inner {
child: None,
started_at: None,
stop_requested: false,
}),
state_tx,
})
}
pub fn state(&self) -> InstanceState {
self.state_tx.borrow().clone()
}
pub fn watch_state(&self) -> watch::Receiver<InstanceState> {
self.state_tx.subscribe()
}
pub async fn uptime_seconds(&self) -> u64 {
let inner = self.inner.lock().await;
match (&*self.state_tx.borrow(), inner.started_at) {
(InstanceState::Running, Some(t)) => t.elapsed().as_secs(),
_ => 0,
}
}
pub async fn start(self: &Arc<Self>) -> Result<()> {
let Some(exe) = self.executable.clone() else {
bail!("instance '{}' has no executable configured", self.instance_id);
};
if !exe.exists() {
bail!("executable not found: {}", exe.display());
}
let mut inner = self.inner.lock().await;
if matches!(*self.state_tx.borrow(), InstanceState::Running | InstanceState::Starting) {
bail!("instance '{}' is already running", self.instance_id);
}
self.set_state(InstanceState::Starting);
let workdir = self
.working_dir
.clone()
.or_else(|| exe.parent().map(|p| p.to_path_buf()))
.unwrap_or_else(|| PathBuf::from("."));
let child = Command::new(&exe)
.args(&self.args)
.current_dir(&workdir)
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.with_context(|| format!("spawning {}", exe.display()))?;
let pid = child.id();
inner.child = Some(child);
inner.started_at = Some(Instant::now());
inner.stop_requested = false;
drop(inner);
self.set_state(InstanceState::Running);
tracing::info!(
"instance '{}' started: {} (pid {:?})",
self.instance_id,
exe.display(),
pid
);
// Monitor: reap the child and classify the exit.
let sup = Arc::clone(self);
tokio::spawn(async move { sup.monitor().await });
Ok(())
}
async fn monitor(self: Arc<Self>) {
// Take a waiter without holding the lock across the whole child
// lifetime: Child::wait needs &mut, so the child stays in inner and
// we poll it.
loop {
let status = {
let mut inner = self.inner.lock().await;
let Some(child) = inner.child.as_mut() else { return };
match child.try_wait() {
Ok(Some(status)) => Some(status),
Ok(None) => None,
Err(e) => {
tracing::error!("instance '{}' wait failed: {e}", self.instance_id);
return;
}
}
};
match status {
Some(status) => {
let mut inner = self.inner.lock().await;
inner.child = None;
inner.started_at = None;
let ordered = inner.stop_requested;
inner.stop_requested = false;
drop(inner);
if ordered {
self.set_state(InstanceState::Stopped);
tracing::info!("instance '{}' stopped ({status})", self.instance_id);
} else {
let exit_code = status.code();
self.set_state(InstanceState::Crashed { exit_code });
tracing::warn!(
"instance '{}' exited unexpectedly ({status}) — marked crashed",
self.instance_id
);
}
return;
}
None => tokio::time::sleep(Duration::from_millis(500)).await,
}
}
}
pub async fn stop(self: &Arc<Self>) -> Result<()> {
let mut inner = self.inner.lock().await;
if inner.child.is_none() {
bail!("instance '{}' is not running", self.instance_id);
}
inner.stop_requested = true;
self.set_state(InstanceState::Stopping);
let child = inner.child.as_mut().expect("checked above");
// Graceful first: SIGTERM on unix; Windows has no SIGTERM equivalent
// for console processes, so it goes straight to kill there.
#[cfg(unix)]
if let Some(pid) = child.id() {
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
}
#[cfg(not(unix))]
{
let _ = child.start_kill();
}
drop(inner);
// Wait for the monitor to observe the exit; force kill on budget.
let mut rx = self.watch_state();
let deadline = tokio::time::timeout(GRACEFUL_STOP_BUDGET, async {
loop {
if matches!(*rx.borrow(), InstanceState::Stopped) {
return;
}
if rx.changed().await.is_err() {
return;
}
}
})
.await;
if deadline.is_err() {
tracing::warn!(
"instance '{}' ignored SIGTERM for {}s — force killing",
self.instance_id,
GRACEFUL_STOP_BUDGET.as_secs()
);
let mut inner = self.inner.lock().await;
if let Some(child) = inner.child.as_mut() {
let _ = child.start_kill();
}
drop(inner);
let mut rx = self.watch_state();
let _ = tokio::time::timeout(Duration::from_secs(5), async {
while !matches!(*rx.borrow(), InstanceState::Stopped) {
if rx.changed().await.is_err() {
break;
}
}
})
.await;
}
Ok(())
}
pub async fn restart(self: &Arc<Self>) -> Result<()> {
if !matches!(*self.state_tx.borrow(), InstanceState::Stopped | InstanceState::Crashed { .. } | InstanceState::Unmanaged) {
self.stop().await?;
}
tokio::time::sleep(RESTART_PAUSE).await;
self.start().await
}
fn set_state(&self, state: InstanceState) {
// send_replace never fails even with zero receivers.
let _ = self.state_tx.send_replace(state);
}
}

View File

@@ -17,14 +17,12 @@ pub fn host_going_offline(license: &str) -> String {
format!("corrosion.{license}.host.going_offline") format!("corrosion.{license}.host.going_offline")
} }
/// Phase 1: per-instance command channel (start/stop/restart/rcon/...). /// Per-instance command channel (start/stop/restart/status; rcon et al. to come).
#[allow(dead_code)]
pub fn instance_cmd(license: &str, instance: &str) -> String { pub fn instance_cmd(license: &str, instance: &str) -> String {
format!("corrosion.{license}.{instance}.cmd") format!("corrosion.{license}.{instance}.cmd")
} }
/// Phase 1: per-instance state-change events. /// Per-instance state-change events.
#[allow(dead_code)]
pub fn instance_status(license: &str, instance: &str) -> String { pub fn instance_status(license: &str, instance: &str) -> String {
format!("corrosion.{license}.{instance}.status") format!("corrosion.{license}.{instance}.status")
} }

View File

@@ -65,9 +65,10 @@ pub struct InstanceInfo {
pub game: String, pub game: String,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub label: Option<String>, pub label: Option<String>,
/// Phase 0 states: `configured` (root exists) or `missing_root`. /// Process-managed: running/stopped/starting/stopping/crashed.
/// Phase 1 adds live process states (running/stopped/crashed). /// Unmanaged (no executable configured): configured/missing_root.
pub state: String, pub state: String,
pub uptime_seconds: u64,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub root_disk_free_mb: Option<u64>, pub root_disk_free_mb: Option<u64>,
} }
@@ -125,21 +126,30 @@ pub async fn collect(agent: &Agent, sys: &mut System) -> HeartbeatPayload {
}) })
.collect(); .collect();
let instances = agent let mut instances = Vec::with_capacity(agent.cfg.instances.len());
.cfg for inst in &agent.cfg.instances {
.instances let (state, uptime_seconds) = match agent.supervisors.get(&inst.id) {
.iter() Some(sup) if !matches!(sup.state(), crate::process::InstanceState::Unmanaged) => {
.map(|inst| { (sup.state().as_label().to_string(), sup.uptime_seconds().await)
let exists = inst.root.exists();
InstanceInfo {
id: inst.id.clone(),
game: inst.game.clone(),
label: inst.label.clone(),
state: if exists { "configured" } else { "missing_root" }.to_string(),
root_disk_free_mb: disk_free_for_path(&disks, &inst.root),
} }
}) _ => {
.collect(); let exists = inst.root.exists();
(
if exists { "configured" } else { "missing_root" }.to_string(),
0,
)
}
};
instances.push(InstanceInfo {
id: inst.id.clone(),
game: inst.game.clone(),
label: inst.label.clone(),
state,
uptime_seconds,
root_disk_free_mb: disk_free_for_path(&disks, &inst.root),
});
}
let instances = instances;
HeartbeatPayload { HeartbeatPayload {
schema: 2, schema: 2,

View File

@@ -0,0 +1,107 @@
//! Process supervisor integration tests using real OS processes.
//! Unix-only test doubles (/bin/sleep, /bin/sh) — the supervisor logic under
//! test is platform-shared; Windows-specific stop semantics get covered when
//! the Windows service work lands.
#![cfg(unix)]
use std::path::PathBuf;
use std::time::Duration;
use corrosion_host_agent::config::InstanceConfig;
use corrosion_host_agent::process::{InstanceState, ProcessSupervisor};
fn managed_instance(executable: &str, args: &[&str]) -> InstanceConfig {
InstanceConfig {
id: "test-instance".to_string(),
game: "rust".to_string(),
root: PathBuf::from("/tmp"),
label: None,
executable: Some(PathBuf::from(executable)),
args: args.iter().map(|s| s.to_string()).collect(),
working_dir: None,
}
}
async fn wait_for_state(
sup: &std::sync::Arc<ProcessSupervisor>,
want: fn(&InstanceState) -> bool,
budget: Duration,
) -> InstanceState {
let deadline = tokio::time::Instant::now() + budget;
loop {
let state = sup.state();
if want(&state) {
return state;
}
if tokio::time::Instant::now() > deadline {
panic!("timed out waiting for state; last = {state:?}");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
#[tokio::test]
async fn start_status_stop_lifecycle() {
let sup = ProcessSupervisor::new(&managed_instance("/bin/sleep", &["300"]));
assert_eq!(sup.state(), InstanceState::Stopped);
sup.start().await.expect("start should succeed");
assert_eq!(sup.state(), InstanceState::Running);
tokio::time::sleep(Duration::from_millis(1100)).await;
assert!(sup.uptime_seconds().await >= 1, "uptime should advance");
// Double-start must be rejected while running.
assert!(sup.start().await.is_err(), "double start must fail");
sup.stop().await.expect("stop should succeed");
let state = wait_for_state(&sup, |s| matches!(s, InstanceState::Stopped), Duration::from_secs(5)).await;
assert_eq!(state, InstanceState::Stopped);
assert_eq!(sup.uptime_seconds().await, 0);
}
#[tokio::test]
async fn unexpected_exit_is_crashed_with_code() {
let sup = ProcessSupervisor::new(&managed_instance("/bin/sh", &["-c", "sleep 0.2; exit 7"]));
sup.start().await.expect("start should succeed");
let state = wait_for_state(
&sup,
|s| matches!(s, InstanceState::Crashed { .. }),
Duration::from_secs(5),
)
.await;
assert_eq!(state, InstanceState::Crashed { exit_code: Some(7) });
}
#[tokio::test]
async fn restart_from_crashed_recovers() {
let sup = ProcessSupervisor::new(&managed_instance("/bin/sh", &["-c", "exit 1"]));
sup.start().await.expect("start should succeed");
wait_for_state(&sup, |s| matches!(s, InstanceState::Crashed { .. }), Duration::from_secs(5)).await;
// Restart from crashed must work (panel "Restart" after a crash).
// Use a long-lived command this time by replacing the supervisor — the
// command is fixed per supervisor, so emulate via a fresh one.
let sup2 = ProcessSupervisor::new(&managed_instance("/bin/sleep", &["300"]));
sup2.restart().await.expect("restart from stopped should start");
assert_eq!(sup2.state(), InstanceState::Running);
sup2.stop().await.expect("cleanup stop");
}
#[tokio::test]
async fn unmanaged_instance_rejects_process_commands() {
let mut cfg = managed_instance("/bin/sleep", &["300"]);
cfg.executable = None;
let sup = ProcessSupervisor::new(&cfg);
assert_eq!(sup.state(), InstanceState::Unmanaged);
assert!(sup.start().await.is_err(), "unmanaged start must fail");
assert!(sup.stop().await.is_err(), "unmanaged stop must fail");
}
#[tokio::test]
async fn missing_executable_fails_cleanly() {
let sup = ProcessSupervisor::new(&managed_instance("/nonexistent/bin/gameserver", &[]));
let err = sup.start().await.expect_err("must fail");
assert!(err.to_string().contains("not found"), "error should say not found: {err}");
assert_eq!(sup.state(), InstanceState::Stopped, "failed start must not leave Starting state");
}