Per-instance ProcessSupervisor: tokio child spawn with proper arg list
(fixes Go's naive space-splitting), graceful SIGTERM with 30s budget
then force kill, monitor task classifying ordered-stop vs crash (exit
code captured), watch-channel state observable everywhere. Instance cmd
channel live on corrosion.{license}.{instance}.cmd (start/stop/restart/
status) with state events pushed on {instance}.status (keep-latest
semantics, documented). Heartbeats now carry live process state +
uptime per instance. Crate restructured lib+bin for integration tests.
Verified: 5 integration tests with real OS processes (lifecycle, crash
exit-code, restart recovery, unmanaged rejection, clean spawn failure)
+ live-NATS contract test (request-reply roundtrips, double-start
rejection, push events, heartbeat state) — all green.
Known limitation (documented): no PID adoption yet — an agent restart
orphans a running game process, which is then reported as 'stopped' until panel restart.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
279 lines
9.1 KiB
Rust
279 lines
9.1 KiB
Rust
//! Per-instance game-server process supervision.
|
|
//!
|
|
//! One `ProcessSupervisor` per process-managed instance. Lifecycle mirrors the
|
|
//! proven Go agent behavior — graceful SIGTERM with a 30s budget before force
|
|
//! kill, a monitor task that reaps the child and records crash-vs-stop — with
|
|
//! two fixes the Go version needed: args are a proper list (no naive space
|
|
//! splitting), and every state change is observable through a watch channel
|
|
//! so the panel gets push events instead of waiting for the next heartbeat.
|
|
|
|
use anyhow::{bail, Context, Result};
|
|
use serde::Serialize;
|
|
use std::path::PathBuf;
|
|
use std::process::Stdio;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
use tokio::process::{Child, Command};
|
|
use tokio::sync::{watch, Mutex};
|
|
|
|
use crate::config::InstanceConfig;
|
|
|
|
/// How long `stop` waits for the process to exit after the graceful stop
/// request before it force kills the process.
const GRACEFUL_STOP_BUDGET: Duration = Duration::from_secs(30);

/// Pause inserted by `restart` between the stop phase and the following start.
const RESTART_PAUSE: Duration = Duration::from_secs(2);
|
|
|
|
#[derive(Debug, Clone, PartialEq, Serialize)]
|
|
#[serde(rename_all = "snake_case", tag = "state")]
|
|
pub enum InstanceState {
|
|
/// Not process-managed (no executable configured).
|
|
Unmanaged,
|
|
Stopped,
|
|
Starting,
|
|
Running,
|
|
Stopping,
|
|
/// Process exited without a stop request.
|
|
Crashed {
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
exit_code: Option<i32>,
|
|
},
|
|
}
|
|
|
|
impl InstanceState {
|
|
pub fn as_label(&self) -> &'static str {
|
|
match self {
|
|
InstanceState::Unmanaged => "unmanaged",
|
|
InstanceState::Stopped => "stopped",
|
|
InstanceState::Starting => "starting",
|
|
InstanceState::Running => "running",
|
|
InstanceState::Stopping => "stopping",
|
|
InstanceState::Crashed { .. } => "crashed",
|
|
}
|
|
}
|
|
}
|
|
|
|
struct Inner {
|
|
child: Option<Child>,
|
|
started_at: Option<Instant>,
|
|
/// True while a stop was requested — the monitor uses it to distinguish
|
|
/// an ordered shutdown from a crash.
|
|
stop_requested: bool,
|
|
}
|
|
|
|
pub struct ProcessSupervisor {
|
|
pub instance_id: String,
|
|
executable: Option<PathBuf>,
|
|
args: Vec<String>,
|
|
working_dir: Option<PathBuf>,
|
|
inner: Mutex<Inner>,
|
|
state_tx: watch::Sender<InstanceState>,
|
|
}
|
|
|
|
impl ProcessSupervisor {
|
|
pub fn new(cfg: &InstanceConfig) -> Arc<Self> {
|
|
let executable = cfg.resolved_executable();
|
|
let initial = if executable.is_some() {
|
|
InstanceState::Stopped
|
|
} else {
|
|
InstanceState::Unmanaged
|
|
};
|
|
let (state_tx, _) = watch::channel(initial);
|
|
Arc::new(Self {
|
|
instance_id: cfg.id.clone(),
|
|
executable,
|
|
args: cfg.args.clone(),
|
|
working_dir: cfg.working_dir.clone(),
|
|
inner: Mutex::new(Inner {
|
|
child: None,
|
|
started_at: None,
|
|
stop_requested: false,
|
|
}),
|
|
state_tx,
|
|
})
|
|
}
|
|
|
|
pub fn state(&self) -> InstanceState {
|
|
self.state_tx.borrow().clone()
|
|
}
|
|
|
|
pub fn watch_state(&self) -> watch::Receiver<InstanceState> {
|
|
self.state_tx.subscribe()
|
|
}
|
|
|
|
pub async fn uptime_seconds(&self) -> u64 {
|
|
let inner = self.inner.lock().await;
|
|
match (&*self.state_tx.borrow(), inner.started_at) {
|
|
(InstanceState::Running, Some(t)) => t.elapsed().as_secs(),
|
|
_ => 0,
|
|
}
|
|
}
|
|
|
|
pub async fn start(self: &Arc<Self>) -> Result<()> {
|
|
let Some(exe) = self.executable.clone() else {
|
|
bail!("instance '{}' has no executable configured", self.instance_id);
|
|
};
|
|
if !exe.exists() {
|
|
bail!("executable not found: {}", exe.display());
|
|
}
|
|
|
|
let mut inner = self.inner.lock().await;
|
|
if matches!(*self.state_tx.borrow(), InstanceState::Running | InstanceState::Starting) {
|
|
bail!("instance '{}' is already running", self.instance_id);
|
|
}
|
|
|
|
self.set_state(InstanceState::Starting);
|
|
|
|
let workdir = self
|
|
.working_dir
|
|
.clone()
|
|
.or_else(|| exe.parent().map(|p| p.to_path_buf()))
|
|
.unwrap_or_else(|| PathBuf::from("."));
|
|
|
|
let child = Command::new(&exe)
|
|
.args(&self.args)
|
|
.current_dir(&workdir)
|
|
.stdin(Stdio::null())
|
|
.stdout(Stdio::inherit())
|
|
.stderr(Stdio::inherit())
|
|
.spawn()
|
|
.with_context(|| format!("spawning {}", exe.display()))?;
|
|
|
|
let pid = child.id();
|
|
inner.child = Some(child);
|
|
inner.started_at = Some(Instant::now());
|
|
inner.stop_requested = false;
|
|
drop(inner);
|
|
|
|
self.set_state(InstanceState::Running);
|
|
tracing::info!(
|
|
"instance '{}' started: {} (pid {:?})",
|
|
self.instance_id,
|
|
exe.display(),
|
|
pid
|
|
);
|
|
|
|
// Monitor: reap the child and classify the exit.
|
|
let sup = Arc::clone(self);
|
|
tokio::spawn(async move { sup.monitor().await });
|
|
Ok(())
|
|
}
|
|
|
|
async fn monitor(self: Arc<Self>) {
|
|
// Take a waiter without holding the lock across the whole child
|
|
// lifetime: Child::wait needs &mut, so the child stays in inner and
|
|
// we poll it.
|
|
loop {
|
|
let status = {
|
|
let mut inner = self.inner.lock().await;
|
|
let Some(child) = inner.child.as_mut() else { return };
|
|
match child.try_wait() {
|
|
Ok(Some(status)) => Some(status),
|
|
Ok(None) => None,
|
|
Err(e) => {
|
|
tracing::error!("instance '{}' wait failed: {e}", self.instance_id);
|
|
return;
|
|
}
|
|
}
|
|
};
|
|
|
|
match status {
|
|
Some(status) => {
|
|
let mut inner = self.inner.lock().await;
|
|
inner.child = None;
|
|
inner.started_at = None;
|
|
let ordered = inner.stop_requested;
|
|
inner.stop_requested = false;
|
|
drop(inner);
|
|
|
|
if ordered {
|
|
self.set_state(InstanceState::Stopped);
|
|
tracing::info!("instance '{}' stopped ({status})", self.instance_id);
|
|
} else {
|
|
let exit_code = status.code();
|
|
self.set_state(InstanceState::Crashed { exit_code });
|
|
tracing::warn!(
|
|
"instance '{}' exited unexpectedly ({status}) — marked crashed",
|
|
self.instance_id
|
|
);
|
|
}
|
|
return;
|
|
}
|
|
None => tokio::time::sleep(Duration::from_millis(500)).await,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn stop(self: &Arc<Self>) -> Result<()> {
|
|
let mut inner = self.inner.lock().await;
|
|
if inner.child.is_none() {
|
|
bail!("instance '{}' is not running", self.instance_id);
|
|
}
|
|
inner.stop_requested = true;
|
|
self.set_state(InstanceState::Stopping);
|
|
let child = inner.child.as_mut().expect("checked above");
|
|
|
|
// Graceful first: SIGTERM on unix; Windows has no SIGTERM equivalent
|
|
// for console processes, so it goes straight to kill there.
|
|
#[cfg(unix)]
|
|
if let Some(pid) = child.id() {
|
|
unsafe {
|
|
libc::kill(pid as i32, libc::SIGTERM);
|
|
}
|
|
}
|
|
#[cfg(not(unix))]
|
|
{
|
|
let _ = child.start_kill();
|
|
}
|
|
drop(inner);
|
|
|
|
// Wait for the monitor to observe the exit; force kill on budget.
|
|
let mut rx = self.watch_state();
|
|
let deadline = tokio::time::timeout(GRACEFUL_STOP_BUDGET, async {
|
|
loop {
|
|
if matches!(*rx.borrow(), InstanceState::Stopped) {
|
|
return;
|
|
}
|
|
if rx.changed().await.is_err() {
|
|
return;
|
|
}
|
|
}
|
|
})
|
|
.await;
|
|
|
|
if deadline.is_err() {
|
|
tracing::warn!(
|
|
"instance '{}' ignored SIGTERM for {}s — force killing",
|
|
self.instance_id,
|
|
GRACEFUL_STOP_BUDGET.as_secs()
|
|
);
|
|
let mut inner = self.inner.lock().await;
|
|
if let Some(child) = inner.child.as_mut() {
|
|
let _ = child.start_kill();
|
|
}
|
|
drop(inner);
|
|
|
|
let mut rx = self.watch_state();
|
|
let _ = tokio::time::timeout(Duration::from_secs(5), async {
|
|
while !matches!(*rx.borrow(), InstanceState::Stopped) {
|
|
if rx.changed().await.is_err() {
|
|
break;
|
|
}
|
|
}
|
|
})
|
|
.await;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn restart(self: &Arc<Self>) -> Result<()> {
|
|
if !matches!(*self.state_tx.borrow(), InstanceState::Stopped | InstanceState::Crashed { .. } | InstanceState::Unmanaged) {
|
|
self.stop().await?;
|
|
}
|
|
tokio::time::sleep(RESTART_PAUSE).await;
|
|
self.start().await
|
|
}
|
|
|
|
fn set_state(&self, state: InstanceState) {
|
|
// send_replace never fails even with zero receivers.
|
|
let _ = self.state_tx.send_replace(state);
|
|
}
|
|
}
|