feat(host-agent): Phase 1b RCON — WebRCON (rust) + Source RCON (conan/soulmask)

rcon func on the instance command channel: WebSocket JSON WebRCON with
Identifier correlation (skips chat/log noise frames) and full Valve
Source RCON over TCP (auth, exec, multi-packet reassembly via empty
probe, 1MiB cap). Protocol inferred from game, explicit kind override
in [instance.rcon]. Always 127.0.0.1 — agent is co-located.

Hardening from review: WebRCON password never interpolated into error
contexts/logs (redacted URL); probe-tolerant termination — a quiet
period after received data ends the response for servers that don't
echo the probe (Soulmask conformance unverified), so data is never
discarded on probe timeout.

13/13 tests green incl. mock Source-RCON server (auth/multi-packet/
errors) and mock WebRCON server (noise-frame skipping).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Vantz Stockwell
2026-06-11 10:53:52 -04:00
parent 4d99c9d99d
commit fde0926d52
10 changed files with 838 additions and 10 deletions

View File

@@ -10,6 +10,8 @@ use serde::Deserialize;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use crate::rcon::RconConfig;
/// Instance ids share the NATS subject namespace with host-level segments.
const RESERVED_INSTANCE_IDS: &[&str] = &["host", "cmd", "files", "update", "agent"];
@@ -59,6 +61,10 @@ pub struct InstanceConfig {
/// Working directory for the process. Defaults to the executable's directory.
#[serde(default)]
pub working_dir: Option<PathBuf>,
/// RCON connection settings for this instance. Absent = rcon unavailable.
/// Protocol defaults to WebRcon for rust, Source for conan/soulmask.
#[serde(default)]
pub rcon: Option<RconConfig>,
}
impl InstanceConfig {

View File

@@ -1,7 +1,7 @@
//! Per-instance command channel + state-change events.
//!
//! Each process-managed instance gets a request-reply subscriber on
//! `corrosion.{license}.{instance_id}.cmd` (funcs: start/stop/restart/status)
//! `corrosion.{license}.{instance_id}.cmd` (funcs: start/stop/restart/status/rcon)
//! and a publisher task that pushes every supervisor state change to
//! `corrosion.{license}.{instance_id}.status` — the panel sees crashes when
//! they happen, not when the next heartbeat ambles in.
@@ -19,6 +19,9 @@ use crate::subjects;
#[derive(Debug, Deserialize)]
struct InstanceCommand {
func: String,
/// Payload for funcs that carry a text argument (e.g. rcon).
#[serde(default)]
command: Option<String>,
}
/// Forward every supervisor state change as a status event.
@@ -91,7 +94,7 @@ async fn handle(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>, msg: async_nats:
};
let response = match serde_json::from_slice::<InstanceCommand>(&msg.payload) {
Ok(cmd) => dispatch(&sup, &cmd.func).await,
Ok(cmd) => dispatch(&agent, &sup, &cmd).await,
Err(e) => json!({ "status": "error", "message": format!("invalid command payload: {e}") }),
};
@@ -107,7 +110,13 @@ async fn handle(agent: Arc<Agent>, sup: Arc<ProcessSupervisor>, msg: async_nats:
}
}
async fn dispatch(sup: &Arc<ProcessSupervisor>, func: &str) -> serde_json::Value {
async fn dispatch(
agent: &Arc<Agent>,
sup: &Arc<ProcessSupervisor>,
cmd: &InstanceCommand,
) -> serde_json::Value {
let func = cmd.func.as_str();
let outcome = match func {
"start" => sup.start().await.map(|_| "starting"),
"stop" => sup.stop().await.map(|_| "stopped"),
@@ -121,10 +130,55 @@ async fn dispatch(sup: &Arc<ProcessSupervisor>, func: &str) -> serde_json::Value
"uptime_seconds": sup.uptime_seconds().await,
});
}
"rcon" => {
// Look up the InstanceConfig for this supervisor so we can access
// rcon settings and the game name without changing the supervisor's
// data model.
let inst_cfg = agent
.cfg
.instances
.iter()
.find(|i| i.id == sup.instance_id);
let rcon_cfg = inst_cfg.and_then(|i| i.rcon.as_ref());
let Some(rcon_cfg) = rcon_cfg else {
return json!({
"status": "error",
"func": "rcon",
"instance_id": sup.instance_id,
"message": format!("instance '{}' has no rcon configured", sup.instance_id),
});
};
let Some(command) = cmd.command.as_deref() else {
return json!({
"status": "error",
"func": "rcon",
"instance_id": sup.instance_id,
"message": "rcon func requires a 'command' field",
});
};
let game = inst_cfg.map(|i| i.game.as_str()).unwrap_or("rust");
return match crate::rcon::send_command(rcon_cfg, game, command).await {
Ok(output) => json!({
"status": "success",
"func": "rcon",
"instance_id": sup.instance_id,
"output": output,
}),
Err(e) => json!({
"status": "error",
"func": "rcon",
"instance_id": sup.instance_id,
"message": format!("{e:#}"),
}),
};
}
other => {
return json!({
"status": "error",
"message": format!("unknown func '{other}' (supported: start, stop, restart, status)"),
"message": format!("unknown func '{other}' (supported: start, stop, restart, status, rcon)"),
});
}
};

