diff --git a/CHANGELOG.md b/CHANGELOG.md index abdd517..b866166 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added (Host-agent Phase 2 — Dune docker-compose adapter — 2026-06-12) + +**`Supervisor` trait abstraction (`corrosion-host-agent`):** +- Introduced `trait Supervisor` (via `async-trait`, the battle-tested ecosystem standard) so the agent can manage games with fundamentally different models behind one wire contract. `ProcessSupervisor` (spawned OS process — Rust/Conan/Soulmask) and the new `DockerComposeSupervisor` (Dune) both implement it; `Agent.supervisors` is now `HashMap>` and the instance command dispatch (`instancecmd::dispatch`) is fully game-agnostic — `start`/`stop`/`restart`/`status` are identical across games. A per-game factory in `main` selects the impl. `InstanceState` moved to the shared `supervisor` module. +- **Architecture call** (per Commander): chose the `dyn` trait over a zero-dependency enum because the Dune references point at *several* future management planes (kubectl, AMP/podman, SSH) — a trait makes each new plane "new struct + impl," no central match to edit. + +**`DockerComposeSupervisor` (Dune: Awakening):** +- Drives `docker compose up -d` / `stop` / `restart` against the instance's compose project (a "battlegroup"), with `-f`/`-p`/single-service support and a configurable compose binary (`docker compose` default, `docker-compose` legacy). New `[instance.docker_compose]` config block (file/project/service/command, all optional). `steam_update` already rejected for Dune (Docker images, no SteamCMD). +- **Scope (first cut):** lifecycle + cached state. Deferred to Phase 3b (with process PID adoption): container crash-detection and state adoption on agent restart (both reconcilable with a `docker compose ps` probe). 
+- Verified: 6 new docker-compose tests (mock `docker` binary asserting exact invocations + state transitions + failure paths) + the 5 refactored process-supervisor tests; full agent suite 56 tests green, zero warnings. Live verification against a real Dune stack pending the Commander standing one up. + ### Changed (Fleet-driven active game + signed-update CI fix — 2026-06-12) **Frontend — active game follows the deployed fleet:** diff --git a/corrosion-host-agent/Cargo.lock b/corrosion-host-agent/Cargo.lock index 6180f9b..5a7d409 100644 --- a/corrosion-host-agent/Cargo.lock +++ b/corrosion-host-agent/Cargo.lock @@ -110,6 +110,17 @@ dependencies = [ "url", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -276,10 +287,11 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "corrosion-host-agent" -version = "2.0.0-alpha.8" +version = "2.0.0-alpha.9" dependencies = [ "anyhow", "async-nats", + "async-trait", "chrono", "clap", "futures", diff --git a/corrosion-host-agent/Cargo.toml b/corrosion-host-agent/Cargo.toml index 2eaea17..20a220f 100644 --- a/corrosion-host-agent/Cargo.toml +++ b/corrosion-host-agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "corrosion-host-agent" -version = "2.0.0-alpha.8" +version = "2.0.0-alpha.9" edition = "2021" description = "Corrosion Host Agent — multi-game ops runtime for self-hosted game servers" license = "UNLICENSED" @@ -23,6 +23,7 @@ chrono = { version = "0.4", features = ["serde", "clock"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } anyhow = "1" +async-trait = "0.1" clap = { version = "4.5", features = ["derive"] } rand = "0.8" tokio-tungstenite = "0.24" diff --git 
a/corrosion-host-agent/PROTOCOL.md b/corrosion-host-agent/PROTOCOL.md index 24fb979..daf1cf5 100644 --- a/corrosion-host-agent/PROTOCOL.md +++ b/corrosion-host-agent/PROTOCOL.md @@ -101,8 +101,16 @@ Payload: `{}`. Lifecycle and control for one game instance. +The same `start`/`stop`/`restart`/`status` funcs work for **every** game: the +agent picks a `Supervisor` impl per game — a spawned-process supervisor for +Rust/Conan/Soulmask, a **docker-compose supervisor for Dune** (`docker compose +up -d` / `stop` / `restart` against the instance's compose project, configured +via `[instance.docker_compose]`). The wire contract is identical; only the +management model behind it differs. + Implemented funcs: `start`, `stop` (graceful with 30s budget, then force -kill), `restart`, `status` (returns `state` + `uptime_seconds`), and +kill — process supervisor; Dune maps stop to `docker compose stop`), `restart`, +`status` (returns `state` + `uptime_seconds`), and `rcon` — `{ "func": "rcon", "command": "" }` returns `{ "status": "success", "output": }`. Protocol per game: WebRCON (WebSocket JSON) for rust, Source RCON (Valve TCP) for @@ -118,7 +126,10 @@ streaming progress lines to `corrosion.{license}.{instance}.steam_status` and replying on completion. Planned funcs: `oxide_install` (rust), plus game-adapter-specific -commands (Dune: docker lifecycle, RabbitMQ bus commands, Coriolis reset). +commands (Dune: RabbitMQ admin-bus commands, Coriolis reset, Postgres admin +surface). Dune **lifecycle** is already covered by the shared +start/stop/restart funcs above; container crash-detection and state adoption on +agent restart land with Phase 3b. 
### `corrosion.{license_id}.{instance_id}.steam_status` (agent → backend, publish) — LIVE diff --git a/corrosion-host-agent/README.md b/corrosion-host-agent/README.md index a213afe..7e11ae6 100644 --- a/corrosion-host-agent/README.md +++ b/corrosion-host-agent/README.md @@ -20,7 +20,9 @@ instance on that host — Rust, Conan Exiles, Soulmask, Dune: Awakening. crash detection with exit codes, live state in heartbeats (integration-tested with real processes + live-NATS contract test) - [ ] Phase 1b: RCON trait (WebRCON rust / TCP conan+soulmask), SteamCMD, jailed file manager -- [ ] Phase 2: Dune Docker adapter (compose lifecycle, RabbitMQ bus, Postgres admin) +- [~] Phase 2: Dune Docker adapter — **compose lifecycle done** (`docker compose up -d/stop/restart` + via the `Supervisor` trait + `DockerComposeSupervisor`); RabbitMQ admin bus + Postgres admin + surface deferred. Container crash-detection + state adoption on agent restart land with Phase 3b. - [x] Phase 3a: SIGNED self-update — minisign-verified download+swap+relaunch (NATS `update` func); embedded public key; CI signs releases - [ ] Phase 3b: service install (systemd/SCM), PID adoption diff --git a/corrosion-host-agent/agent.example.toml b/corrosion-host-agent/agent.example.toml index 1f82e61..cef1841 100644 --- a/corrosion-host-agent/agent.example.toml +++ b/corrosion-host-agent/agent.example.toml @@ -60,6 +60,24 @@ password = "changeme" # Dune instances do not use SteamCMD (Docker images); the steam_update func # will return a clear error if invoked on a dune instance. +# --- Dune: Awakening (container-managed) --------------------------------- +# Dune runs as a docker-compose stack, not a spawned process — leave +# `executable` unset and add an [instance.docker_compose] block. The agent +# drives `docker compose up -d / stop / restart` for start/stop/restart, and +# `steam_update` is rejected (Dune ships as Docker images). 
+# +# [[instance]] +# id = "dune-main" +# game = "dune" +# root = "/opt/dune" # directory the compose commands run in +# label = "Arrakis (battlegroup)" +# +# [instance.docker_compose] +# file = "docker-compose.yml" # -f; relative to root. Omit to use compose's discovery +# project = "dune-main" # -p; defaults to the instance id +# service = "gameserver" # limit lifecycle to one service; omit for the whole stack +# command = ["docker", "compose"] # default; use ["docker-compose"] for the legacy binary + [prober] interval_seconds = 300 diff --git a/corrosion-host-agent/src/agent.rs b/corrosion-host-agent/src/agent.rs index 9fa38a4..25f58ba 100644 --- a/corrosion-host-agent/src/agent.rs +++ b/corrosion-host-agent/src/agent.rs @@ -7,16 +7,17 @@ use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use crate::config::Settings; -use crate::process::ProcessSupervisor; use crate::prober::ProbeReport; +use crate::supervisor::Supervisor; pub struct Agent { pub cfg: Settings, pub nats: async_nats::Client, pub started: Instant, pub last_probe: RwLock>, - /// One supervisor per instance (unmanaged instances included — they - /// report `unmanaged` state and reject process commands). - pub supervisors: HashMap>, + /// One supervisor per instance, keyed by instance id. The concrete impl + /// (process vs docker-compose) is chosen per game by the factory in main; + /// every subsystem talks to the `Supervisor` trait only. + pub supervisors: HashMap>, pub shutdown: CancellationToken, } diff --git a/corrosion-host-agent/src/config.rs b/corrosion-host-agent/src/config.rs index 930f4cd..560077e 100644 --- a/corrosion-host-agent/src/config.rs +++ b/corrosion-host-agent/src/config.rs @@ -10,6 +10,7 @@ use serde::Deserialize; use std::collections::HashSet; use std::path::{Path, PathBuf}; +use crate::docker_compose::DockerComposeConfig; use crate::rcon::RconConfig; use crate::steamcmd::SteamcmdConfig; @@ -76,6 +77,10 @@ pub struct InstanceConfig { /// validate = false). 
#[serde(default)] pub steamcmd: Option, + /// Docker-compose settings for container-managed games (Dune). Absent = + /// defaults apply (compose file in the instance root, project = instance id). + #[serde(default)] + pub docker_compose: Option, } impl InstanceConfig { diff --git a/corrosion-host-agent/src/docker_compose.rs b/corrosion-host-agent/src/docker_compose.rs new file mode 100644 index 0000000..8d34664 --- /dev/null +++ b/corrosion-host-agent/src/docker_compose.rs @@ -0,0 +1,216 @@ +//! Docker-compose instance supervision — the Dune: Awakening adapter. +//! +//! Dune does not ship as a SteamCMD-updated process like Rust/Conan/Soulmask; +//! it runs as Docker container(s) (game server + RabbitMQ broker + Postgres), +//! orchestrated as a compose stack (a "battlegroup"). So Dune lifecycle is +//! `docker compose up -d / stop / restart` against the instance's compose +//! project, not a spawned OS process. This supervisor implements the same +//! [`Supervisor`] trait `ProcessSupervisor` does, so the instance command +//! dispatch is identical — only the management model differs. +//! +//! Scope (first cut): lifecycle + cached state. Two parity items are deferred +//! to Phase 3b alongside process PID adoption: (1) crash detection (containers +//! give us no child handle — a `docker compose ps` poll loop would supply it); +//! (2) state adoption on agent restart (a running stack reports `stopped` until +//! the next lifecycle command). Both are reconcilable with a `ps` probe. +//! +//! Reference: docs/reference-repos/icehunter SETUP_DOCKER.md (the docker +//! control plane this mirrors). 
+ +use std::path::PathBuf; +use std::process::Stdio; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{bail, Context, Result}; +use serde::Deserialize; +use tokio::process::Command; +use tokio::sync::{watch, Mutex}; + +use crate::config::InstanceConfig; +use crate::supervisor::{InstanceState, Supervisor}; + +/// Per-instance docker-compose settings (`[instance.docker_compose]`). All +/// fields optional — defaults cover the common "one compose file in the +/// instance root" case. +#[derive(Debug, Clone, Default, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct DockerComposeConfig { + /// Compose file (`-f`). Relative paths resolve against the run dir. Default: + /// compose's own discovery (docker-compose.yml in the run dir). + #[serde(default)] + pub file: Option, + /// Compose project name (`-p`). Default: the instance id. + #[serde(default)] + pub project: Option, + /// Limit lifecycle ops to one service. Default: every service in the file. + #[serde(default)] + pub service: Option, + /// Override the compose binary invocation. Default: `["docker","compose"]`. + /// Use `["docker-compose"]` for the legacy standalone binary. + #[serde(default)] + pub command: Option>, +} + +struct Inner { + started_at: Option, +} + +pub struct DockerComposeSupervisor { + instance_id: String, + /// Directory the compose commands run in (relative `-f`/file paths resolve + /// against it). + run_dir: PathBuf, + compose_file: Option, + project: String, + service: Option, + /// Compose binary + leading args, e.g. `["docker","compose"]`. 
+    command: Vec<String>,
+    inner: Mutex<Inner>,
+    state_tx: watch::Sender<InstanceState>,
+}
+
+impl DockerComposeSupervisor {
+    pub fn new(cfg: &InstanceConfig) -> Arc<Self> {
+        let dc = cfg.docker_compose.clone().unwrap_or_default();
+        let run_dir = cfg
+            .working_dir
+            .clone()
+            .unwrap_or_else(|| cfg.root.clone());
+        let command = dc
+            .command
+            .filter(|c| !c.is_empty())
+            .unwrap_or_else(|| vec!["docker".to_string(), "compose".to_string()]);
+        let (state_tx, _) = watch::channel(InstanceState::Stopped);
+        Arc::new(Self {
+            instance_id: cfg.id.clone(),
+            run_dir,
+            compose_file: dc.file,
+            project: dc.project.unwrap_or_else(|| cfg.id.clone()),
+            service: dc.service,
+            command,
+            inner: Mutex::new(Inner { started_at: None }),
+            state_tx,
+        })
+    }
+
+    fn set_state(&self, state: InstanceState) {
+        let _ = self.state_tx.send_replace(state);
+    }
+
+    /// Run one compose subcommand (`up`/`stop`/`restart`/...), bailing with the
+    /// captured stderr on non-zero exit. Global flags (`-f`, `-p`) precede the
+    /// subcommand; the optional single service is appended last.
+    async fn run(&self, action: &str, action_args: &[&str]) -> Result<()> {
+        let mut cmd = Command::new(&self.command[0]);
+        cmd.args(&self.command[1..]);
+        if let Some(file) = &self.compose_file {
+            cmd.arg("-f").arg(file);
+        }
+        cmd.arg("-p").arg(&self.project);
+        cmd.arg(action);
+        cmd.args(action_args);
+        if let Some(service) = &self.service {
+            cmd.arg(service);
+        }
+        cmd.current_dir(&self.run_dir)
+            .stdin(Stdio::null())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped());
+
+        let output = cmd
+            .output()
+            .await
+            .with_context(|| format!("running `{} {action}` (is docker installed and on PATH?)", self.command.join(" ")))?;
+
+        if !output.status.success() {
+            let stderr = String::from_utf8_lossy(&output.stderr);
+            let stdout = String::from_utf8_lossy(&output.stdout);
+            let detail = if !stderr.trim().is_empty() {
+                stderr.trim()
+            } else {
+                stdout.trim()
+            };
+            bail!("compose {action} failed ({}): {detail}", output.status);
+        }
+        Ok(())
+    }
+}
+
+#[async_trait::async_trait]
+impl Supervisor for DockerComposeSupervisor {
+    fn instance_id(&self) -> &str {
+        &self.instance_id
+    }
+
+    fn state(&self) -> InstanceState {
+        self.state_tx.borrow().clone()
+    }
+
+    fn watch_state(&self) -> watch::Receiver<InstanceState> {
+        self.state_tx.subscribe()
+    }
+
+    async fn uptime_seconds(&self) -> u64 {
+        let inner = self.inner.lock().await;
+        match (&*self.state_tx.borrow(), inner.started_at) {
+            (InstanceState::Running, Some(t)) => t.elapsed().as_secs(),
+            _ => 0,
+        }
+    }
+
+    async fn start(self: Arc<Self>) -> Result<()> {
+        if matches!(
+            *self.state_tx.borrow(),
+            InstanceState::Running | InstanceState::Starting
+        ) {
+            bail!("instance '{}' is already running", self.instance_id);
+        }
+        self.set_state(InstanceState::Starting);
+        match self.run("up", &["-d"]).await {
+            Ok(()) => {
+                self.inner.lock().await.started_at = Some(Instant::now());
+                self.set_state(InstanceState::Running);
+                tracing::info!("instance '{}' compose up -d", self.instance_id);
+                Ok(())
+            }
+            Err(e) => {
self.set_state(InstanceState::Stopped); + Err(e) + } + } + } + + async fn stop(self: Arc) -> Result<()> { + self.set_state(InstanceState::Stopping); + match self.run("stop", &[]).await { + Ok(()) => { + self.inner.lock().await.started_at = None; + self.set_state(InstanceState::Stopped); + tracing::info!("instance '{}' compose stop", self.instance_id); + Ok(()) + } + Err(e) => { + // Stop failed — the stack is most likely still up. + self.set_state(InstanceState::Running); + Err(e) + } + } + } + + async fn restart(self: Arc) -> Result<()> { + self.set_state(InstanceState::Starting); + match self.run("restart", &[]).await { + Ok(()) => { + self.inner.lock().await.started_at = Some(Instant::now()); + self.set_state(InstanceState::Running); + tracing::info!("instance '{}' compose restart", self.instance_id); + Ok(()) + } + Err(e) => { + self.set_state(InstanceState::Stopped); + Err(e) + } + } + } +} diff --git a/corrosion-host-agent/src/instancecmd.rs b/corrosion-host-agent/src/instancecmd.rs index 483dade..9c5a224 100644 --- a/corrosion-host-agent/src/instancecmd.rs +++ b/corrosion-host-agent/src/instancecmd.rs @@ -13,9 +13,9 @@ use serde_json::json; use std::sync::Arc; use crate::agent::Agent; -use crate::process::ProcessSupervisor; use crate::subjects; use crate::steamcmd; +use crate::supervisor::Supervisor; #[derive(Debug, Deserialize)] struct InstanceCommand { @@ -26,8 +26,8 @@ struct InstanceCommand { } /// Forward every supervisor state change as a status event. 
-pub async fn publish_state_changes(agent: Arc, sup: Arc) { - let subject = subjects::instance_status(&agent.cfg.license_id, &sup.instance_id); +pub async fn publish_state_changes(agent: Arc, sup: Arc) { + let subject = subjects::instance_status(&agent.cfg.license_id, sup.instance_id()); let mut rx = sup.watch_state(); let cancel = agent.shutdown.clone(); @@ -40,13 +40,13 @@ pub async fn publish_state_changes(agent: Arc, sup: Arc { if let Err(e) = agent.nats.publish(subject.clone(), bytes.into()).await { - tracing::warn!("status publish failed for '{}': {e}", sup.instance_id); + tracing::warn!("status publish failed for '{}': {e}", sup.instance_id()); } } Err(e) => tracing::error!("status serialize failed: {e}"), @@ -58,8 +58,8 @@ pub async fn publish_state_changes(agent: Arc, sup: Arc, sup: Arc) -> anyhow::Result<()> { - let subject = subjects::instance_cmd(&agent.cfg.license_id, &sup.instance_id); +pub async fn run(agent: Arc, sup: Arc) -> anyhow::Result<()> { + let subject = subjects::instance_cmd(&agent.cfg.license_id, sup.instance_id()); let mut sub = agent.nats.subscribe(subject.clone()).await?; tracing::info!("instance command handler listening on {subject}"); @@ -74,13 +74,13 @@ pub async fn run(agent: Arc, sup: Arc) -> anyhow::Resu tokio::spawn(async move { handle(agent, sup, msg).await }); } None => { - tracing::warn!("instance command subscription ended for '{}'", sup.instance_id); + tracing::warn!("instance command subscription ended for '{}'", sup.instance_id()); break; } } } _ = cancel.cancelled() => { - tracing::info!("instance command handler stopping for '{}'", sup.instance_id); + tracing::info!("instance command handler stopping for '{}'", sup.instance_id()); break; } } @@ -88,7 +88,7 @@ pub async fn run(agent: Arc, sup: Arc) -> anyhow::Resu Ok(()) } -async fn handle(agent: Arc, sup: Arc, msg: async_nats::Message) { +async fn handle(agent: Arc, sup: Arc, msg: async_nats::Message) { let Some(reply) = msg.reply.clone() else { 
tracing::warn!("instance command without reply subject ignored"); return; @@ -113,20 +113,22 @@ async fn handle(agent: Arc, sup: Arc, msg: async_nats: async fn dispatch( agent: &Arc, - sup: &Arc, + sup: &Arc, cmd: &InstanceCommand, ) -> serde_json::Value { let func = cmd.func.as_str(); + // start/stop/restart take `self: Arc` (they may hand a clone to a + // monitor task), so clone the Arc before the consuming call. let outcome = match func { - "start" => sup.start().await.map(|_| "starting"), - "stop" => sup.stop().await.map(|_| "stopped"), - "restart" => sup.restart().await.map(|_| "restarted"), + "start" => sup.clone().start().await.map(|_| "starting"), + "stop" => sup.clone().stop().await.map(|_| "stopped"), + "restart" => sup.clone().restart().await.map(|_| "restarted"), "status" => { return json!({ "status": "success", "func": "status", - "instance_id": sup.instance_id, + "instance_id": sup.instance_id(), "state": sup.state(), "uptime_seconds": sup.uptime_seconds().await, }); @@ -139,15 +141,15 @@ async fn dispatch( .cfg .instances .iter() - .find(|i| i.id == sup.instance_id); + .find(|i| i.id == sup.instance_id()); let rcon_cfg = inst_cfg.and_then(|i| i.rcon.as_ref()); let Some(rcon_cfg) = rcon_cfg else { return json!({ "status": "error", "func": "rcon", - "instance_id": sup.instance_id, - "message": format!("instance '{}' has no rcon configured", sup.instance_id), + "instance_id": sup.instance_id(), + "message": format!("instance '{}' has no rcon configured", sup.instance_id()), }); }; @@ -155,7 +157,7 @@ async fn dispatch( return json!({ "status": "error", "func": "rcon", - "instance_id": sup.instance_id, + "instance_id": sup.instance_id(), "message": "rcon func requires a 'command' field", }); }; @@ -165,13 +167,13 @@ async fn dispatch( Ok(output) => json!({ "status": "success", "func": "rcon", - "instance_id": sup.instance_id, + "instance_id": sup.instance_id(), "output": output, }), Err(e) => json!({ "status": "error", "func": "rcon", - "instance_id": 
sup.instance_id, + "instance_id": sup.instance_id(), "message": format!("{e:#}"), }), }; @@ -181,14 +183,14 @@ async fn dispatch( // settings. The supervisor only carries process-control state, not // the full config, so we reach into agent.cfg.instances here as the // rcon dispatch does. - let inst_cfg = agent.cfg.instances.iter().find(|i| i.id == sup.instance_id); + let inst_cfg = agent.cfg.instances.iter().find(|i| i.id == sup.instance_id()); let Some(inst_cfg) = inst_cfg else { return json!({ "status": "error", "func": "steam_update", - "instance_id": sup.instance_id, - "message": format!("no config found for instance '{}'", sup.instance_id), + "instance_id": sup.instance_id(), + "message": format!("no config found for instance '{}'", sup.instance_id()), }); }; @@ -209,7 +211,7 @@ async fn dispatch( }; let license = agent.cfg.license_id.clone(); - let instance_id = sup.instance_id.clone(); + let instance_id = sup.instance_id().to_string(); let nats = agent.nats.clone(); // Publish each progress line to the steam_status subject. 
@@ -240,12 +242,12 @@ async fn dispatch( Ok(()) => json!({ "status": "success", "func": "steam_update", - "instance_id": sup.instance_id, + "instance_id": sup.instance_id(), }), Err(e) => json!({ "status": "error", "func": "steam_update", - "instance_id": sup.instance_id, + "instance_id": sup.instance_id(), "message": format!("{e:#}"), }), }; @@ -262,14 +264,14 @@ async fn dispatch( Ok(result) => json!({ "status": "success", "func": func, - "instance_id": sup.instance_id, + "instance_id": sup.instance_id(), "result": result, "state": sup.state(), }), Err(e) => json!({ "status": "error", "func": func, - "instance_id": sup.instance_id, + "instance_id": sup.instance_id(), "message": format!("{e:#}"), }), } diff --git a/corrosion-host-agent/src/lib.rs b/corrosion-host-agent/src/lib.rs index 8143a6a..e494ecc 100644 --- a/corrosion-host-agent/src/lib.rs +++ b/corrosion-host-agent/src/lib.rs @@ -4,6 +4,7 @@ pub mod agent; pub mod bus; pub mod config; +pub mod docker_compose; pub mod filemanager; pub mod hostcmd; pub mod instancecmd; @@ -12,6 +13,7 @@ pub mod process; pub mod rcon; pub mod steamcmd; pub mod subjects; +pub mod supervisor; pub mod telemetry; pub mod update; pub mod version; diff --git a/corrosion-host-agent/src/main.rs b/corrosion-host-agent/src/main.rs index 6aa7cdd..86ea1ef 100644 --- a/corrosion-host-agent/src/main.rs +++ b/corrosion-host-agent/src/main.rs @@ -5,8 +5,8 @@ //! game adapters arrive in Phase 1+ (see PROTOCOL.md). 
use corrosion_host_agent::{ - agent, bus, config, filemanager, hostcmd, instancecmd, prober, process, subjects, telemetry, - version, + agent, bus, config, docker_compose, filemanager, hostcmd, instancecmd, prober, process, + subjects, supervisor, telemetry, version, }; use anyhow::{Context, Result}; @@ -92,10 +92,20 @@ async fn run(settings: config::Settings) -> Result<()> { let nats = bus::connect(&settings).await?; - let supervisors = settings + // Per-game supervisor factory: container-managed games (Dune) get a + // docker-compose supervisor; everything else is a spawned-process + // supervisor. Both satisfy the `Supervisor` trait, so the rest of the agent + // is game-agnostic. + let supervisors: std::collections::HashMap> = settings .instances .iter() - .map(|inst| (inst.id.clone(), process::ProcessSupervisor::new(inst))) + .map(|inst| { + let sup: Arc = match inst.game.as_str() { + "dune" => docker_compose::DockerComposeSupervisor::new(inst), + _ => process::ProcessSupervisor::new(inst), + }; + (inst.id.clone(), sup) + }) .collect(); let agent = Arc::new(Agent { diff --git a/corrosion-host-agent/src/process.rs b/corrosion-host-agent/src/process.rs index 598cf7e..fdf0592 100644 --- a/corrosion-host-agent/src/process.rs +++ b/corrosion-host-agent/src/process.rs @@ -1,14 +1,16 @@ //! Per-instance game-server process supervision. //! -//! One `ProcessSupervisor` per process-managed instance. Lifecycle mirrors the -//! proven Go agent behavior — graceful SIGTERM with a 30s budget before force -//! kill, a monitor task that reaps the child and records crash-vs-stop — with -//! two fixes the Go version needed: args are a proper list (no naive space -//! splitting), and every state change is observable through a watch channel -//! so the panel gets push events instead of waiting for the next heartbeat. +//! One `ProcessSupervisor` per process-managed instance (Rust/Conan/Soulmask). +//! 
Lifecycle mirrors the proven Go agent behavior — graceful SIGTERM with a 30s +//! budget before force kill, a monitor task that reaps the child and records +//! crash-vs-stop — with two fixes the Go version needed: args are a proper list +//! (no naive space splitting), and every state change is observable through a +//! watch channel so the panel gets push events instead of waiting for the next +//! heartbeat. Lifecycle control is exposed through the [`Supervisor`] trait so +//! the command dispatch is identical across process- and container-managed +//! games. use anyhow::{bail, Context, Result}; -use serde::Serialize; use std::path::PathBuf; use std::process::Stdio; use std::sync::Arc; @@ -17,39 +19,11 @@ use tokio::process::{Child, Command}; use tokio::sync::{watch, Mutex}; use crate::config::InstanceConfig; +use crate::supervisor::{InstanceState, Supervisor}; const GRACEFUL_STOP_BUDGET: Duration = Duration::from_secs(30); const RESTART_PAUSE: Duration = Duration::from_secs(2); -#[derive(Debug, Clone, PartialEq, Serialize)] -#[serde(rename_all = "snake_case", tag = "state")] -pub enum InstanceState { - /// Not process-managed (no executable configured). - Unmanaged, - Stopped, - Starting, - Running, - Stopping, - /// Process exited without a stop request. - Crashed { - #[serde(skip_serializing_if = "Option::is_none")] - exit_code: Option, - }, -} - -impl InstanceState { - pub fn as_label(&self) -> &'static str { - match self { - InstanceState::Unmanaged => "unmanaged", - InstanceState::Stopped => "stopped", - InstanceState::Starting => "starting", - InstanceState::Running => "running", - InstanceState::Stopping => "stopping", - InstanceState::Crashed { .. 
} => "crashed", - } - } -} - struct Inner { child: Option, started_at: Option, @@ -59,7 +33,7 @@ struct Inner { } pub struct ProcessSupervisor { - pub instance_id: String, + instance_id: String, executable: Option, args: Vec, working_dir: Option, @@ -90,72 +64,6 @@ impl ProcessSupervisor { }) } - pub fn state(&self) -> InstanceState { - self.state_tx.borrow().clone() - } - - pub fn watch_state(&self) -> watch::Receiver { - self.state_tx.subscribe() - } - - pub async fn uptime_seconds(&self) -> u64 { - let inner = self.inner.lock().await; - match (&*self.state_tx.borrow(), inner.started_at) { - (InstanceState::Running, Some(t)) => t.elapsed().as_secs(), - _ => 0, - } - } - - pub async fn start(self: &Arc) -> Result<()> { - let Some(exe) = self.executable.clone() else { - bail!("instance '{}' has no executable configured", self.instance_id); - }; - if !exe.exists() { - bail!("executable not found: {}", exe.display()); - } - - let mut inner = self.inner.lock().await; - if matches!(*self.state_tx.borrow(), InstanceState::Running | InstanceState::Starting) { - bail!("instance '{}' is already running", self.instance_id); - } - - self.set_state(InstanceState::Starting); - - let workdir = self - .working_dir - .clone() - .or_else(|| exe.parent().map(|p| p.to_path_buf())) - .unwrap_or_else(|| PathBuf::from(".")); - - let child = Command::new(&exe) - .args(&self.args) - .current_dir(&workdir) - .stdin(Stdio::null()) - .stdout(Stdio::inherit()) - .stderr(Stdio::inherit()) - .spawn() - .with_context(|| format!("spawning {}", exe.display()))?; - - let pid = child.id(); - inner.child = Some(child); - inner.started_at = Some(Instant::now()); - inner.stop_requested = false; - drop(inner); - - self.set_state(InstanceState::Running); - tracing::info!( - "instance '{}' started: {} (pid {:?})", - self.instance_id, - exe.display(), - pid - ); - - // Monitor: reap the child and classify the exit. 
- let sup = Arc::clone(self); - tokio::spawn(async move { sup.monitor().await }); - Ok(()) - } - async fn monitor(self: Arc) { // Take a waiter without holding the lock across the whole child // lifetime: Child::wait needs &mut, so the child stays in inner and @@ -201,7 +109,85 @@ impl ProcessSupervisor { } } - pub async fn stop(self: &Arc) -> Result<()> { + fn set_state(&self, state: InstanceState) { + // send_replace never fails even with zero receivers. + let _ = self.state_tx.send_replace(state); + } +} + +#[async_trait::async_trait] +impl Supervisor for ProcessSupervisor { + fn instance_id(&self) -> &str { + &self.instance_id + } + + fn state(&self) -> InstanceState { + self.state_tx.borrow().clone() + } + + fn watch_state(&self) -> watch::Receiver { + self.state_tx.subscribe() + } + + async fn uptime_seconds(&self) -> u64 { + let inner = self.inner.lock().await; + match (&*self.state_tx.borrow(), inner.started_at) { + (InstanceState::Running, Some(t)) => t.elapsed().as_secs(), + _ => 0, + } + } + + async fn start(self: Arc) -> Result<()> { + let Some(exe) = self.executable.clone() else { + bail!("instance '{}' has no executable configured", self.instance_id); + }; + if !exe.exists() { + bail!("executable not found: {}", exe.display()); + } + + let mut inner = self.inner.lock().await; + if matches!(*self.state_tx.borrow(), InstanceState::Running | InstanceState::Starting) { + bail!("instance '{}' is already running", self.instance_id); + } + + self.set_state(InstanceState::Starting); + + let workdir = self + .working_dir + .clone() + .or_else(|| exe.parent().map(|p| p.to_path_buf())) + .unwrap_or_else(|| PathBuf::from(".")); + + let child = Command::new(&exe) + .args(&self.args) + .current_dir(&workdir) + .stdin(Stdio::null()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .spawn() + .with_context(|| format!("spawning {}", exe.display()))?; + + let pid = child.id(); + inner.child = Some(child); + inner.started_at = Some(Instant::now()); + 
inner.stop_requested = false; + drop(inner); + + self.set_state(InstanceState::Running); + tracing::info!( + "instance '{}' started: {} (pid {:?})", + self.instance_id, + exe.display(), + pid + ); + + // Monitor: reap the child and classify the exit. + let sup = Arc::clone(&self); + tokio::spawn(async move { sup.monitor().await }); + Ok(()) + } + + async fn stop(self: Arc) -> Result<()> { let mut inner = self.inner.lock().await; if inner.child.is_none() { bail!("instance '{}' is not running", self.instance_id); @@ -263,16 +249,14 @@ impl ProcessSupervisor { Ok(()) } - pub async fn restart(self: &Arc) -> Result<()> { - if !matches!(*self.state_tx.borrow(), InstanceState::Stopped | InstanceState::Crashed { .. } | InstanceState::Unmanaged) { - self.stop().await?; + async fn restart(self: Arc) -> Result<()> { + if !matches!( + *self.state_tx.borrow(), + InstanceState::Stopped | InstanceState::Crashed { .. } | InstanceState::Unmanaged + ) { + self.clone().stop().await?; } tokio::time::sleep(RESTART_PAUSE).await; self.start().await } - - fn set_state(&self, state: InstanceState) { - // send_replace never fails even with zero receivers. - let _ = self.state_tx.send_replace(state); - } } diff --git a/corrosion-host-agent/src/supervisor.rs b/corrosion-host-agent/src/supervisor.rs new file mode 100644 index 0000000..2eabc78 --- /dev/null +++ b/corrosion-host-agent/src/supervisor.rs @@ -0,0 +1,80 @@ +//! The supervision abstraction. +//! +//! A `Supervisor` owns the lifecycle of one game instance. Different games are +//! managed in fundamentally different ways — Rust/Conan/Soulmask are spawned OS +//! processes ([`crate::process::ProcessSupervisor`]); Dune is a docker-compose +//! stack ([`crate::docker_compose::DockerComposeSupervisor`]); future planes +//! (kubectl, AMP/podman, SSH) will be their own impls. The instance command +//! dispatch (`instancecmd::dispatch`) talks only to this trait, so it never +//! learns which management model is behind a given instance. +//! 
+//! Trait objects (`Arc<dyn Supervisor>`) need object-safe, dynamically +//! dispatchable async methods; native `async fn` in traits is not yet +//! dyn-compatible, so we use `#[async_trait]` (the battle-tested ecosystem +//! standard) to box the returned futures. The cost — one heap alloc per +//! lifecycle call — is irrelevant for start/stop/restart, which happen seconds +//! to minutes apart. + +use std::sync::Arc; + +use anyhow::Result; +use serde::Serialize; +use tokio::sync::watch; + +/// Observable lifecycle state of one instance. Shared vocabulary across every +/// supervisor impl; serialized verbatim into heartbeats and status events +/// (`{"state":"running", ...}`). +#[derive(Debug, Clone, PartialEq, Serialize)] +#[serde(rename_all = "snake_case", tag = "state")] +pub enum InstanceState { + /// Not lifecycle-managed (a process instance with no executable, etc.). + Unmanaged, + Stopped, + Starting, + Running, + Stopping, + /// Exited/died without a stop request. + Crashed { + #[serde(skip_serializing_if = "Option::is_none")] + exit_code: Option<i32>, + }, +} + +impl InstanceState { + pub fn as_label(&self) -> &'static str { + match self { + InstanceState::Unmanaged => "unmanaged", + InstanceState::Stopped => "stopped", + InstanceState::Starting => "starting", + InstanceState::Running => "running", + InstanceState::Stopping => "stopping", + InstanceState::Crashed { .. } => "crashed", + } + } +} + +/// Lifecycle control + state observation for one instance. +/// +/// `start`/`stop`/`restart` take `self: Arc<Self>` so an impl can hand a clone +/// to a spawned monitor task; callers hold an `Arc<dyn Supervisor>` and +/// `clone()` before each call. `watch_state` exposes the same channel the +/// status-event publisher drains, so panel push events stay decoupled from the +/// heartbeat cadence. +#[async_trait::async_trait] +pub trait Supervisor: Send + Sync { + /// The instance slug (a NATS subject segment). + fn instance_id(&self) -> &str; + + /// Current cached state (cheap; no I/O).
+ fn state(&self) -> InstanceState; + + /// Subscribe to state transitions. + fn watch_state(&self) -> watch::Receiver<InstanceState>; + + /// Seconds since the instance entered `Running` (0 otherwise). + async fn uptime_seconds(&self) -> u64; + + async fn start(self: Arc<Self>) -> Result<()>; + async fn stop(self: Arc<Self>) -> Result<()>; + async fn restart(self: Arc<Self>) -> Result<()>; +} diff --git a/corrosion-host-agent/src/telemetry.rs b/corrosion-host-agent/src/telemetry.rs index 5ea6d5c..fcb3b8e 100644 --- a/corrosion-host-agent/src/telemetry.rs +++ b/corrosion-host-agent/src/telemetry.rs @@ -129,7 +129,7 @@ pub async fn collect(agent: &Agent, sys: &mut System) -> HeartbeatPayload { let mut instances = Vec::with_capacity(agent.cfg.instances.len()); for inst in &agent.cfg.instances { let (state, uptime_seconds) = match agent.supervisors.get(&inst.id) { - Some(sup) if !matches!(sup.state(), crate::process::InstanceState::Unmanaged) => { + Some(sup) if !matches!(sup.state(), crate::supervisor::InstanceState::Unmanaged) => { (sup.state().as_label().to_string(), sup.uptime_seconds().await) } _ => { diff --git a/corrosion-host-agent/tests/docker_compose.rs b/corrosion-host-agent/tests/docker_compose.rs new file mode 100644 index 0000000..06dffc8 --- /dev/null +++ b/corrosion-host-agent/tests/docker_compose.rs @@ -0,0 +1,156 @@ +//! DockerComposeSupervisor tests. A fake `docker` script records the exact +//! arguments it was invoked with and returns a controllable exit code, so we +//! assert the compose invocations + state transitions with no real Docker +//! daemon — the same mock-the-external-binary approach the steamcmd tests use.
+#![cfg(unix)] + +use std::os::unix::fs::PermissionsExt; +use std::path::{Path, PathBuf}; + +use corrosion_host_agent::config::InstanceConfig; +use corrosion_host_agent::docker_compose::{DockerComposeConfig, DockerComposeSupervisor}; +use corrosion_host_agent::supervisor::{InstanceState, Supervisor}; + +/// Write a fake `docker` executable that appends its args (space-joined) to +/// `args_log` and exits with the integer in `exit_file` (0 if absent). +fn fake_docker(dir: &Path, args_log: &Path, exit_file: &Path) -> PathBuf { + let script = dir.join("fakedocker"); + let body = format!( + "#!/bin/sh\nprintf '%s\\n' \"$*\" >> '{}'\nexit \"$(cat '{}' 2>/dev/null || echo 0)\"\n", + args_log.display(), + exit_file.display(), + ); + std::fs::write(&script, body).unwrap(); + let mut perms = std::fs::metadata(&script).unwrap().permissions(); + perms.set_mode(0o755); + std::fs::set_permissions(&script, perms).unwrap(); + script +} + +fn dune_instance(command: Vec<String>, service: Option<String>) -> InstanceConfig { + InstanceConfig { + id: "dune-main".to_string(), + game: "dune".to_string(), + root: PathBuf::from("/tmp"), + label: None, + executable: None, + args: vec![], + working_dir: None, + rcon: None, + steamcmd: None, + docker_compose: Some(DockerComposeConfig { + file: Some(PathBuf::from("docker-compose.yml")), + project: Some("duneproj".to_string()), + service, + command: Some(command), + }), + } +} + +#[tokio::test] +async fn start_runs_compose_up_detached_and_sets_running() { + let dir = tempfile::tempdir().unwrap(); + let args_log = dir.path().join("args.log"); + let exit_file = dir.path().join("exit"); + let docker = fake_docker(dir.path(), &args_log, &exit_file); + + let sup = DockerComposeSupervisor::new(&dune_instance( + vec![docker.to_string_lossy().into_owned()], + None, + )); + assert_eq!(sup.state(), InstanceState::Stopped); + + sup.clone().start().await.expect("compose up should succeed"); + assert_eq!(sup.state(), InstanceState::Running); + + let logged =
std::fs::read_to_string(&args_log).unwrap(); + assert!(logged.contains("up -d"), "expected `up -d`; got: {logged}"); + assert!(logged.contains("-p duneproj"), "expected project flag; got: {logged}"); + assert!(logged.contains("-f docker-compose.yml"), "expected file flag; got: {logged}"); +} + +#[tokio::test] +async fn stop_runs_compose_stop_and_sets_stopped() { + let dir = tempfile::tempdir().unwrap(); + let args_log = dir.path().join("args.log"); + let exit_file = dir.path().join("exit"); + let docker = fake_docker(dir.path(), &args_log, &exit_file); + + let sup = DockerComposeSupervisor::new(&dune_instance( + vec![docker.to_string_lossy().into_owned()], + None, + )); + sup.clone().start().await.expect("up"); + sup.clone().stop().await.expect("compose stop should succeed"); + assert_eq!(sup.state(), InstanceState::Stopped); + assert_eq!(sup.uptime_seconds().await, 0); + + let logged = std::fs::read_to_string(&args_log).unwrap(); + assert!(logged.lines().any(|l| l.contains("stop")), "expected a `stop` call; got: {logged}"); +} + +#[tokio::test] +async fn restart_runs_compose_restart() { + let dir = tempfile::tempdir().unwrap(); + let args_log = dir.path().join("args.log"); + let exit_file = dir.path().join("exit"); + let docker = fake_docker(dir.path(), &args_log, &exit_file); + + let sup = DockerComposeSupervisor::new(&dune_instance( + vec![docker.to_string_lossy().into_owned()], + None, + )); + sup.clone().restart().await.expect("compose restart should succeed"); + assert_eq!(sup.state(), InstanceState::Running); + + let logged = std::fs::read_to_string(&args_log).unwrap(); + assert!(logged.contains("restart"), "expected `restart`; got: {logged}"); +} + +#[tokio::test] +async fn single_service_is_targeted() { + let dir = tempfile::tempdir().unwrap(); + let args_log = dir.path().join("args.log"); + let exit_file = dir.path().join("exit"); + let docker = fake_docker(dir.path(), &args_log, &exit_file); + + let sup = DockerComposeSupervisor::new(&dune_instance( + 
vec![docker.to_string_lossy().into_owned()], + Some("gameserver".to_string()), + )); + sup.clone().start().await.expect("up"); + + let logged = std::fs::read_to_string(&args_log).unwrap(); + assert!( + logged.contains("up -d gameserver"), + "service must be appended after `up -d`; got: {logged}" + ); +} + +#[tokio::test] +async fn compose_failure_errors_and_reverts_state() { + let dir = tempfile::tempdir().unwrap(); + let args_log = dir.path().join("args.log"); + let exit_file = dir.path().join("exit"); + std::fs::write(&exit_file, "1").unwrap(); // make the fake docker fail + let docker = fake_docker(dir.path(), &args_log, &exit_file); + + let sup = DockerComposeSupervisor::new(&dune_instance( + vec![docker.to_string_lossy().into_owned()], + None, + )); + let err = sup.clone().start().await.expect_err("nonzero compose exit must fail"); + assert!(err.to_string().contains("compose up failed"), "got: {err}"); + assert_eq!(sup.state(), InstanceState::Stopped, "failed start must revert to Stopped"); +} + +#[tokio::test] +async fn missing_docker_binary_errors_cleanly() { + let sup = DockerComposeSupervisor::new(&dune_instance( + vec!["/nonexistent/docker-xyz".to_string()], + None, + )); + let err = sup.clone().start().await.expect_err("missing docker must fail"); + assert!(err.to_string().contains("docker"), "error should mention docker: {err}"); + assert_eq!(sup.state(), InstanceState::Stopped); +} diff --git a/corrosion-host-agent/tests/supervisor.rs b/corrosion-host-agent/tests/supervisor.rs index 62d31ee..b923930 100644 --- a/corrosion-host-agent/tests/supervisor.rs +++ b/corrosion-host-agent/tests/supervisor.rs @@ -8,7 +8,8 @@ use std::path::PathBuf; use std::time::Duration; use corrosion_host_agent::config::InstanceConfig; -use corrosion_host_agent::process::{InstanceState, ProcessSupervisor}; +use corrosion_host_agent::process::ProcessSupervisor; +use corrosion_host_agent::supervisor::{InstanceState, Supervisor}; fn managed_instance(executable: &str, args: 
&[&str]) -> InstanceConfig { InstanceConfig { @@ -21,6 +22,7 @@ fn managed_instance(executable: &str, args: &[&str]) -> InstanceConfig { working_dir: None, rcon: None, steamcmd: None, + docker_compose: None, } } @@ -47,15 +49,15 @@ async fn start_status_stop_lifecycle() { let sup = ProcessSupervisor::new(&managed_instance("/bin/sleep", &["300"])); assert_eq!(sup.state(), InstanceState::Stopped); - sup.start().await.expect("start should succeed"); + sup.clone().start().await.expect("start should succeed"); assert_eq!(sup.state(), InstanceState::Running); tokio::time::sleep(Duration::from_millis(1100)).await; assert!(sup.uptime_seconds().await >= 1, "uptime should advance"); // Double-start must be rejected while running. - assert!(sup.start().await.is_err(), "double start must fail"); + assert!(sup.clone().start().await.is_err(), "double start must fail"); - sup.stop().await.expect("stop should succeed"); + sup.clone().stop().await.expect("stop should succeed"); let state = wait_for_state(&sup, |s| matches!(s, InstanceState::Stopped), Duration::from_secs(5)).await; assert_eq!(state, InstanceState::Stopped); assert_eq!(sup.uptime_seconds().await, 0); @@ -64,7 +66,7 @@ async fn start_status_stop_lifecycle() { #[tokio::test] async fn unexpected_exit_is_crashed_with_code() { let sup = ProcessSupervisor::new(&managed_instance("/bin/sh", &["-c", "sleep 0.2; exit 7"])); - sup.start().await.expect("start should succeed"); + sup.clone().start().await.expect("start should succeed"); let state = wait_for_state( &sup, @@ -78,16 +80,16 @@ async fn unexpected_exit_is_crashed_with_code() { #[tokio::test] async fn restart_from_crashed_recovers() { let sup = ProcessSupervisor::new(&managed_instance("/bin/sh", &["-c", "exit 1"])); - sup.start().await.expect("start should succeed"); + sup.clone().start().await.expect("start should succeed"); wait_for_state(&sup, |s| matches!(s, InstanceState::Crashed { .. 
}), Duration::from_secs(5)).await; // Restart from crashed must work (panel "Restart" after a crash). // Use a long-lived command this time by replacing the supervisor — the // command is fixed per supervisor, so emulate via a fresh one. let sup2 = ProcessSupervisor::new(&managed_instance("/bin/sleep", &["300"])); - sup2.restart().await.expect("restart from stopped should start"); + sup2.clone().restart().await.expect("restart from stopped should start"); assert_eq!(sup2.state(), InstanceState::Running); - sup2.stop().await.expect("cleanup stop"); + sup2.clone().stop().await.expect("cleanup stop"); } #[tokio::test] @@ -96,14 +98,14 @@ async fn unmanaged_instance_rejects_process_commands() { cfg.executable = None; let sup = ProcessSupervisor::new(&cfg); assert_eq!(sup.state(), InstanceState::Unmanaged); - assert!(sup.start().await.is_err(), "unmanaged start must fail"); - assert!(sup.stop().await.is_err(), "unmanaged stop must fail"); + assert!(sup.clone().start().await.is_err(), "unmanaged start must fail"); + assert!(sup.clone().stop().await.is_err(), "unmanaged stop must fail"); } #[tokio::test] async fn missing_executable_fails_cleanly() { let sup = ProcessSupervisor::new(&managed_instance("/nonexistent/bin/gameserver", &[])); - let err = sup.start().await.expect_err("must fail"); + let err = sup.clone().start().await.expect_err("must fail"); assert!(err.to_string().contains("not found"), "error should say not found: {err}"); assert_eq!(sup.state(), InstanceState::Stopped, "failed start must not leave Starting state"); }