feat(host-agent): Phase 2 — Dune docker-compose adapter via Supervisor trait
Introduce a Supervisor trait (async-trait) so the agent manages games with different models behind one wire contract. ProcessSupervisor (spawned process: rust/conan/soulmask) and the new DockerComposeSupervisor (dune) both impl it; Agent.supervisors is now HashMap<String, Arc<dyn Supervisor>> and instancecmd dispatch is game-agnostic — start/stop/restart/status identical across games, selected by a per-game factory in main. InstanceState moved to the shared supervisor module. DockerComposeSupervisor drives against the instance's compose project, with -f/-p/single-service support and a configurable compose binary. New [instance.docker_compose] config block. First cut = lifecycle + cached state; container crash-detection + restart adoption deferred to Phase 3b (reconcilable with ). Trait choice (dyn over enum) per Commander: scales to future planes (kubectl, AMP/podman, SSH) as new struct+impl, no central match. 56 tests green (6 new docker-compose mock-binary tests + 5 refactored process tests), zero warnings. Live verification pending a real Dune stack. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
11
CHANGELOG.md
11
CHANGELOG.md
@@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added (Host-agent Phase 2 — Dune docker-compose adapter — 2026-06-12)
|
||||
|
||||
**`Supervisor` trait abstraction (`corrosion-host-agent`):**
|
||||
- Introduced `trait Supervisor` (via `async-trait`, the battle-tested ecosystem standard) so the agent can manage games with fundamentally different models behind one wire contract. `ProcessSupervisor` (spawned OS process — Rust/Conan/Soulmask) and the new `DockerComposeSupervisor` (Dune) both implement it; `Agent.supervisors` is now `HashMap<String, Arc<dyn Supervisor>>` and the instance command dispatch (`instancecmd::dispatch`) is fully game-agnostic — `start`/`stop`/`restart`/`status` are identical across games. A per-game factory in `main` selects the impl. `InstanceState` moved to the shared `supervisor` module.
|
||||
- **Architecture call** (per Commander): chose the `dyn` trait over a zero-dependency enum because the Dune references point at *several* future management planes (kubectl, AMP/podman, SSH) — a trait makes each new plane "new struct + impl," no central match to edit.
|
||||
|
||||
**`DockerComposeSupervisor` (Dune: Awakening):**
|
||||
- Drives `docker compose up -d` / `stop` / `restart` against the instance's compose project (a "battlegroup"), with `-f`/`-p`/single-service support and a configurable compose binary (`docker compose` default, `docker-compose` legacy). New `[instance.docker_compose]` config block (file/project/service/command, all optional). `steam_update` already rejected for Dune (Docker images, no SteamCMD).
|
||||
- **Scope (first cut):** lifecycle + cached state. Deferred to Phase 3b (with process PID adoption): container crash-detection and state adoption on agent restart (both reconcilable with a `docker compose ps` probe).
|
||||
- Verified: 6 new docker-compose tests (mock `docker` binary asserting exact invocations + state transitions + failure paths) + the 5 refactored process-supervisor tests; full agent suite 56 tests green, zero warnings. Live verification against a real Dune stack pending the Commander standing one up.
|
||||
|
||||
### Changed (Fleet-driven active game + signed-update CI fix — 2026-06-12)
|
||||
|
||||
**Frontend — active game follows the deployed fleet:**
|
||||
|
||||
14
corrosion-host-agent/Cargo.lock
generated
14
corrosion-host-agent/Cargo.lock
generated
@@ -110,6 +110,17 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.89"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
@@ -276,10 +287,11 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
||||
|
||||
[[package]]
|
||||
name = "corrosion-host-agent"
|
||||
version = "2.0.0-alpha.8"
|
||||
version = "2.0.0-alpha.9"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-nats",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"clap",
|
||||
"futures",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "corrosion-host-agent"
|
||||
version = "2.0.0-alpha.8"
|
||||
version = "2.0.0-alpha.9"
|
||||
edition = "2021"
|
||||
description = "Corrosion Host Agent — multi-game ops runtime for self-hosted game servers"
|
||||
license = "UNLICENSED"
|
||||
@@ -23,6 +23,7 @@ chrono = { version = "0.4", features = ["serde", "clock"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
anyhow = "1"
|
||||
async-trait = "0.1"
|
||||
clap = { version = "4.5", features = ["derive"] }
|
||||
rand = "0.8"
|
||||
tokio-tungstenite = "0.24"
|
||||
|
||||
@@ -101,8 +101,16 @@ Payload: `{}`.
|
||||
|
||||
Lifecycle and control for one game instance.
|
||||
|
||||
The same `start`/`stop`/`restart`/`status` funcs work for **every** game: the
|
||||
agent picks a `Supervisor` impl per game — a spawned-process supervisor for
|
||||
Rust/Conan/Soulmask, a **docker-compose supervisor for Dune** (`docker compose
|
||||
up -d` / `stop` / `restart` against the instance's compose project, configured
|
||||
via `[instance.docker_compose]`). The wire contract is identical; only the
|
||||
management model behind it differs.
|
||||
|
||||
Implemented funcs: `start`, `stop` (graceful with 30s budget, then force
|
||||
kill), `restart`, `status` (returns `state` + `uptime_seconds`), and
|
||||
kill — process supervisor; Dune maps stop to `docker compose stop`), `restart`,
|
||||
`status` (returns `state` + `uptime_seconds`), and
|
||||
`rcon` — `{ "func": "rcon", "command": "<console command>" }` returns
|
||||
`{ "status": "success", "output": <server response> }`. Protocol per game:
|
||||
WebRCON (WebSocket JSON) for rust, Source RCON (Valve TCP) for
|
||||
@@ -118,7 +126,10 @@ streaming progress lines to `corrosion.{license}.{instance}.steam_status`
|
||||
and replying on completion.
|
||||
|
||||
Planned funcs: `oxide_install` (rust), plus game-adapter-specific
|
||||
commands (Dune: docker lifecycle, RabbitMQ bus commands, Coriolis reset).
|
||||
commands (Dune: RabbitMQ admin-bus commands, Coriolis reset, Postgres admin
|
||||
surface). Dune **lifecycle** is already covered by the shared
|
||||
start/stop/restart funcs above; container crash-detection and state adoption on
|
||||
agent restart land with Phase 3b.
|
||||
|
||||
### `corrosion.{license_id}.{instance_id}.steam_status` (agent → backend, publish) — LIVE
|
||||
|
||||
|
||||
@@ -20,7 +20,9 @@ instance on that host — Rust, Conan Exiles, Soulmask, Dune: Awakening.
|
||||
crash detection with exit codes, live state in heartbeats
|
||||
(integration-tested with real processes + live-NATS contract test)
|
||||
- [ ] Phase 1b: RCON trait (WebRCON rust / TCP conan+soulmask), SteamCMD, jailed file manager
|
||||
- [ ] Phase 2: Dune Docker adapter (compose lifecycle, RabbitMQ bus, Postgres admin)
|
||||
- [~] Phase 2: Dune Docker adapter — **compose lifecycle done** (`docker compose up -d/stop/restart`
|
||||
via the `Supervisor` trait + `DockerComposeSupervisor`); RabbitMQ admin bus + Postgres admin
|
||||
surface deferred. Container crash-detection + state adoption on agent restart land with Phase 3b.
|
||||
- [x] Phase 3a: SIGNED self-update — minisign-verified download+swap+relaunch (NATS `update` func); embedded public key; CI signs releases
|
||||
- [ ] Phase 3b: service install (systemd/SCM), PID adoption
|
||||
|
||||
|
||||
@@ -60,6 +60,24 @@ password = "changeme"
|
||||
# Dune instances do not use SteamCMD (Docker images); the steam_update func
|
||||
# will return a clear error if invoked on a dune instance.
|
||||
|
||||
# --- Dune: Awakening (container-managed) ---------------------------------
|
||||
# Dune runs as a docker-compose stack, not a spawned process — leave
|
||||
# `executable` unset and add an [instance.docker_compose] block. The agent
|
||||
# drives `docker compose up -d / stop / restart` for start/stop/restart, and
|
||||
# `steam_update` is rejected (Dune ships as Docker images).
|
||||
#
|
||||
# [[instance]]
|
||||
# id = "dune-main"
|
||||
# game = "dune"
|
||||
# root = "/opt/dune" # directory the compose commands run in
|
||||
# label = "Arrakis (battlegroup)"
|
||||
#
|
||||
# [instance.docker_compose]
|
||||
# file = "docker-compose.yml" # -f; relative to root. Omit to use compose's discovery
|
||||
# project = "dune-main" # -p; defaults to the instance id
|
||||
# service = "gameserver" # limit lifecycle to one service; omit for the whole stack
|
||||
# command = ["docker", "compose"] # default; use ["docker-compose"] for the legacy binary
|
||||
|
||||
[prober]
|
||||
interval_seconds = 300
|
||||
|
||||
|
||||
@@ -7,16 +7,17 @@ use tokio::sync::RwLock;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::config::Settings;
|
||||
use crate::process::ProcessSupervisor;
|
||||
use crate::prober::ProbeReport;
|
||||
use crate::supervisor::Supervisor;
|
||||
|
||||
pub struct Agent {
|
||||
pub cfg: Settings,
|
||||
pub nats: async_nats::Client,
|
||||
pub started: Instant,
|
||||
pub last_probe: RwLock<Option<ProbeReport>>,
|
||||
/// One supervisor per instance (unmanaged instances included — they
|
||||
/// report `unmanaged` state and reject process commands).
|
||||
pub supervisors: HashMap<String, Arc<ProcessSupervisor>>,
|
||||
/// One supervisor per instance, keyed by instance id. The concrete impl
|
||||
/// (process vs docker-compose) is chosen per game by the factory in main;
|
||||
/// every subsystem talks to the `Supervisor` trait only.
|
||||
pub supervisors: HashMap<String, Arc<dyn Supervisor>>,
|
||||
pub shutdown: CancellationToken,
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use serde::Deserialize;
|
||||
use std::collections::HashSet;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use crate::docker_compose::DockerComposeConfig;
|
||||
use crate::rcon::RconConfig;
|
||||
use crate::steamcmd::SteamcmdConfig;
|
||||
|
||||
@@ -76,6 +77,10 @@ pub struct InstanceConfig {
|
||||
/// validate = false).
|
||||
#[serde(default)]
|
||||
pub steamcmd: Option<SteamcmdConfig>,
|
||||
/// Docker-compose settings for container-managed games (Dune). Absent =
|
||||
/// defaults apply (compose file in the instance root, project = instance id).
|
||||
#[serde(default)]
|
||||
pub docker_compose: Option<DockerComposeConfig>,
|
||||
}
|
||||
|
||||
impl InstanceConfig {
|
||||
|
||||
216
corrosion-host-agent/src/docker_compose.rs
Normal file
216
corrosion-host-agent/src/docker_compose.rs
Normal file
@@ -0,0 +1,216 @@
|
||||
//! Docker-compose instance supervision — the Dune: Awakening adapter.
|
||||
//!
|
||||
//! Dune does not ship as a SteamCMD-updated process like Rust/Conan/Soulmask;
|
||||
//! it runs as Docker container(s) (game server + RabbitMQ broker + Postgres),
|
||||
//! orchestrated as a compose stack (a "battlegroup"). So Dune lifecycle is
|
||||
//! `docker compose up -d / stop / restart` against the instance's compose
|
||||
//! project, not a spawned OS process. This supervisor implements the same
|
||||
//! [`Supervisor`] trait `ProcessSupervisor` does, so the instance command
|
||||
//! dispatch is identical — only the management model differs.
|
||||
//!
|
||||
//! Scope (first cut): lifecycle + cached state. Two parity items are deferred
|
||||
//! to Phase 3b alongside process PID adoption: (1) crash detection (containers
|
||||
//! give us no child handle — a `docker compose ps` poll loop would supply it);
|
||||
//! (2) state adoption on agent restart (a running stack reports `stopped` until
|
||||
//! the next lifecycle command). Both are reconcilable with a `ps` probe.
|
||||
//!
|
||||
//! Reference: docs/reference-repos/icehunter SETUP_DOCKER.md (the docker
|
||||
//! control plane this mirrors).
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use serde::Deserialize;
|
||||
use tokio::process::Command;
|
||||
use tokio::sync::{watch, Mutex};
|
||||
|
||||
use crate::config::InstanceConfig;
|
||||
use crate::supervisor::{InstanceState, Supervisor};
|
||||
|
||||
/// Per-instance docker-compose settings (`[instance.docker_compose]`). All
|
||||
/// fields optional — defaults cover the common "one compose file in the
|
||||
/// instance root" case.
|
||||
#[derive(Debug, Clone, Default, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct DockerComposeConfig {
|
||||
/// Compose file (`-f`). Relative paths resolve against the run dir. Default:
|
||||
/// compose's own discovery (docker-compose.yml in the run dir).
|
||||
#[serde(default)]
|
||||
pub file: Option<PathBuf>,
|
||||
/// Compose project name (`-p`). Default: the instance id.
|
||||
#[serde(default)]
|
||||
pub project: Option<String>,
|
||||
/// Limit lifecycle ops to one service. Default: every service in the file.
|
||||
#[serde(default)]
|
||||
pub service: Option<String>,
|
||||
/// Override the compose binary invocation. Default: `["docker","compose"]`.
|
||||
/// Use `["docker-compose"]` for the legacy standalone binary.
|
||||
#[serde(default)]
|
||||
pub command: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
started_at: Option<Instant>,
|
||||
}
|
||||
|
||||
pub struct DockerComposeSupervisor {
|
||||
instance_id: String,
|
||||
/// Directory the compose commands run in (relative `-f`/file paths resolve
|
||||
/// against it).
|
||||
run_dir: PathBuf,
|
||||
compose_file: Option<PathBuf>,
|
||||
project: String,
|
||||
service: Option<String>,
|
||||
/// Compose binary + leading args, e.g. `["docker","compose"]`.
|
||||
command: Vec<String>,
|
||||
inner: Mutex<Inner>,
|
||||
state_tx: watch::Sender<InstanceState>,
|
||||
}
|
||||
|
||||
impl DockerComposeSupervisor {
|
||||
pub fn new(cfg: &InstanceConfig) -> Arc<Self> {
|
||||
let dc = cfg.docker_compose.clone().unwrap_or_default();
|
||||
let run_dir = cfg
|
||||
.working_dir
|
||||
.clone()
|
||||
.unwrap_or_else(|| cfg.root.clone());
|
||||
let command = dc
|
||||
.command
|
||||
.filter(|c| !c.is_empty())
|
||||
.unwrap_or_else(|| vec!["docker".to_string(), "compose".to_string()]);
|
||||
let (state_tx, _) = watch::channel(InstanceState::Stopped);
|
||||
Arc::new(Self {
|
||||
instance_id: cfg.id.clone(),
|
||||
run_dir,
|
||||
compose_file: dc.file,
|
||||
project: dc.project.unwrap_or_else(|| cfg.id.clone()),
|
||||
service: dc.service,
|
||||
command,
|
||||
inner: Mutex::new(Inner { started_at: None }),
|
||||
state_tx,
|
||||
})
|
||||
}
|
||||
|
||||
fn set_state(&self, state: InstanceState) {
|
||||
let _ = self.state_tx.send_replace(state);
|
||||
}
|
||||
|
||||
/// Run one compose subcommand (`up`/`stop`/`restart`/...), bailing with the
|
||||
/// captured stderr on non-zero exit. Global flags (`-f`, `-p`) precede the
|
||||
/// subcommand; the optional single service is appended last.
|
||||
async fn run(&self, action: &str, action_args: &[&str]) -> Result<()> {
|
||||
let mut cmd = Command::new(&self.command[0]);
|
||||
cmd.args(&self.command[1..]);
|
||||
if let Some(file) = &self.compose_file {
|
||||
cmd.arg("-f").arg(file);
|
||||
}
|
||||
cmd.arg("-p").arg(&self.project);
|
||||
cmd.arg(action);
|
||||
cmd.args(action_args);
|
||||
if let Some(service) = &self.service {
|
||||
cmd.arg(service);
|
||||
}
|
||||
cmd.current_dir(&self.run_dir)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
let output = cmd
|
||||
.output()
|
||||
.await
|
||||
.with_context(|| format!("running `{} {action}` (is docker installed and on PATH?)", self.command.join(" ")))?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let detail = if !stderr.trim().is_empty() {
|
||||
stderr.trim()
|
||||
} else {
|
||||
stdout.trim()
|
||||
};
|
||||
bail!("compose {action} failed ({}): {detail}", output.status);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Supervisor for DockerComposeSupervisor {
|
||||
fn instance_id(&self) -> &str {
|
||||
&self.instance_id
|
||||
}
|
||||
|
||||
fn state(&self) -> InstanceState {
|
||||
self.state_tx.borrow().clone()
|
||||
}
|
||||
|
||||
fn watch_state(&self) -> watch::Receiver<InstanceState> {
|
||||
self.state_tx.subscribe()
|
||||
}
|
||||
|
||||
async fn uptime_seconds(&self) -> u64 {
|
||||
let inner = self.inner.lock().await;
|
||||
match (&*self.state_tx.borrow(), inner.started_at) {
|
||||
(InstanceState::Running, Some(t)) => t.elapsed().as_secs(),
|
||||
_ => 0,
|
||||
}
|
||||
}
|
||||
|
||||
async fn start(self: Arc<Self>) -> Result<()> {
|
||||
if matches!(
|
||||
*self.state_tx.borrow(),
|
||||
InstanceState::Running | InstanceState::Starting
|
||||
) {
|
||||
bail!("instance '{}' is already running", self.instance_id);
|
||||
}
|
||||
self.set_state(InstanceState::Starting);
|
||||
match self.run("up", &["-d"]).await {
|
||||
Ok(()) => {
|
||||
self.inner.lock().await.started_at = Some(Instant::now());
|
||||
self.set_state(InstanceState::Running);
|
||||
tracing::info!("instance '{}' compose up -d", self.instance_id);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
self.set_state(InstanceState::Stopped);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop(self: Arc<Self>) -> Result<()> {
|
||||
self.set_state(InstanceState::Stopping);
|
||||
match self.run("stop", &[]).await {
|
||||
Ok(()) => {
|
||||
self.inner.lock().await.started_at = None;
|
||||
self.set_state(InstanceState::Stopped);
|
||||
tracing::info!("instance '{}' compose stop", self.instance_id);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
// Stop failed — the stack is most likely still up.
|
||||
self.set_state(InstanceState::Running);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn restart(self: Arc<Self>) -> Result<()> {
|
||||
self.set_state(InstanceState::Starting);
|
||||
match self.run("restart", &[]).await {
|
||||
Ok(()) => {
|
||||
self.inner.lock().await.started_at = Some(Instant::now());
|
||||
self.set_state(InstanceState::Running);
|
||||
tracing::info!("instance '{}' compose restart", self.instance_id);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
self.set_state(InstanceState::Stopped);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -13,9 +13,9 @@ use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::agent::Agent;
|
||||
use crate::process::ProcessSupervisor;
|
||||
use crate::subjects;
|
||||
use crate::steamcmd;
|
||||
use crate::supervisor::Supervisor;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct InstanceCommand {
|
||||
@@ -26,8 +26,8 @@ struct InstanceCommand {
|
||||
}
|
||||
|
||||
/// Forward every supervisor state change as a status event.
|
||||
pub async fn publish_state_changes(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>) {
|
||||
let subject = subjects::instance_status(&agent.cfg.license_id, &sup.instance_id);
|
||||
pub async fn publish_state_changes(agent: Arc<Agent>, sup: Arc<dyn Supervisor>) {
|
||||
let subject = subjects::instance_status(&agent.cfg.license_id, sup.instance_id());
|
||||
let mut rx = sup.watch_state();
|
||||
let cancel = agent.shutdown.clone();
|
||||
|
||||
@@ -40,13 +40,13 @@ pub async fn publish_state_changes(agent: Arc<Agent>, sup: Arc<ProcessSupervisor
|
||||
let state = rx.borrow().clone();
|
||||
let event = json!({
|
||||
"timestamp": Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true),
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
"event": state,
|
||||
});
|
||||
match serde_json::to_vec(&event) {
|
||||
Ok(bytes) => {
|
||||
if let Err(e) = agent.nats.publish(subject.clone(), bytes.into()).await {
|
||||
tracing::warn!("status publish failed for '{}': {e}", sup.instance_id);
|
||||
tracing::warn!("status publish failed for '{}': {e}", sup.instance_id());
|
||||
}
|
||||
}
|
||||
Err(e) => tracing::error!("status serialize failed: {e}"),
|
||||
@@ -58,8 +58,8 @@ pub async fn publish_state_changes(agent: Arc<Agent>, sup: Arc<ProcessSupervisor
|
||||
}
|
||||
|
||||
/// Request-reply command handler for one instance.
|
||||
pub async fn run(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>) -> anyhow::Result<()> {
|
||||
let subject = subjects::instance_cmd(&agent.cfg.license_id, &sup.instance_id);
|
||||
pub async fn run(agent: Arc<Agent>, sup: Arc<dyn Supervisor>) -> anyhow::Result<()> {
|
||||
let subject = subjects::instance_cmd(&agent.cfg.license_id, sup.instance_id());
|
||||
let mut sub = agent.nats.subscribe(subject.clone()).await?;
|
||||
tracing::info!("instance command handler listening on {subject}");
|
||||
|
||||
@@ -74,13 +74,13 @@ pub async fn run(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>) -> anyhow::Resu
|
||||
tokio::spawn(async move { handle(agent, sup, msg).await });
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("instance command subscription ended for '{}'", sup.instance_id);
|
||||
tracing::warn!("instance command subscription ended for '{}'", sup.instance_id());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("instance command handler stopping for '{}'", sup.instance_id);
|
||||
tracing::info!("instance command handler stopping for '{}'", sup.instance_id());
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -88,7 +88,7 @@ pub async fn run(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>) -> anyhow::Resu
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>, msg: async_nats::Message) {
|
||||
async fn handle(agent: Arc<Agent>, sup: Arc<dyn Supervisor>, msg: async_nats::Message) {
|
||||
let Some(reply) = msg.reply.clone() else {
|
||||
tracing::warn!("instance command without reply subject ignored");
|
||||
return;
|
||||
@@ -113,20 +113,22 @@ async fn handle(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>, msg: async_nats:
|
||||
|
||||
async fn dispatch(
|
||||
agent: &Arc<Agent>,
|
||||
sup: &Arc<ProcessSupervisor>,
|
||||
sup: &Arc<dyn Supervisor>,
|
||||
cmd: &InstanceCommand,
|
||||
) -> serde_json::Value {
|
||||
let func = cmd.func.as_str();
|
||||
|
||||
// start/stop/restart take `self: Arc<Self>` (they may hand a clone to a
|
||||
// monitor task), so clone the Arc before the consuming call.
|
||||
let outcome = match func {
|
||||
"start" => sup.start().await.map(|_| "starting"),
|
||||
"stop" => sup.stop().await.map(|_| "stopped"),
|
||||
"restart" => sup.restart().await.map(|_| "restarted"),
|
||||
"start" => sup.clone().start().await.map(|_| "starting"),
|
||||
"stop" => sup.clone().stop().await.map(|_| "stopped"),
|
||||
"restart" => sup.clone().restart().await.map(|_| "restarted"),
|
||||
"status" => {
|
||||
return json!({
|
||||
"status": "success",
|
||||
"func": "status",
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
"state": sup.state(),
|
||||
"uptime_seconds": sup.uptime_seconds().await,
|
||||
});
|
||||
@@ -139,15 +141,15 @@ async fn dispatch(
|
||||
.cfg
|
||||
.instances
|
||||
.iter()
|
||||
.find(|i| i.id == sup.instance_id);
|
||||
.find(|i| i.id == sup.instance_id());
|
||||
|
||||
let rcon_cfg = inst_cfg.and_then(|i| i.rcon.as_ref());
|
||||
let Some(rcon_cfg) = rcon_cfg else {
|
||||
return json!({
|
||||
"status": "error",
|
||||
"func": "rcon",
|
||||
"instance_id": sup.instance_id,
|
||||
"message": format!("instance '{}' has no rcon configured", sup.instance_id),
|
||||
"instance_id": sup.instance_id(),
|
||||
"message": format!("instance '{}' has no rcon configured", sup.instance_id()),
|
||||
});
|
||||
};
|
||||
|
||||
@@ -155,7 +157,7 @@ async fn dispatch(
|
||||
return json!({
|
||||
"status": "error",
|
||||
"func": "rcon",
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
"message": "rcon func requires a 'command' field",
|
||||
});
|
||||
};
|
||||
@@ -165,13 +167,13 @@ async fn dispatch(
|
||||
Ok(output) => json!({
|
||||
"status": "success",
|
||||
"func": "rcon",
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
"output": output,
|
||||
}),
|
||||
Err(e) => json!({
|
||||
"status": "error",
|
||||
"func": "rcon",
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
"message": format!("{e:#}"),
|
||||
}),
|
||||
};
|
||||
@@ -181,14 +183,14 @@ async fn dispatch(
|
||||
// settings. The supervisor only carries process-control state, not
|
||||
// the full config, so we reach into agent.cfg.instances here as the
|
||||
// rcon dispatch does.
|
||||
let inst_cfg = agent.cfg.instances.iter().find(|i| i.id == sup.instance_id);
|
||||
let inst_cfg = agent.cfg.instances.iter().find(|i| i.id == sup.instance_id());
|
||||
|
||||
let Some(inst_cfg) = inst_cfg else {
|
||||
return json!({
|
||||
"status": "error",
|
||||
"func": "steam_update",
|
||||
"instance_id": sup.instance_id,
|
||||
"message": format!("no config found for instance '{}'", sup.instance_id),
|
||||
"instance_id": sup.instance_id(),
|
||||
"message": format!("no config found for instance '{}'", sup.instance_id()),
|
||||
});
|
||||
};
|
||||
|
||||
@@ -209,7 +211,7 @@ async fn dispatch(
|
||||
};
|
||||
|
||||
let license = agent.cfg.license_id.clone();
|
||||
let instance_id = sup.instance_id.clone();
|
||||
let instance_id = sup.instance_id().to_string();
|
||||
let nats = agent.nats.clone();
|
||||
|
||||
// Publish each progress line to the steam_status subject.
|
||||
@@ -240,12 +242,12 @@ async fn dispatch(
|
||||
Ok(()) => json!({
|
||||
"status": "success",
|
||||
"func": "steam_update",
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
}),
|
||||
Err(e) => json!({
|
||||
"status": "error",
|
||||
"func": "steam_update",
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
"message": format!("{e:#}"),
|
||||
}),
|
||||
};
|
||||
@@ -262,14 +264,14 @@ async fn dispatch(
|
||||
Ok(result) => json!({
|
||||
"status": "success",
|
||||
"func": func,
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
"result": result,
|
||||
"state": sup.state(),
|
||||
}),
|
||||
Err(e) => json!({
|
||||
"status": "error",
|
||||
"func": func,
|
||||
"instance_id": sup.instance_id,
|
||||
"instance_id": sup.instance_id(),
|
||||
"message": format!("{e:#}"),
|
||||
}),
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
pub mod agent;
|
||||
pub mod bus;
|
||||
pub mod config;
|
||||
pub mod docker_compose;
|
||||
pub mod filemanager;
|
||||
pub mod hostcmd;
|
||||
pub mod instancecmd;
|
||||
@@ -12,6 +13,7 @@ pub mod process;
|
||||
pub mod rcon;
|
||||
pub mod steamcmd;
|
||||
pub mod subjects;
|
||||
pub mod supervisor;
|
||||
pub mod telemetry;
|
||||
pub mod update;
|
||||
pub mod version;
|
||||
|
||||
@@ -5,8 +5,8 @@
|
||||
//! game adapters arrive in Phase 1+ (see PROTOCOL.md).
|
||||
|
||||
use corrosion_host_agent::{
|
||||
agent, bus, config, filemanager, hostcmd, instancecmd, prober, process, subjects, telemetry,
|
||||
version,
|
||||
agent, bus, config, docker_compose, filemanager, hostcmd, instancecmd, prober, process,
|
||||
subjects, supervisor, telemetry, version,
|
||||
};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
@@ -92,10 +92,20 @@ async fn run(settings: config::Settings) -> Result<()> {
|
||||
|
||||
let nats = bus::connect(&settings).await?;
|
||||
|
||||
let supervisors = settings
|
||||
// Per-game supervisor factory: container-managed games (Dune) get a
|
||||
// docker-compose supervisor; everything else is a spawned-process
|
||||
// supervisor. Both satisfy the `Supervisor` trait, so the rest of the agent
|
||||
// is game-agnostic.
|
||||
let supervisors: std::collections::HashMap<String, Arc<dyn supervisor::Supervisor>> = settings
|
||||
.instances
|
||||
.iter()
|
||||
.map(|inst| (inst.id.clone(), process::ProcessSupervisor::new(inst)))
|
||||
.map(|inst| {
|
||||
let sup: Arc<dyn supervisor::Supervisor> = match inst.game.as_str() {
|
||||
"dune" => docker_compose::DockerComposeSupervisor::new(inst),
|
||||
_ => process::ProcessSupervisor::new(inst),
|
||||
};
|
||||
(inst.id.clone(), sup)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let agent = Arc::new(Agent {
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
//! Per-instance game-server process supervision.
|
||||
//!
|
||||
//! One `ProcessSupervisor` per process-managed instance. Lifecycle mirrors the
|
||||
//! proven Go agent behavior — graceful SIGTERM with a 30s budget before force
|
||||
//! kill, a monitor task that reaps the child and records crash-vs-stop — with
|
||||
//! two fixes the Go version needed: args are a proper list (no naive space
|
||||
//! splitting), and every state change is observable through a watch channel
|
||||
//! so the panel gets push events instead of waiting for the next heartbeat.
|
||||
//! One `ProcessSupervisor` per process-managed instance (Rust/Conan/Soulmask).
|
||||
//! Lifecycle mirrors the proven Go agent behavior — graceful SIGTERM with a 30s
|
||||
//! budget before force kill, a monitor task that reaps the child and records
|
||||
//! crash-vs-stop — with two fixes the Go version needed: args are a proper list
|
||||
//! (no naive space splitting), and every state change is observable through a
|
||||
//! watch channel so the panel gets push events instead of waiting for the next
|
||||
//! heartbeat. Lifecycle control is exposed through the [`Supervisor`] trait so
|
||||
//! the command dispatch is identical across process- and container-managed
|
||||
//! games.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use serde::Serialize;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
@@ -17,39 +19,11 @@ use tokio::process::{Child, Command};
|
||||
use tokio::sync::{watch, Mutex};
|
||||
|
||||
use crate::config::InstanceConfig;
|
||||
use crate::supervisor::{InstanceState, Supervisor};
|
||||
|
||||
const GRACEFUL_STOP_BUDGET: Duration = Duration::from_secs(30);
|
||||
const RESTART_PAUSE: Duration = Duration::from_secs(2);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case", tag = "state")]
|
||||
pub enum InstanceState {
|
||||
/// Not process-managed (no executable configured).
|
||||
Unmanaged,
|
||||
Stopped,
|
||||
Starting,
|
||||
Running,
|
||||
Stopping,
|
||||
/// Process exited without a stop request.
|
||||
Crashed {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
exit_code: Option<i32>,
|
||||
},
|
||||
}
|
||||
|
||||
impl InstanceState {
|
||||
pub fn as_label(&self) -> &'static str {
|
||||
match self {
|
||||
InstanceState::Unmanaged => "unmanaged",
|
||||
InstanceState::Stopped => "stopped",
|
||||
InstanceState::Starting => "starting",
|
||||
InstanceState::Running => "running",
|
||||
InstanceState::Stopping => "stopping",
|
||||
InstanceState::Crashed { .. } => "crashed",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
child: Option<Child>,
|
||||
started_at: Option<Instant>,
|
||||
@@ -59,7 +33,7 @@ struct Inner {
|
||||
}
|
||||
|
||||
pub struct ProcessSupervisor {
|
||||
pub instance_id: String,
|
||||
instance_id: String,
|
||||
executable: Option<PathBuf>,
|
||||
args: Vec<String>,
|
||||
working_dir: Option<PathBuf>,
|
||||
@@ -90,72 +64,6 @@ impl ProcessSupervisor {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn state(&self) -> InstanceState {
|
||||
self.state_tx.borrow().clone()
|
||||
}
|
||||
|
||||
pub fn watch_state(&self) -> watch::Receiver<InstanceState> {
|
||||
self.state_tx.subscribe()
|
||||
}
|
||||
|
||||
pub async fn uptime_seconds(&self) -> u64 {
|
||||
let inner = self.inner.lock().await;
|
||||
match (&*self.state_tx.borrow(), inner.started_at) {
|
||||
(InstanceState::Running, Some(t)) => t.elapsed().as_secs(),
|
||||
_ => 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(self: &Arc<Self>) -> Result<()> {
|
||||
let Some(exe) = self.executable.clone() else {
|
||||
bail!("instance '{}' has no executable configured", self.instance_id);
|
||||
};
|
||||
if !exe.exists() {
|
||||
bail!("executable not found: {}", exe.display());
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock().await;
|
||||
if matches!(*self.state_tx.borrow(), InstanceState::Running | InstanceState::Starting) {
|
||||
bail!("instance '{}' is already running", self.instance_id);
|
||||
}
|
||||
|
||||
self.set_state(InstanceState::Starting);
|
||||
|
||||
let workdir = self
|
||||
.working_dir
|
||||
.clone()
|
||||
.or_else(|| exe.parent().map(|p| p.to_path_buf()))
|
||||
.unwrap_or_else(|| PathBuf::from("."));
|
||||
|
||||
let child = Command::new(&exe)
|
||||
.args(&self.args)
|
||||
.current_dir(&workdir)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::inherit())
|
||||
.stderr(Stdio::inherit())
|
||||
.spawn()
|
||||
.with_context(|| format!("spawning {}", exe.display()))?;
|
||||
|
||||
let pid = child.id();
|
||||
inner.child = Some(child);
|
||||
inner.started_at = Some(Instant::now());
|
||||
inner.stop_requested = false;
|
||||
drop(inner);
|
||||
|
||||
self.set_state(InstanceState::Running);
|
||||
tracing::info!(
|
||||
"instance '{}' started: {} (pid {:?})",
|
||||
self.instance_id,
|
||||
exe.display(),
|
||||
pid
|
||||
);
|
||||
|
||||
// Monitor: reap the child and classify the exit.
|
||||
let sup = Arc::clone(self);
|
||||
tokio::spawn(async move { sup.monitor().await });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn monitor(self: Arc<Self>) {
|
||||
// Take a waiter without holding the lock across the whole child
|
||||
// lifetime: Child::wait needs &mut, so the child stays in inner and
|
||||
@@ -201,7 +109,85 @@ impl ProcessSupervisor {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stop(self: &Arc<Self>) -> Result<()> {
|
||||
fn set_state(&self, state: InstanceState) {
|
||||
// send_replace never fails even with zero receivers.
|
||||
let _ = self.state_tx.send_replace(state);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Supervisor for ProcessSupervisor {
|
||||
fn instance_id(&self) -> &str {
|
||||
&self.instance_id
|
||||
}
|
||||
|
||||
fn state(&self) -> InstanceState {
|
||||
self.state_tx.borrow().clone()
|
||||
}
|
||||
|
||||
fn watch_state(&self) -> watch::Receiver<InstanceState> {
|
||||
self.state_tx.subscribe()
|
||||
}
|
||||
|
||||
async fn uptime_seconds(&self) -> u64 {
|
||||
let inner = self.inner.lock().await;
|
||||
match (&*self.state_tx.borrow(), inner.started_at) {
|
||||
(InstanceState::Running, Some(t)) => t.elapsed().as_secs(),
|
||||
_ => 0,
|
||||
}
|
||||
}
|
||||
|
||||
async fn start(self: Arc<Self>) -> Result<()> {
|
||||
let Some(exe) = self.executable.clone() else {
|
||||
bail!("instance '{}' has no executable configured", self.instance_id);
|
||||
};
|
||||
if !exe.exists() {
|
||||
bail!("executable not found: {}", exe.display());
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock().await;
|
||||
if matches!(*self.state_tx.borrow(), InstanceState::Running | InstanceState::Starting) {
|
||||
bail!("instance '{}' is already running", self.instance_id);
|
||||
}
|
||||
|
||||
self.set_state(InstanceState::Starting);
|
||||
|
||||
let workdir = self
|
||||
.working_dir
|
||||
.clone()
|
||||
.or_else(|| exe.parent().map(|p| p.to_path_buf()))
|
||||
.unwrap_or_else(|| PathBuf::from("."));
|
||||
|
||||
let child = Command::new(&exe)
|
||||
.args(&self.args)
|
||||
.current_dir(&workdir)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::inherit())
|
||||
.stderr(Stdio::inherit())
|
||||
.spawn()
|
||||
.with_context(|| format!("spawning {}", exe.display()))?;
|
||||
|
||||
let pid = child.id();
|
||||
inner.child = Some(child);
|
||||
inner.started_at = Some(Instant::now());
|
||||
inner.stop_requested = false;
|
||||
drop(inner);
|
||||
|
||||
self.set_state(InstanceState::Running);
|
||||
tracing::info!(
|
||||
"instance '{}' started: {} (pid {:?})",
|
||||
self.instance_id,
|
||||
exe.display(),
|
||||
pid
|
||||
);
|
||||
|
||||
// Monitor: reap the child and classify the exit.
|
||||
let sup = Arc::clone(&self);
|
||||
tokio::spawn(async move { sup.monitor().await });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop(self: Arc<Self>) -> Result<()> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
if inner.child.is_none() {
|
||||
bail!("instance '{}' is not running", self.instance_id);
|
||||
@@ -263,16 +249,14 @@ impl ProcessSupervisor {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn restart(self: &Arc<Self>) -> Result<()> {
|
||||
if !matches!(*self.state_tx.borrow(), InstanceState::Stopped | InstanceState::Crashed { .. } | InstanceState::Unmanaged) {
|
||||
self.stop().await?;
|
||||
async fn restart(self: Arc<Self>) -> Result<()> {
|
||||
if !matches!(
|
||||
*self.state_tx.borrow(),
|
||||
InstanceState::Stopped | InstanceState::Crashed { .. } | InstanceState::Unmanaged
|
||||
) {
|
||||
self.clone().stop().await?;
|
||||
}
|
||||
tokio::time::sleep(RESTART_PAUSE).await;
|
||||
self.start().await
|
||||
}
|
||||
|
||||
fn set_state(&self, state: InstanceState) {
|
||||
// send_replace never fails even with zero receivers.
|
||||
let _ = self.state_tx.send_replace(state);
|
||||
}
|
||||
}
|
||||
|
||||
80
corrosion-host-agent/src/supervisor.rs
Normal file
80
corrosion-host-agent/src/supervisor.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
//! The supervision abstraction.
|
||||
//!
|
||||
//! A `Supervisor` owns the lifecycle of one game instance. Different games are
|
||||
//! managed in fundamentally different ways — Rust/Conan/Soulmask are spawned OS
|
||||
//! processes ([`crate::process::ProcessSupervisor`]); Dune is a docker-compose
|
||||
//! stack ([`crate::docker_compose::DockerComposeSupervisor`]); future planes
|
||||
//! (kubectl, AMP/podman, SSH) will be their own impls. The instance command
|
||||
//! dispatch (`instancecmd::dispatch`) talks only to this trait, so it never
|
||||
//! learns which management model is behind a given instance.
|
||||
//!
|
||||
//! Trait objects (`Arc<dyn Supervisor>`) need object-safe, dynamically
|
||||
//! dispatchable async methods; native `async fn` in traits is not yet
|
||||
//! dyn-compatible, so we use `#[async_trait]` (the battle-tested ecosystem
|
||||
//! standard) to box the returned futures. The cost — one heap alloc per
|
||||
//! lifecycle call — is irrelevant for start/stop/restart, which happen seconds
|
||||
//! to minutes apart.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::watch;
|
||||
|
||||
/// Observable lifecycle state of one instance. Shared vocabulary across every
|
||||
/// supervisor impl; serialized verbatim into heartbeats and status events
|
||||
/// (`{"state":"running", ...}`).
|
||||
#[derive(Debug, Clone, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case", tag = "state")]
|
||||
pub enum InstanceState {
|
||||
/// Not lifecycle-managed (a process instance with no executable, etc.).
|
||||
Unmanaged,
|
||||
Stopped,
|
||||
Starting,
|
||||
Running,
|
||||
Stopping,
|
||||
/// Exited/died without a stop request.
|
||||
Crashed {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
exit_code: Option<i32>,
|
||||
},
|
||||
}
|
||||
|
||||
impl InstanceState {
|
||||
pub fn as_label(&self) -> &'static str {
|
||||
match self {
|
||||
InstanceState::Unmanaged => "unmanaged",
|
||||
InstanceState::Stopped => "stopped",
|
||||
InstanceState::Starting => "starting",
|
||||
InstanceState::Running => "running",
|
||||
InstanceState::Stopping => "stopping",
|
||||
InstanceState::Crashed { .. } => "crashed",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Lifecycle control + state observation for one instance.
|
||||
///
|
||||
/// `start`/`stop`/`restart` take `self: Arc<Self>` so an impl can hand a clone
|
||||
/// to a spawned monitor task; callers hold an `Arc<dyn Supervisor>` and
|
||||
/// `clone()` before each call. `watch_state` exposes the same channel the
|
||||
/// status-event publisher drains, so panel push events stay decoupled from the
|
||||
/// heartbeat cadence.
|
||||
#[async_trait::async_trait]
|
||||
pub trait Supervisor: Send + Sync {
|
||||
/// The instance slug (a NATS subject segment).
|
||||
fn instance_id(&self) -> &str;
|
||||
|
||||
/// Current cached state (cheap; no I/O).
|
||||
fn state(&self) -> InstanceState;
|
||||
|
||||
/// Subscribe to state transitions.
|
||||
fn watch_state(&self) -> watch::Receiver<InstanceState>;
|
||||
|
||||
/// Seconds since the instance entered `Running` (0 otherwise).
|
||||
async fn uptime_seconds(&self) -> u64;
|
||||
|
||||
async fn start(self: Arc<Self>) -> Result<()>;
|
||||
async fn stop(self: Arc<Self>) -> Result<()>;
|
||||
async fn restart(self: Arc<Self>) -> Result<()>;
|
||||
}
|
||||
@@ -129,7 +129,7 @@ pub async fn collect(agent: &Agent, sys: &mut System) -> HeartbeatPayload {
|
||||
let mut instances = Vec::with_capacity(agent.cfg.instances.len());
|
||||
for inst in &agent.cfg.instances {
|
||||
let (state, uptime_seconds) = match agent.supervisors.get(&inst.id) {
|
||||
Some(sup) if !matches!(sup.state(), crate::process::InstanceState::Unmanaged) => {
|
||||
Some(sup) if !matches!(sup.state(), crate::supervisor::InstanceState::Unmanaged) => {
|
||||
(sup.state().as_label().to_string(), sup.uptime_seconds().await)
|
||||
}
|
||||
_ => {
|
||||
|
||||
156
corrosion-host-agent/tests/docker_compose.rs
Normal file
156
corrosion-host-agent/tests/docker_compose.rs
Normal file
@@ -0,0 +1,156 @@
|
||||
//! DockerComposeSupervisor tests. A fake `docker` script records the exact
|
||||
//! arguments it was invoked with and returns a controllable exit code, so we
|
||||
//! assert the compose invocations + state transitions with no real Docker
|
||||
//! daemon — the same mock-the-external-binary approach the steamcmd tests use.
|
||||
#![cfg(unix)]
|
||||
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use corrosion_host_agent::config::InstanceConfig;
|
||||
use corrosion_host_agent::docker_compose::{DockerComposeConfig, DockerComposeSupervisor};
|
||||
use corrosion_host_agent::supervisor::{InstanceState, Supervisor};
|
||||
|
||||
/// Write a fake `docker` executable that appends its args (space-joined) to
|
||||
/// `args_log` and exits with the integer in `exit_file` (0 if absent).
|
||||
fn fake_docker(dir: &Path, args_log: &Path, exit_file: &Path) -> PathBuf {
|
||||
let script = dir.join("fakedocker");
|
||||
let body = format!(
|
||||
"#!/bin/sh\nprintf '%s\\n' \"$*\" >> '{}'\nexit \"$(cat '{}' 2>/dev/null || echo 0)\"\n",
|
||||
args_log.display(),
|
||||
exit_file.display(),
|
||||
);
|
||||
std::fs::write(&script, body).unwrap();
|
||||
let mut perms = std::fs::metadata(&script).unwrap().permissions();
|
||||
perms.set_mode(0o755);
|
||||
std::fs::set_permissions(&script, perms).unwrap();
|
||||
script
|
||||
}
|
||||
|
||||
fn dune_instance(command: Vec<String>, service: Option<String>) -> InstanceConfig {
|
||||
InstanceConfig {
|
||||
id: "dune-main".to_string(),
|
||||
game: "dune".to_string(),
|
||||
root: PathBuf::from("/tmp"),
|
||||
label: None,
|
||||
executable: None,
|
||||
args: vec![],
|
||||
working_dir: None,
|
||||
rcon: None,
|
||||
steamcmd: None,
|
||||
docker_compose: Some(DockerComposeConfig {
|
||||
file: Some(PathBuf::from("docker-compose.yml")),
|
||||
project: Some("duneproj".to_string()),
|
||||
service,
|
||||
command: Some(command),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_runs_compose_up_detached_and_sets_running() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let args_log = dir.path().join("args.log");
|
||||
let exit_file = dir.path().join("exit");
|
||||
let docker = fake_docker(dir.path(), &args_log, &exit_file);
|
||||
|
||||
let sup = DockerComposeSupervisor::new(&dune_instance(
|
||||
vec![docker.to_string_lossy().into_owned()],
|
||||
None,
|
||||
));
|
||||
assert_eq!(sup.state(), InstanceState::Stopped);
|
||||
|
||||
sup.clone().start().await.expect("compose up should succeed");
|
||||
assert_eq!(sup.state(), InstanceState::Running);
|
||||
|
||||
let logged = std::fs::read_to_string(&args_log).unwrap();
|
||||
assert!(logged.contains("up -d"), "expected `up -d`; got: {logged}");
|
||||
assert!(logged.contains("-p duneproj"), "expected project flag; got: {logged}");
|
||||
assert!(logged.contains("-f docker-compose.yml"), "expected file flag; got: {logged}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stop_runs_compose_stop_and_sets_stopped() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let args_log = dir.path().join("args.log");
|
||||
let exit_file = dir.path().join("exit");
|
||||
let docker = fake_docker(dir.path(), &args_log, &exit_file);
|
||||
|
||||
let sup = DockerComposeSupervisor::new(&dune_instance(
|
||||
vec![docker.to_string_lossy().into_owned()],
|
||||
None,
|
||||
));
|
||||
sup.clone().start().await.expect("up");
|
||||
sup.clone().stop().await.expect("compose stop should succeed");
|
||||
assert_eq!(sup.state(), InstanceState::Stopped);
|
||||
assert_eq!(sup.uptime_seconds().await, 0);
|
||||
|
||||
let logged = std::fs::read_to_string(&args_log).unwrap();
|
||||
assert!(logged.lines().any(|l| l.contains("stop")), "expected a `stop` call; got: {logged}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restart_runs_compose_restart() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let args_log = dir.path().join("args.log");
|
||||
let exit_file = dir.path().join("exit");
|
||||
let docker = fake_docker(dir.path(), &args_log, &exit_file);
|
||||
|
||||
let sup = DockerComposeSupervisor::new(&dune_instance(
|
||||
vec![docker.to_string_lossy().into_owned()],
|
||||
None,
|
||||
));
|
||||
sup.clone().restart().await.expect("compose restart should succeed");
|
||||
assert_eq!(sup.state(), InstanceState::Running);
|
||||
|
||||
let logged = std::fs::read_to_string(&args_log).unwrap();
|
||||
assert!(logged.contains("restart"), "expected `restart`; got: {logged}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn single_service_is_targeted() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let args_log = dir.path().join("args.log");
|
||||
let exit_file = dir.path().join("exit");
|
||||
let docker = fake_docker(dir.path(), &args_log, &exit_file);
|
||||
|
||||
let sup = DockerComposeSupervisor::new(&dune_instance(
|
||||
vec![docker.to_string_lossy().into_owned()],
|
||||
Some("gameserver".to_string()),
|
||||
));
|
||||
sup.clone().start().await.expect("up");
|
||||
|
||||
let logged = std::fs::read_to_string(&args_log).unwrap();
|
||||
assert!(
|
||||
logged.contains("up -d gameserver"),
|
||||
"service must be appended after `up -d`; got: {logged}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compose_failure_errors_and_reverts_state() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let args_log = dir.path().join("args.log");
|
||||
let exit_file = dir.path().join("exit");
|
||||
std::fs::write(&exit_file, "1").unwrap(); // make the fake docker fail
|
||||
let docker = fake_docker(dir.path(), &args_log, &exit_file);
|
||||
|
||||
let sup = DockerComposeSupervisor::new(&dune_instance(
|
||||
vec![docker.to_string_lossy().into_owned()],
|
||||
None,
|
||||
));
|
||||
let err = sup.clone().start().await.expect_err("nonzero compose exit must fail");
|
||||
assert!(err.to_string().contains("compose up failed"), "got: {err}");
|
||||
assert_eq!(sup.state(), InstanceState::Stopped, "failed start must revert to Stopped");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn missing_docker_binary_errors_cleanly() {
|
||||
let sup = DockerComposeSupervisor::new(&dune_instance(
|
||||
vec!["/nonexistent/docker-xyz".to_string()],
|
||||
None,
|
||||
));
|
||||
let err = sup.clone().start().await.expect_err("missing docker must fail");
|
||||
assert!(err.to_string().contains("docker"), "error should mention docker: {err}");
|
||||
assert_eq!(sup.state(), InstanceState::Stopped);
|
||||
}
|
||||
@@ -8,7 +8,8 @@ use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use corrosion_host_agent::config::InstanceConfig;
|
||||
use corrosion_host_agent::process::{InstanceState, ProcessSupervisor};
|
||||
use corrosion_host_agent::process::ProcessSupervisor;
|
||||
use corrosion_host_agent::supervisor::{InstanceState, Supervisor};
|
||||
|
||||
fn managed_instance(executable: &str, args: &[&str]) -> InstanceConfig {
|
||||
InstanceConfig {
|
||||
@@ -21,6 +22,7 @@ fn managed_instance(executable: &str, args: &[&str]) -> InstanceConfig {
|
||||
working_dir: None,
|
||||
rcon: None,
|
||||
steamcmd: None,
|
||||
docker_compose: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,15 +49,15 @@ async fn start_status_stop_lifecycle() {
|
||||
let sup = ProcessSupervisor::new(&managed_instance("/bin/sleep", &["300"]));
|
||||
assert_eq!(sup.state(), InstanceState::Stopped);
|
||||
|
||||
sup.start().await.expect("start should succeed");
|
||||
sup.clone().start().await.expect("start should succeed");
|
||||
assert_eq!(sup.state(), InstanceState::Running);
|
||||
tokio::time::sleep(Duration::from_millis(1100)).await;
|
||||
assert!(sup.uptime_seconds().await >= 1, "uptime should advance");
|
||||
|
||||
// Double-start must be rejected while running.
|
||||
assert!(sup.start().await.is_err(), "double start must fail");
|
||||
assert!(sup.clone().start().await.is_err(), "double start must fail");
|
||||
|
||||
sup.stop().await.expect("stop should succeed");
|
||||
sup.clone().stop().await.expect("stop should succeed");
|
||||
let state = wait_for_state(&sup, |s| matches!(s, InstanceState::Stopped), Duration::from_secs(5)).await;
|
||||
assert_eq!(state, InstanceState::Stopped);
|
||||
assert_eq!(sup.uptime_seconds().await, 0);
|
||||
@@ -64,7 +66,7 @@ async fn start_status_stop_lifecycle() {
|
||||
#[tokio::test]
|
||||
async fn unexpected_exit_is_crashed_with_code() {
|
||||
let sup = ProcessSupervisor::new(&managed_instance("/bin/sh", &["-c", "sleep 0.2; exit 7"]));
|
||||
sup.start().await.expect("start should succeed");
|
||||
sup.clone().start().await.expect("start should succeed");
|
||||
|
||||
let state = wait_for_state(
|
||||
&sup,
|
||||
@@ -78,16 +80,16 @@ async fn unexpected_exit_is_crashed_with_code() {
|
||||
#[tokio::test]
|
||||
async fn restart_from_crashed_recovers() {
|
||||
let sup = ProcessSupervisor::new(&managed_instance("/bin/sh", &["-c", "exit 1"]));
|
||||
sup.start().await.expect("start should succeed");
|
||||
sup.clone().start().await.expect("start should succeed");
|
||||
wait_for_state(&sup, |s| matches!(s, InstanceState::Crashed { .. }), Duration::from_secs(5)).await;
|
||||
|
||||
// Restart from crashed must work (panel "Restart" after a crash).
|
||||
// Use a long-lived command this time by replacing the supervisor — the
|
||||
// command is fixed per supervisor, so emulate via a fresh one.
|
||||
let sup2 = ProcessSupervisor::new(&managed_instance("/bin/sleep", &["300"]));
|
||||
sup2.restart().await.expect("restart from stopped should start");
|
||||
sup2.clone().restart().await.expect("restart from stopped should start");
|
||||
assert_eq!(sup2.state(), InstanceState::Running);
|
||||
sup2.stop().await.expect("cleanup stop");
|
||||
sup2.clone().stop().await.expect("cleanup stop");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -96,14 +98,14 @@ async fn unmanaged_instance_rejects_process_commands() {
|
||||
cfg.executable = None;
|
||||
let sup = ProcessSupervisor::new(&cfg);
|
||||
assert_eq!(sup.state(), InstanceState::Unmanaged);
|
||||
assert!(sup.start().await.is_err(), "unmanaged start must fail");
|
||||
assert!(sup.stop().await.is_err(), "unmanaged stop must fail");
|
||||
assert!(sup.clone().start().await.is_err(), "unmanaged start must fail");
|
||||
assert!(sup.clone().stop().await.is_err(), "unmanaged stop must fail");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn missing_executable_fails_cleanly() {
|
||||
let sup = ProcessSupervisor::new(&managed_instance("/nonexistent/bin/gameserver", &[]));
|
||||
let err = sup.start().await.expect_err("must fail");
|
||||
let err = sup.clone().start().await.expect_err("must fail");
|
||||
assert!(err.to_string().contains("not found"), "error should say not found: {err}");
|
||||
assert_eq!(sup.state(), InstanceState::Stopped, "failed start must not leave Starting state");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user