View File

@@ -8,6 +8,7 @@ pub mod hostcmd;
pub mod instancecmd;
pub mod prober;
pub mod process;
pub mod rcon;
pub mod subjects;
pub mod telemetry;
pub mod version;

View File

@@ -0,0 +1,320 @@
//! RCON client: game-server remote-console over WebRCON (Rust) or Source RCON (Conan/Soulmask).
//!
//! The agent runs co-located with the game server, so every connection targets
//! 127.0.0.1 — no TLS is needed and latency is sub-millisecond. Two protocols
//! are supported because the Rust game ships its own WebSocket-based WebRCON
//! while Conan Exiles and Soulmask use the Valve Source RCON wire format over
//! plain TCP.
//!
//! The protocol selection is explicit in the config (`kind`) but can be inferred
//! from the game name when absent — callers supply the `game` field they already
//! have in `InstanceConfig`.
use anyhow::{bail, Context, Result};
use futures::{SinkExt, StreamExt};
use rand::Rng;
use serde::Deserialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{timeout, Duration};
/// WebRCON is the Facepunch WebSocket protocol (Rust game).
/// Source RCON is the Valve wire protocol used by Conan Exiles and Soulmask.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RconKind {
WebRcon,
Source,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct RconConfig {
/// Protocol override. When absent the kind is resolved from `game`.
#[serde(default)]
pub kind: Option<RconKind>,
pub port: u16,
pub password: String,
}
impl RconConfig {
/// Resolve the concrete protocol, falling back to a per-game default when
/// `kind` is not set. rust → WebRcon; conan + soulmask → Source.
pub fn resolved_kind(&self, game: &str) -> RconKind {
if let Some(k) = self.kind {
return k;
}
match game {
"conan" | "soulmask" => RconKind::Source,
// rust is the primary game; anything unknown defaults to WebRcon
// — operators can always override with an explicit `kind`.
_ => RconKind::WebRcon,
}
}
}
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
/// Send `command` to the game server and return its text response.
///
/// The agent runs on the same host as the game server, so the target address
/// is always 127.0.0.1:{port}. Connection and response deadlines are fixed at
/// 5 s and 10 s respectively — enough headroom for a loaded server while still
/// catching hung connections quickly.
pub async fn send_command(cfg: &RconConfig, game: &str, command: &str) -> Result<String> {
match cfg.resolved_kind(game) {
RconKind::WebRcon => webrcon_exec(cfg, command).await,
RconKind::Source => source_rcon_exec(cfg, command).await,
}
}
// ---------------------------------------------------------------------------
// WebRCON (Rust game) — WebSocket JSON protocol
// ---------------------------------------------------------------------------
/// WebRCON request/response envelope. The server also emits chat/log frames
/// on this socket with Identifier == 0; those are skipped.
#[derive(serde::Serialize)]
struct WebRconRequest<'a> {
#[serde(rename = "Identifier")]
identifier: i32,
#[serde(rename = "Message")]
message: &'a str,
#[serde(rename = "Name")]
name: &'static str,
}
#[derive(serde::Deserialize)]
struct WebRconResponse {
#[serde(rename = "Identifier")]
identifier: i32,
#[serde(rename = "Message")]
message: String,
}
async fn webrcon_exec(cfg: &RconConfig, command: &str) -> Result<String> {
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as WsMsg;
// The Rust game server embeds the password in the WebSocket URL path —
// never interpolate the real URL into errors or logs.
let url = format!("ws://127.0.0.1:{}/{}", cfg.port, cfg.password);
let redacted = format!("ws://127.0.0.1:{}/<redacted>", cfg.port);
// Wrap the entire connection + exchange in the connect timeout — we want
// the timeout to cover TCP handshake + WS upgrade, not just the send.
let (mut ws, _) = timeout(CONNECT_TIMEOUT, connect_async(&url))
.await
.context("connect timeout")?
.with_context(|| format!("WebRCON connect to {redacted}"))?;
// Use a random positive i32 so correlation is unambiguous even when
// multiple callers share a port (future concurrency).
let id: i32 = rand::thread_rng().gen_range(1..=i32::MAX);
let req = WebRconRequest { identifier: id, message: command, name: "Corrosion" };
let payload = serde_json::to_string(&req).context("serialize WebRCON request")?;
ws.send(WsMsg::Text(payload))
.await
.context("send WebRCON command")?;
tracing::debug!("WebRCON sent id={id} command={command:?}");
// Read frames until we see our Identifier — skip chat/log noise (id 0 or
// any other value that isn't ours).
let result = timeout(RESPONSE_TIMEOUT, async {
loop {
match ws.next().await {
Some(Ok(WsMsg::Text(text))) => {
match serde_json::from_str::<WebRconResponse>(&text) {
Ok(resp) if resp.identifier == id => return Ok(resp.message),
Ok(_) => {
// Not our response (chat, log, another caller's frame).
tracing::trace!("WebRCON skipping frame with different Identifier");
continue;
}
Err(e) => {
tracing::trace!("WebRCON non-JSON frame ignored: {e}");
continue;
}
}
}
Some(Ok(WsMsg::Close(_))) => bail!("WebRCON server closed connection"),
Some(Ok(_)) => continue, // binary/ping/pong — skip
Some(Err(e)) => return Err(anyhow::anyhow!(e).context("WebRCON read error")),
None => bail!("WebRCON stream ended without response"),
}
}
})
.await
.context("WebRCON response timeout")??;
// Close cleanly; a send error here is cosmetic — we already have our data.
let _ = ws.close(None).await;
Ok(result)
}
// ---------------------------------------------------------------------------
// Source RCON (Conan Exiles, Soulmask) — Valve TCP binary protocol
//
// Packet layout (all fields little-endian):
// i32 size — byte count of the remaining packet (id + type + body + 2 nulls)
// i32 id — caller-chosen correlation id; auth failure returns -1
// i32 type — 0=RESPONSE_VALUE, 2=EXECCOMMAND/AUTH_RESPONSE, 3=AUTH
// [u8] body — UTF-8 command or response text
// u8 0x00 — body null terminator
// u8 0x00 — padding null terminator
//
// Multi-packet handling: after sending the command we also send an empty
// RESPONSE_VALUE probe with a distinct id. We collect all RESPONSE_VALUE
// packets belonging to the command id and stop when we receive the probe's
// response. This is the standard technique specified in the Valve wiki.
// ---------------------------------------------------------------------------
const RCON_TYPE_AUTH: i32 = 3;
const RCON_TYPE_AUTH_RESPONSE: i32 = 2;
const RCON_TYPE_EXECCOMMAND: i32 = 2;
const RCON_TYPE_RESPONSE_VALUE: i32 = 0;
/// Maximum accumulated response body (guards against misbehaving servers).
const MAX_RESPONSE_BYTES: usize = 1024 * 1024; // 1 MiB
async fn source_rcon_exec(cfg: &RconConfig, command: &str) -> Result<String> {
let addr = format!("127.0.0.1:{}", cfg.port);
let stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
.await
.context("connect timeout")?
.with_context(|| format!("Source RCON connect to {addr}"))?;
let mut stream = stream;
// --- Auth ---
let auth_id: i32 = rand::thread_rng().gen_range(1..=i32::MAX);
send_packet(&mut stream, auth_id, RCON_TYPE_AUTH, cfg.password.as_bytes()).await?;
// The server sends two responses to AUTH: first an empty RESPONSE_VALUE,
// then an AUTH_RESPONSE. We skip the first and read until AUTH_RESPONSE.
timeout(RESPONSE_TIMEOUT, async {
loop {
let (id, ptype, _body) = recv_packet(&mut stream).await?;
if ptype == RCON_TYPE_AUTH_RESPONSE {
if id == -1 {
bail!("Source RCON auth failed: wrong password");
}
tracing::debug!("Source RCON authenticated (id={id})");
return Ok(());
}
// Skip the empty RESPONSE_VALUE that precedes AUTH_RESPONSE.
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
})
.await
.context("Source RCON auth timeout")??;
// --- Command ---
let cmd_id: i32 = rand::thread_rng().gen_range(1..=i32::MAX);
// Probe id must differ from cmd_id.
let probe_id: i32 = loop {
let id: i32 = rand::thread_rng().gen_range(1..=i32::MAX);
if id != cmd_id {
break id;
}
};
send_packet(&mut stream, cmd_id, RCON_TYPE_EXECCOMMAND, command.as_bytes()).await?;
// Empty RESPONSE_VALUE probe — the server echoes it after processing the
// preceding command, signalling end-of-response.
send_packet(&mut stream, probe_id, RCON_TYPE_RESPONSE_VALUE, b"").await?;
// Not every server is probe-conformant (Soulmask unverified): once we hold
// response data, a short per-read quiet period also terminates — never
// discard a response we already received just because the probe echo
// didn't come back.
const QUIET_PERIOD: Duration = Duration::from_millis(1500);
let response = timeout(RESPONSE_TIMEOUT, async {
let mut body_accum: Vec<u8> = Vec::new();
loop {
let next = if body_accum.is_empty() {
recv_packet(&mut stream).await.map(Some)
} else {
match timeout(QUIET_PERIOD, recv_packet(&mut stream)).await {
Ok(res) => res.map(Some),
Err(_elapsed) => Ok(None), // quiet after data — done
}
};
let Some((id, ptype, body)) = next? else {
break;
};
if ptype != RCON_TYPE_RESPONSE_VALUE {
continue; // unexpected packet type — skip
}
if id == probe_id {
// Probe echoed back — all command response packets have arrived.
break;
}
if id == cmd_id {
if body_accum.len() + body.len() > MAX_RESPONSE_BYTES {
bail!("Source RCON response exceeded {MAX_RESPONSE_BYTES} bytes");
}
body_accum.extend_from_slice(&body);
}
// Skip packets with other ids (shouldn't happen but be defensive).
}
Ok::<Vec<u8>, anyhow::Error>(body_accum)
})
.await
.context("Source RCON response timeout")??;
String::from_utf8(response).context("Source RCON response is not valid UTF-8")
}
/// Write a Source RCON packet to the stream.
async fn send_packet(stream: &mut TcpStream, id: i32, ptype: i32, body: &[u8]) -> Result<()> {
// size = id(4) + type(4) + body(n) + 2 null terminators
let size = (4 + 4 + body.len() + 2) as i32;
let mut buf: Vec<u8> = Vec::with_capacity(4 + size as usize);
buf.extend_from_slice(&size.to_le_bytes());
buf.extend_from_slice(&id.to_le_bytes());
buf.extend_from_slice(&ptype.to_le_bytes());
buf.extend_from_slice(body);
buf.push(0x00);
buf.push(0x00);
stream.write_all(&buf).await.context("Source RCON write")?;
Ok(())
}
/// Read one Source RCON packet; returns (id, type, body).
async fn recv_packet(stream: &mut TcpStream) -> Result<(i32, i32, Vec<u8>)> {
let mut size_buf = [0u8; 4];
stream
.read_exact(&mut size_buf)
.await
.context("Source RCON read size")?;
let size = i32::from_le_bytes(size_buf) as usize;
// Minimum packet: id(4) + type(4) + 2 null terminators = 10 bytes.
if size < 10 {
bail!("Source RCON: malformed packet (size={size})");
}
if size > MAX_RESPONSE_BYTES + 16 {
bail!("Source RCON: packet too large ({size} bytes)");
}
let mut payload = vec![0u8; size];
stream
.read_exact(&mut payload)
.await
.context("Source RCON read payload")?;
let id = i32::from_le_bytes(payload[0..4].try_into().unwrap());
let ptype = i32::from_le_bytes(payload[4..8].try_into().unwrap());
// Body is everything between the two fields and the two trailing nulls.
let body_end = size.saturating_sub(2); // strip 2 null terminators
let body = payload[8..body_end].to_vec();
Ok((id, ptype, body))
}