From fde0926d524cccb6288edcfd4b868ccf322ac8a3 Mon Sep 17 00:00:00 2001 From: Vantz Stockwell Date: Thu, 11 Jun 2026 10:53:52 -0400 Subject: [PATCH] =?UTF-8?q?feat(host-agent):=20Phase=201b=20RCON=20?= =?UTF-8?q?=E2=80=94=20WebRCON=20(rust)=20+=20Source=20RCON=20(conan/soulm?= =?UTF-8?q?ask)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit rcon func on the instance command channel: WebSocket JSON WebRCON with Identifier correlation (skips chat/log noise frames) and full Valve Source RCON over TCP (auth, exec, multi-packet reassembly via empty probe, 1MiB cap). Protocol inferred from game, explicit kind override in [instance.rcon]. Always 127.0.0.1 — agent is co-located. Hardening from review: WebRCON password never interpolated into error contexts/logs (redacted URL); probe-tolerant termination — a quiet period after received data ends the response for servers that don't echo the probe (Soulmask conformance unverified), so data is never discarded on probe timeout. 13/13 tests green incl. mock Source-RCON server (auth/multi-packet/ errors) and mock WebRCON server (noise-frame skipping). 
Co-Authored-By: Claude Fable 5 --- corrosion-host-agent/Cargo.lock | 70 +++++ corrosion-host-agent/Cargo.toml | 1 + corrosion-host-agent/PROTOCOL.md | 17 +- corrosion-host-agent/agent.example.toml | 17 ++ corrosion-host-agent/src/config.rs | 6 + corrosion-host-agent/src/instancecmd.rs | 62 +++- corrosion-host-agent/src/lib.rs | 1 + corrosion-host-agent/src/rcon.rs | 320 ++++++++++++++++++++ corrosion-host-agent/tests/rcon.rs | 353 +++++++++++++++++++++++ corrosion-host-agent/tests/supervisor.rs | 1 + 10 files changed, 838 insertions(+), 10 deletions(-) create mode 100644 corrosion-host-agent/src/rcon.rs create mode 100644 corrosion-host-agent/tests/rcon.rs diff --git a/corrosion-host-agent/Cargo.lock b/corrosion-host-agent/Cargo.lock index 304dbac..1ffc2c4 100644 --- a/corrosion-host-agent/Cargo.lock +++ b/corrosion-host-agent/Cargo.lock @@ -149,6 +149,12 @@ version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" @@ -271,6 +277,7 @@ dependencies = [ "serde_json", "sysinfo", "tokio", + "tokio-tungstenite", "tokio-util", "toml", "tracing", @@ -581,6 +588,22 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "http" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -1277,6 +1300,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -1529,6 +1563,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -1655,6 +1701,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.20.1" @@ -1685,6 +1749,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/corrosion-host-agent/Cargo.toml b/corrosion-host-agent/Cargo.toml index d7a2980..935e8fa 100644 --- a/corrosion-host-agent/Cargo.toml +++ b/corrosion-host-agent/Cargo.toml @@ -25,6 +25,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } anyhow = "1" clap = { version = "4.5", features = ["derive"] } rand = "0.8" +tokio-tungstenite = "0.24" 
[target.'cfg(unix)'.dependencies] libc = "0.2" diff --git a/corrosion-host-agent/PROTOCOL.md b/corrosion-host-agent/PROTOCOL.md index 21e9cc8..c7cd6f6 100644 --- a/corrosion-host-agent/PROTOCOL.md +++ b/corrosion-host-agent/PROTOCOL.md @@ -101,13 +101,18 @@ Payload: `{}`. Lifecycle and control for one game instance. Implemented funcs: `start`, `stop` (graceful with 30s budget, then force -kill), `restart`, `status` (returns `state` + `uptime_seconds`). Errors reply -`{ "status": "error", "message": ... }` — including start on an unmanaged -instance, double start, and unknown funcs. +kill), `restart`, `status` (returns `state` + `uptime_seconds`), and +`rcon` — `{ "func": "rcon", "command": "<console command>" }` returns +`{ "status": "success", "output": "<server response text>" }`. Protocol per game: +WebRCON (WebSocket JSON) for rust, Source RCON (Valve TCP) for +conan/soulmask; explicit `kind` override available in the instance's +`[instance.rcon]` config. Always targets 127.0.0.1 (agent is co-located). +Errors reply `{ "status": "error", "message": ... }` — including start on an +unmanaged instance, double start, missing rcon config, and unknown funcs. -Planned funcs: `rcon` (process-class games), `steam_update`, `oxide_install` -(rust), plus game-adapter-specific commands (Dune: docker lifecycle, RabbitMQ -bus commands, Coriolis reset). +Planned funcs: `steam_update`, `oxide_install` (rust), plus +game-adapter-specific commands (Dune: docker lifecycle, RabbitMQ bus +commands, Coriolis reset). ### `corrosion.{license_id}.{instance_id}.status` (agent → backend, publish) — LIVE diff --git a/corrosion-host-agent/agent.example.toml b/corrosion-host-agent/agent.example.toml index 1d6e8c9..6004b79 100644 --- a/corrosion-host-agent/agent.example.toml +++ b/corrosion-host-agent/agent.example.toml @@ -23,11 +23,28 @@ game = "rust" # rust | conan | soulmask | dune root = "/opt/rustserver" label = "Main 2x Vanilla" +# RCON lets the panel send console commands to the running server.
+# For rust the protocol is WebRCON (WebSocket JSON); for conan/soulmask it is +# Source RCON (Valve TCP binary). `kind` is optional — it is inferred from +# the game name when absent. +# +# The [instance.rcon] sub-table MUST immediately follow the [[instance]] entry +# it belongs to (standard TOML array-of-tables scoping rule). +[instance.rcon] +port = 28016 +password = "changeme" +# kind = "webrcon" # explicit override; omit to infer from game + # [[instance]] # id = "soulmask-main" # game = "soulmask" # root = "/opt/soulmask/main" # label = "Cloud Mist Forest (cluster main)" +# +# [instance.rcon] +# port = 19000 +# password = "changeme" +# # kind = "source" # inferred automatically for soulmask [prober] interval_seconds = 300 diff --git a/corrosion-host-agent/src/config.rs b/corrosion-host-agent/src/config.rs index 2cd271e..e715b4c 100644 --- a/corrosion-host-agent/src/config.rs +++ b/corrosion-host-agent/src/config.rs @@ -10,6 +10,8 @@ use serde::Deserialize; use std::collections::HashSet; use std::path::{Path, PathBuf}; +use crate::rcon::RconConfig; + /// Instance ids share the NATS subject namespace with host-level segments. const RESERVED_INSTANCE_IDS: &[&str] = &["host", "cmd", "files", "update", "agent"]; @@ -59,6 +61,10 @@ pub struct InstanceConfig { /// Working directory for the process. Defaults to the executable's directory. #[serde(default)] pub working_dir: Option, + /// RCON connection settings for this instance. Absent = rcon unavailable. + /// Protocol defaults to WebRcon for rust, Source for conan/soulmask. + #[serde(default)] + pub rcon: Option, } impl InstanceConfig { diff --git a/corrosion-host-agent/src/instancecmd.rs b/corrosion-host-agent/src/instancecmd.rs index 7746c51..d291815 100644 --- a/corrosion-host-agent/src/instancecmd.rs +++ b/corrosion-host-agent/src/instancecmd.rs @@ -1,7 +1,7 @@ //! Per-instance command channel + state-change events. //! //! Each process-managed instance gets a request-reply subscriber on -//! 
`corrosion.{license}.{instance_id}.cmd` (funcs: start/stop/restart/status) +//! `corrosion.{license}.{instance_id}.cmd` (funcs: start/stop/restart/status/rcon) //! and a publisher task that pushes every supervisor state change to //! `corrosion.{license}.{instance_id}.status` — the panel sees crashes when //! they happen, not when the next heartbeat ambles in. @@ -19,6 +19,9 @@ use crate::subjects; #[derive(Debug, Deserialize)] struct InstanceCommand { func: String, + /// Payload for funcs that carry a text argument (e.g. rcon). + #[serde(default)] + command: Option, } /// Forward every supervisor state change as a status event. @@ -91,7 +94,7 @@ async fn handle(agent: Arc, sup: Arc, msg: async_nats: }; let response = match serde_json::from_slice::(&msg.payload) { - Ok(cmd) => dispatch(&sup, &cmd.func).await, + Ok(cmd) => dispatch(&agent, &sup, &cmd).await, Err(e) => json!({ "status": "error", "message": format!("invalid command payload: {e}") }), }; @@ -107,7 +110,13 @@ async fn handle(agent: Arc, sup: Arc, msg: async_nats: } } -async fn dispatch(sup: &Arc, func: &str) -> serde_json::Value { +async fn dispatch( + agent: &Arc, + sup: &Arc, + cmd: &InstanceCommand, +) -> serde_json::Value { + let func = cmd.func.as_str(); + let outcome = match func { "start" => sup.start().await.map(|_| "starting"), "stop" => sup.stop().await.map(|_| "stopped"), @@ -121,10 +130,55 @@ async fn dispatch(sup: &Arc, func: &str) -> serde_json::Value "uptime_seconds": sup.uptime_seconds().await, }); } + "rcon" => { + // Look up the InstanceConfig for this supervisor so we can access + // rcon settings and the game name without changing the supervisor's + // data model. 
+            let inst_cfg = agent
+                .cfg
+                .instances
+                .iter()
+                .find(|i| i.id == sup.instance_id);
+
+            let rcon_cfg = inst_cfg.and_then(|i| i.rcon.as_ref());
+            let Some(rcon_cfg) = rcon_cfg else {
+                return json!({
+                    "status": "error",
+                    "func": "rcon",
+                    "instance_id": sup.instance_id,
+                    "message": format!("instance '{}' has no rcon configured", sup.instance_id),
+                });
+            };
+
+            let Some(command) = cmd.command.as_deref() else {
+                return json!({
+                    "status": "error",
+                    "func": "rcon",
+                    "instance_id": sup.instance_id,
+                    "message": "rcon func requires a 'command' field",
+                });
+            };
+
+            let game = inst_cfg.map(|i| i.game.as_str()).unwrap_or("rust");
+            return match crate::rcon::send_command(rcon_cfg, game, command).await {
+                Ok(output) => json!({
+                    "status": "success",
+                    "func": "rcon",
+                    "instance_id": sup.instance_id,
+                    "output": output,
+                }),
+                Err(e) => json!({
+                    "status": "error",
+                    "func": "rcon",
+                    "instance_id": sup.instance_id,
+                    "message": format!("{e:#}"),
+                }),
+            };
+        }
         other => {
             return json!({
                 "status": "error",
-                "message": format!("unknown func '{other}' (supported: start, stop, restart, status)"),
+                "message": format!("unknown func '{other}' (supported: start, stop, restart, status, rcon)"),
             });
         }
     };
diff --git a/corrosion-host-agent/src/lib.rs b/corrosion-host-agent/src/lib.rs
index 8708e0d..e59c5e8 100644
--- a/corrosion-host-agent/src/lib.rs
+++ b/corrosion-host-agent/src/lib.rs
@@ -8,6 +8,7 @@ pub mod hostcmd;
 pub mod instancecmd;
 pub mod prober;
 pub mod process;
+pub mod rcon;
 pub mod subjects;
 pub mod telemetry;
 pub mod version;
diff --git a/corrosion-host-agent/src/rcon.rs b/corrosion-host-agent/src/rcon.rs
new file mode 100644
index 0000000..86a4edd
--- /dev/null
+++ b/corrosion-host-agent/src/rcon.rs
@@ -0,0 +1,351 @@
+//! RCON client: game-server remote-console over WebRCON (Rust) or Source RCON (Conan/Soulmask).
+//!
+//! The agent runs co-located with the game server, so every connection targets
+//! 127.0.0.1 — no TLS is needed and latency is sub-millisecond. Two protocols
+//! are supported because the Rust game ships its own WebSocket-based WebRCON
+//! while Conan Exiles and Soulmask use the Valve Source RCON wire format over
+//! plain TCP.
+//!
+//! The protocol selection is explicit in the config (`kind`) but can be inferred
+//! from the game name when absent — callers supply the `game` field they already
+//! have in `InstanceConfig`.
+
+use anyhow::{bail, Context, Result};
+use futures::{SinkExt, StreamExt};
+use rand::Rng;
+use serde::Deserialize;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::TcpStream;
+use tokio::time::{timeout, Duration};
+
+/// WebRCON is the Facepunch WebSocket protocol (Rust game).
+/// Source RCON is the Valve wire protocol used by Conan Exiles and Soulmask.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum RconKind {
+    WebRcon,
+    Source,
+}
+
+/// Connection settings for one instance's RCON endpoint.
+///
+/// `Debug` is implemented by hand so the password can never reach a log line
+/// through a `{:?}` of this struct or of a config that embeds it.
+#[derive(Clone, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct RconConfig {
+    /// Protocol override. When absent the kind is resolved from `game`.
+    #[serde(default)]
+    pub kind: Option<RconKind>,
+    /// TCP port of the RCON listener on 127.0.0.1.
+    pub port: u16,
+    /// RCON password; redacted from `Debug` output.
+    pub password: String,
+}
+
+impl std::fmt::Debug for RconConfig {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RconConfig")
+            .field("kind", &self.kind)
+            .field("port", &self.port)
+            .field("password", &"<redacted>")
+            .finish()
+    }
+}
+
+impl RconConfig {
+    /// Resolve the concrete protocol, falling back to a per-game default when
+    /// `kind` is not set. rust → WebRcon; conan + soulmask → Source.
+    pub fn resolved_kind(&self, game: &str) -> RconKind {
+        if let Some(k) = self.kind {
+            return k;
+        }
+        match game {
+            "conan" | "soulmask" => RconKind::Source,
+            // rust is the primary game; anything unknown defaults to WebRcon
+            // — operators can always override with an explicit `kind`.
+            _ => RconKind::WebRcon,
+        }
+    }
+}
+
+const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
+const RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Send `command` to the game server and return its text response.
+///
+/// The agent runs on the same host as the game server, so the target address
+/// is always 127.0.0.1:{port}. Connection and response deadlines are fixed at
+/// 5 s and 10 s respectively — enough headroom for a loaded server while still
+/// catching hung connections quickly.
+pub async fn send_command(cfg: &RconConfig, game: &str, command: &str) -> Result<String> {
+    match cfg.resolved_kind(game) {
+        RconKind::WebRcon => webrcon_exec(cfg, command).await,
+        RconKind::Source => source_rcon_exec(cfg, command).await,
+    }
+}
+
+// ---------------------------------------------------------------------------
+// WebRCON (Rust game) — WebSocket JSON protocol
+// ---------------------------------------------------------------------------
+
+/// WebRCON request/response envelope. The server also emits chat/log frames
+/// on this socket with Identifier == 0; those are skipped.
+#[derive(serde::Serialize)]
+struct WebRconRequest<'a> {
+    #[serde(rename = "Identifier")]
+    identifier: i32,
+    #[serde(rename = "Message")]
+    message: &'a str,
+    #[serde(rename = "Name")]
+    name: &'static str,
+}
+
+#[derive(serde::Deserialize)]
+struct WebRconResponse {
+    #[serde(rename = "Identifier")]
+    identifier: i32,
+    #[serde(rename = "Message")]
+    message: String,
+}
+
+/// Run one command over WebRCON: connect, send a request tagged with a random
+/// Identifier, and return the `Message` of the first frame echoing that
+/// Identifier. Frames with any other Identifier are skipped.
+async fn webrcon_exec(cfg: &RconConfig, command: &str) -> Result<String> {
+    use tokio_tungstenite::connect_async;
+    use tokio_tungstenite::tungstenite::Message as WsMsg;
+
+    // The Rust game server embeds the password in the WebSocket URL path —
+    // never interpolate the real URL into errors or logs.
+    let url = format!("ws://127.0.0.1:{}/{}", cfg.port, cfg.password);
+    let redacted = format!("ws://127.0.0.1:{}/", cfg.port);
+
+    // Bound the whole connect step — TCP handshake plus WS upgrade — with
+    // CONNECT_TIMEOUT; the command exchange below has its own RESPONSE_TIMEOUT.
+    let (mut ws, _) = timeout(CONNECT_TIMEOUT, connect_async(&url))
+        .await
+        .context("connect timeout")?
+        .with_context(|| format!("WebRCON connect to {redacted}"))?;
+
+    // Use a random positive i32 so correlation is unambiguous even when
+    // multiple callers share a port (future concurrency).
+    let id: i32 = rand::thread_rng().gen_range(1..=i32::MAX);
+    let req = WebRconRequest { identifier: id, message: command, name: "Corrosion" };
+    let payload = serde_json::to_string(&req).context("serialize WebRCON request")?;
+
+    ws.send(WsMsg::Text(payload))
+        .await
+        .context("send WebRCON command")?;
+
+    tracing::debug!("WebRCON sent id={id} command={command:?}");
+
+    // Read frames until we see our Identifier — skip chat/log noise (id 0 or
+    // any other value that isn't ours).
+    let result = timeout(RESPONSE_TIMEOUT, async {
+        loop {
+            match ws.next().await {
+                Some(Ok(WsMsg::Text(text))) => {
+                    match serde_json::from_str::<WebRconResponse>(&text) {
+                        Ok(resp) if resp.identifier == id => return Ok(resp.message),
+                        Ok(_) => {
+                            // Not our response (chat, log, another caller's frame).
+                            tracing::trace!("WebRCON skipping frame with different Identifier");
+                            continue;
+                        }
+                        Err(e) => {
+                            tracing::trace!("WebRCON non-JSON frame ignored: {e}");
+                            continue;
+                        }
+                    }
+                }
+                Some(Ok(WsMsg::Close(_))) => bail!("WebRCON server closed connection"),
+                Some(Ok(_)) => continue, // binary/ping/pong — skip
+                Some(Err(e)) => return Err(anyhow::anyhow!(e).context("WebRCON read error")),
+                None => bail!("WebRCON stream ended without response"),
+            }
+        }
+    })
+    .await
+    .context("WebRCON response timeout")??;
+
+    // Close cleanly; a send error here is cosmetic — we already have our data.
+    let _ = ws.close(None).await;
+
+    Ok(result)
+}
+
+// ---------------------------------------------------------------------------
+// Source RCON (Conan Exiles, Soulmask) — Valve TCP binary protocol
+//
+// Packet layout (all fields little-endian):
+//   i32  size — byte count of the remaining packet (id + type + body + 2 nulls)
+//   i32  id   — caller-chosen correlation id; auth failure returns -1
+//   i32  type — 0=RESPONSE_VALUE, 2=EXECCOMMAND/AUTH_RESPONSE, 3=AUTH
+//   [u8] body — UTF-8 command or response text
+//   u8   0x00 — body null terminator
+//   u8   0x00 — padding null terminator
+//
+// Multi-packet handling: after sending the command we also send an empty
+// RESPONSE_VALUE probe with a distinct id. We collect all RESPONSE_VALUE
+// packets belonging to the command id and stop when we receive the probe's
+// response. This is the standard technique specified in the Valve wiki.
+// ---------------------------------------------------------------------------
+
+const RCON_TYPE_AUTH: i32 = 3;
+const RCON_TYPE_AUTH_RESPONSE: i32 = 2;
+const RCON_TYPE_EXECCOMMAND: i32 = 2;
+const RCON_TYPE_RESPONSE_VALUE: i32 = 0;
+
+/// Maximum accumulated response body (guards against misbehaving servers).
+const MAX_RESPONSE_BYTES: usize = 1024 * 1024; // 1 MiB
+
+/// Run one command over Source RCON: connect, authenticate, send the command
+/// plus an empty probe, and return the concatenated bodies of every
+/// RESPONSE_VALUE packet carrying the command id.
+async fn source_rcon_exec(cfg: &RconConfig, command: &str) -> Result<String> {
+    let addr = format!("127.0.0.1:{}", cfg.port);
+
+    let stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&addr))
+        .await
+        .context("connect timeout")?
+        .with_context(|| format!("Source RCON connect to {addr}"))?;
+
+    let mut stream = stream;
+
+    // --- Auth ---
+    let auth_id: i32 = rand::thread_rng().gen_range(1..=i32::MAX);
+    send_packet(&mut stream, auth_id, RCON_TYPE_AUTH, cfg.password.as_bytes()).await?;
+
+    // The server sends two responses to AUTH: first an empty RESPONSE_VALUE,
+    // then an AUTH_RESPONSE. We skip the first and read until AUTH_RESPONSE.
+    timeout(RESPONSE_TIMEOUT, async {
+        loop {
+            let (id, ptype, _body) = recv_packet(&mut stream).await?;
+            if ptype == RCON_TYPE_AUTH_RESPONSE {
+                if id == -1 {
+                    bail!("Source RCON auth failed: wrong password");
+                }
+                tracing::debug!("Source RCON authenticated (id={id})");
+                return Ok(());
+            }
+            // Skip the empty RESPONSE_VALUE that precedes AUTH_RESPONSE.
+        }
+        #[allow(unreachable_code)]
+        Ok::<(), anyhow::Error>(())
+    })
+    .await
+    .context("Source RCON auth timeout")??;
+
+    // --- Command ---
+    let cmd_id: i32 = rand::thread_rng().gen_range(1..=i32::MAX);
+    // Probe id must differ from cmd_id.
+    let probe_id: i32 = loop {
+        let id: i32 = rand::thread_rng().gen_range(1..=i32::MAX);
+        if id != cmd_id {
+            break id;
+        }
+    };
+
+    send_packet(&mut stream, cmd_id, RCON_TYPE_EXECCOMMAND, command.as_bytes()).await?;
+    // Empty RESPONSE_VALUE probe — the server echoes it after processing the
+    // preceding command, signalling end-of-response.
+    send_packet(&mut stream, probe_id, RCON_TYPE_RESPONSE_VALUE, b"").await?;
+
+    // Not every server is probe-conformant (Soulmask unverified): once the
+    // server has answered the command — even with an empty body — a short
+    // per-read quiet period also terminates, so a reply we already hold is
+    // never discarded just because the probe echo didn't come back.
+    const QUIET_PERIOD: Duration = Duration::from_millis(1500);
+    let response = timeout(RESPONSE_TIMEOUT, async {
+        let mut body_accum: Vec<u8> = Vec::new();
+        // Set once any packet for `cmd_id` arrives, even an empty one: a
+        // command with no output must still be able to end on the quiet
+        // period instead of running into RESPONSE_TIMEOUT.
+        let mut got_reply = false;
+        loop {
+            let next = if !got_reply {
+                recv_packet(&mut stream).await.map(Some)
+            } else {
+                match timeout(QUIET_PERIOD, recv_packet(&mut stream)).await {
+                    Ok(res) => res.map(Some),
+                    Err(_elapsed) => Ok(None), // quiet after a reply — done
+                }
+            };
+            let Some((id, ptype, body)) = next? else {
+                break;
+            };
+            if ptype != RCON_TYPE_RESPONSE_VALUE {
+                continue; // unexpected packet type — skip
+            }
+            if id == probe_id {
+                // Probe echoed back — all command response packets have arrived.
+                break;
+            }
+            if id == cmd_id {
+                if body_accum.len() + body.len() > MAX_RESPONSE_BYTES {
+                    bail!("Source RCON response exceeded {MAX_RESPONSE_BYTES} bytes");
+                }
+                got_reply = true;
+                body_accum.extend_from_slice(&body);
+            }
+            // Skip packets with other ids (shouldn't happen but be defensive).
+        }
+        Ok::<Vec<u8>, anyhow::Error>(body_accum)
+    })
+    .await
+    .context("Source RCON response timeout")??;
+
+    String::from_utf8(response).context("Source RCON response is not valid UTF-8")
+}
+
+/// Write a Source RCON packet to the stream.
+async fn send_packet(stream: &mut TcpStream, id: i32, ptype: i32, body: &[u8]) -> Result<()> {
+    // size = id(4) + type(4) + body(n) + 2 null terminators
+    let size = i32::try_from(4 + 4 + body.len() + 2)
+        .context("Source RCON: packet body too large for the i32 size field")?;
+    let mut buf: Vec<u8> = Vec::with_capacity(4 + size as usize);
+    buf.extend_from_slice(&size.to_le_bytes());
+    buf.extend_from_slice(&id.to_le_bytes());
+    buf.extend_from_slice(&ptype.to_le_bytes());
+    buf.extend_from_slice(body);
+    buf.push(0x00);
+    buf.push(0x00);
+    stream.write_all(&buf).await.context("Source RCON write")?;
+    Ok(())
+}
+
+/// Read one Source RCON packet; returns (id, type, body).
+async fn recv_packet(stream: &mut TcpStream) -> Result<(i32, i32, Vec<u8>)> {
+    let mut size_buf = [0u8; 4];
+    stream
+        .read_exact(&mut size_buf)
+        .await
+        .context("Source RCON read size")?;
+    let raw_size = i32::from_le_bytes(size_buf);
+
+    // Minimum packet: id(4) + type(4) + 2 null terminators = 10 bytes. The
+    // range check runs on the signed value so a negative size is reported as
+    // malformed instead of wrapping to a huge usize.
+    if raw_size < 10 {
+        bail!("Source RCON: malformed packet (size={raw_size})");
+    }
+    let size = raw_size as usize; // lossless: raw_size >= 10 was checked above
+    if size > MAX_RESPONSE_BYTES + 16 {
+        bail!("Source RCON: packet too large ({size} bytes)");
+    }
+
+    let mut payload = vec![0u8; size];
+    stream
+        .read_exact(&mut payload)
+        .await
+        .context("Source RCON read payload")?;
+
+    let id = i32::from_le_bytes(payload[0..4].try_into().unwrap());
+    let ptype = i32::from_le_bytes(payload[4..8].try_into().unwrap());
+    // Body is everything between the two fields and the two trailing nulls.
+    let body_end = size.saturating_sub(2); // strip 2 null terminators
+    let body = payload[8..body_end].to_vec();
+
+    Ok((id, ptype, body))
+}
diff --git a/corrosion-host-agent/tests/rcon.rs b/corrosion-host-agent/tests/rcon.rs
new file mode 100644
index 0000000..fc4f0fc
--- /dev/null
+++ b/corrosion-host-agent/tests/rcon.rs
@@ -0,0 +1,353 @@
+//! RCON integration tests using in-process mock servers.
+//!
+//! Real OS sockets on ephemeral ports — no mocking framework. Each test
+//! binds a listener, spawns a task that speaks the expected protocol, then
+//! exercises `rcon::send_command` and asserts on the result. Tests are
+//! unix-only because the musl cross-compile target and the CI runner are both
+//! Linux; the production use case is also Linux-only (game servers don't run
+//! on macOS or Windows in production).
+//!
+//! We use `#[cfg(unix)]` to keep parity with the supervisor integration tests.
+#![cfg(unix)]
+
+use corrosion_host_agent::rcon::{RconConfig, RconKind};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::{TcpListener, TcpStream};
+
+// ---------------------------------------------------------------------------
+// Source RCON helpers — duplicate the wire-format encode/decode locally so
+// the tests own the mock server without depending on the production code path.
+// ---------------------------------------------------------------------------
+
+/// Build a Source RCON packet: [size(4LE) | id(4LE) | type(4LE) | body | 0x00 0x00]
+fn encode_packet(id: i32, ptype: i32, body: &[u8]) -> Vec<u8> {
+    let size = (4 + 4 + body.len() + 2) as i32;
+    let mut out = Vec::with_capacity(4 + size as usize);
+    out.extend_from_slice(&size.to_le_bytes());
+    out.extend_from_slice(&id.to_le_bytes());
+    out.extend_from_slice(&ptype.to_le_bytes());
+    out.extend_from_slice(body);
+    out.push(0x00);
+    out.push(0x00);
+    out
+}
+
+/// Read one Source RCON packet from a TcpStream.
+async fn read_packet(stream: &mut TcpStream) -> (i32, i32, Vec) { + let mut size_buf = [0u8; 4]; + stream.read_exact(&mut size_buf).await.unwrap(); + let size = i32::from_le_bytes(size_buf) as usize; + + let mut payload = vec![0u8; size]; + stream.read_exact(&mut payload).await.unwrap(); + + let id = i32::from_le_bytes(payload[0..4].try_into().unwrap()); + let ptype = i32::from_le_bytes(payload[4..8].try_into().unwrap()); + let body_end = size.saturating_sub(2); + let body = payload[8..body_end].to_vec(); + (id, ptype, body) +} + +const SOURCE_TYPE_AUTH: i32 = 3; +const SOURCE_TYPE_AUTH_RESPONSE: i32 = 2; +const SOURCE_TYPE_EXECCOMMAND: i32 = 2; +const SOURCE_TYPE_RESPONSE_VALUE: i32 = 0; + +// --------------------------------------------------------------------------- +// Mock Source RCON server +// --------------------------------------------------------------------------- + +/// Run a Source RCON server that accepts password "goodpw", rejects others, +/// and responds to the first EXECCOMMAND with `response_body`. +/// +/// If `split_at` is Some(n) the body is split: the first `n` bytes arrive in +/// one RESPONSE_VALUE packet and the remainder in a second — testing multi- +/// packet reassembly. +async fn run_source_mock( + mut stream: TcpStream, + accept_password: &str, + command_response: &[u8], + split_at: Option, +) { + // --- Auth phase --- + let (auth_id, ptype, body) = read_packet(&mut stream).await; + assert_eq!(ptype, SOURCE_TYPE_AUTH, "expected AUTH packet"); + + let password = String::from_utf8_lossy(&body); + if password != accept_password { + // Send empty RESPONSE_VALUE then AUTH_RESPONSE with id = -1 (failure). + let empty = encode_packet(auth_id, SOURCE_TYPE_RESPONSE_VALUE, b""); + stream.write_all(&empty).await.unwrap(); + let fail = encode_packet(-1, SOURCE_TYPE_AUTH_RESPONSE, b""); + stream.write_all(&fail).await.unwrap(); + return; + } + + // Success: empty RESPONSE_VALUE then AUTH_RESPONSE with the auth id. 
+ let empty = encode_packet(auth_id, SOURCE_TYPE_RESPONSE_VALUE, b""); + stream.write_all(&empty).await.unwrap(); + let ok = encode_packet(auth_id, SOURCE_TYPE_AUTH_RESPONSE, b""); + stream.write_all(&ok).await.unwrap(); + + // --- Command phase --- + let (cmd_id, cmd_ptype, _cmd_body) = read_packet(&mut stream).await; + assert_eq!(cmd_ptype, SOURCE_TYPE_EXECCOMMAND, "expected EXECCOMMAND"); + + // Read the probe packet (empty RESPONSE_VALUE with a different id). + let (probe_id, probe_ptype, _) = read_packet(&mut stream).await; + assert_eq!(probe_ptype, SOURCE_TYPE_RESPONSE_VALUE, "expected probe packet"); + + // Send the command response, optionally split across two packets. + if let Some(n) = split_at { + let (part1, part2) = command_response.split_at(n.min(command_response.len())); + let p1 = encode_packet(cmd_id, SOURCE_TYPE_RESPONSE_VALUE, part1); + stream.write_all(&p1).await.unwrap(); + let p2 = encode_packet(cmd_id, SOURCE_TYPE_RESPONSE_VALUE, part2); + stream.write_all(&p2).await.unwrap(); + } else { + let p = encode_packet(cmd_id, SOURCE_TYPE_RESPONSE_VALUE, command_response); + stream.write_all(&p).await.unwrap(); + } + + // Echo the probe to signal end-of-response. 
+    let probe_echo = encode_packet(probe_id, SOURCE_TYPE_RESPONSE_VALUE, b"");
+    stream.write_all(&probe_echo).await.unwrap();
+}
+
+// ---------------------------------------------------------------------------
+// Source RCON tests
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn source_rcon_auth_and_exec_returns_response() {
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let port = listener.local_addr().unwrap().port();
+
+    tokio::spawn(async move {
+        let (stream, _) = listener.accept().await.unwrap();
+        run_source_mock(stream, "goodpw", b"Hello from server", None).await;
+    });
+
+    let cfg = RconConfig { kind: Some(RconKind::Source), port, password: "goodpw".to_string() };
+    let result = corrosion_host_agent::rcon::send_command(&cfg, "conan", "status")
+        .await
+        .expect("command should succeed");
+
+    assert_eq!(result, "Hello from server");
+}
+
+#[tokio::test]
+async fn source_rcon_wrong_password_returns_auth_error() {
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let port = listener.local_addr().unwrap().port();
+
+    tokio::spawn(async move {
+        let (stream, _) = listener.accept().await.unwrap();
+        run_source_mock(stream, "goodpw", b"should not see this", None).await;
+    });
+
+    let cfg = RconConfig { kind: Some(RconKind::Source), port, password: "wrongpw".to_string() };
+    let err = corrosion_host_agent::rcon::send_command(&cfg, "conan", "status")
+        .await
+        .expect_err("wrong password should fail");
+
+    assert!(
+        err.to_string().to_lowercase().contains("auth"),
+        "error should mention auth failure, got: {err}"
+    );
+}
+
+#[tokio::test]
+async fn source_rcon_multi_packet_response_concatenated() {
+    // Build a body large enough to split meaningfully across two packets.
+    // Use repeating ASCII so the result is valid UTF-8 and easy to verify.
+    // 200 'A's then 200 'B's = 400 bytes, split at 200.
+    let body: Vec<u8> = std::iter::repeat_n(b'A', 200)
+        .chain(std::iter::repeat_n(b'B', 200))
+        .collect();
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let port = listener.local_addr().unwrap().port();
+    let body_clone = body.clone();
+
+    tokio::spawn(async move {
+        let (stream, _) = listener.accept().await.unwrap();
+        run_source_mock(stream, "goodpw", &body_clone, Some(200)).await;
+    });
+
+    let cfg = RconConfig { kind: Some(RconKind::Source), port, password: "goodpw".to_string() };
+    let result = corrosion_host_agent::rcon::send_command(&cfg, "soulmask", "showplayers")
+        .await
+        .expect("multi-packet command should succeed");
+
+    let expected = String::from_utf8(body).unwrap();
+    assert_eq!(result, expected, "full body should be concatenated across both packets");
+}
+
+#[tokio::test]
+async fn source_rcon_connect_timeout_to_unreachable_port() {
+    // Despite the name, this does not exercise a connect timeout: the port is
+    // bound, so the TCP connect itself succeeds. The mock accepts the
+    // connection and immediately drops the stream, so the client's read ends
+    // early instead of yielding an RCON reply. That must surface as an error
+    // (not a panic or hang).
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let port = listener.local_addr().unwrap().port();
+
+    // Accept the TCP connection but immediately drop it — simulates a port
+    // that accepts but never speaks RCON.
+    tokio::spawn(async move {
+        let (_stream, _) = listener.accept().await.unwrap();
+        // _stream dropped here — EOF on the client's read
+    });
+
+    let cfg =
+        RconConfig { kind: Some(RconKind::Source), port, password: "goodpw".to_string() };
+    let err = corrosion_host_agent::rcon::send_command(&cfg, "conan", "status")
+        .await
+        .expect_err("closed connection should fail");
+
+    // We just need it to fail and not hang; error message varies by OS.
+    let _ = err;
+}
+
+// ---------------------------------------------------------------------------
+// WebRCON mock server
+// ---------------------------------------------------------------------------
+
+/// Run a WebRCON mock: send one noise frame (Identifier 0), then respond to
+/// the first real request with the given output.
+async fn run_webrcon_mock(stream: tokio::net::TcpStream, output: &str) {
+    use futures::{SinkExt, StreamExt};
+    use tokio_tungstenite::accept_async;
+    use tokio_tungstenite::tungstenite::Message as WsMsg;
+
+    let mut ws = accept_async(stream).await.expect("WS handshake failed");
+
+    // Send noise (chat frame, Identifier 0) before the real request arrives.
+    let noise = serde_json::json!({
+        "Identifier": 0,
+        "Message": "Player X joined",
+        "Name": "Server",
+        "Type": "Chat"
+    });
+    ws.send(WsMsg::Text(noise.to_string()))
+        .await
+        .unwrap();
+
+    // Read the command request.
+    let msg = ws.next().await.unwrap().unwrap();
+    let text = match msg {
+        WsMsg::Text(t) => t,
+        other => panic!("expected Text frame, got {other:?}"),
+    };
+    let req: serde_json::Value = serde_json::from_str(&text).unwrap();
+    let req_id = req["Identifier"].as_i64().unwrap();
+
+    // Reply with the same Identifier so the client correlates correctly.
+    let reply = serde_json::json!({
+        "Identifier": req_id,
+        "Message": output,
+        "Type": "Generic",
+    });
+    ws.send(WsMsg::Text(reply.to_string())).await.unwrap();
+}
+
+// ---------------------------------------------------------------------------
+// WebRCON tests
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn webrcon_skips_noise_and_returns_correct_message() {
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let port = listener.local_addr().unwrap().port();
+
+    tokio::spawn(async move {
+        let (stream, _) = listener.accept().await.unwrap();
+        run_webrcon_mock(stream, "Players: 42/100").await;
+    });
+
+    // Password is embedded in the URL path — any non-empty string works with
+    // our mock.
+    let cfg = RconConfig {
+        kind: Some(RconKind::WebRcon),
+        port,
+        password: "testpw".to_string(),
+    };
+    let result = corrosion_host_agent::rcon::send_command(&cfg, "rust", "playercount")
+        .await
+        .expect("WebRCON command should succeed");
+
+    assert_eq!(result, "Players: 42/100");
+}
+
+// ---------------------------------------------------------------------------
+// TOML parsing test — pins [[instance]] + [instance.rcon] sub-table syntax
+// ---------------------------------------------------------------------------
+
+#[test]
+fn toml_instance_with_rcon_parses_correctly() {
+    let toml = r#"
+[agent]
+license_id = "test-license"
+nats_url = "nats://localhost:4222"
+
+[[instance]]
+id = "rust-main"
+game = "rust"
+root = "/opt/rustserver"
+
+[instance.rcon]
+port = 28016
+password = "secretpassword"
+kind = "webrcon"
+"#;
+
+    let cfg: corrosion_host_agent::config::ConfigFile =
+        toml::from_str(toml).expect("TOML should parse");
+
+    assert_eq!(cfg.instances.len(), 1);
+    let inst = &cfg.instances[0];
+    assert_eq!(inst.id, "rust-main");
+
+    let rcon = inst.rcon.as_ref().expect("rcon should be present");
+    assert_eq!(rcon.port, 28016);
+    assert_eq!(rcon.password, "secretpassword");
+    assert_eq!(rcon.kind, Some(corrosion_host_agent::rcon::RconKind::WebRcon));
+}
+
+#[test]
+fn toml_instance_without_rcon_defaults_to_none() {
+    let toml = r#"
+[agent]
+license_id = "test-license"
+nats_url = "nats://localhost:4222"
+
+[[instance]]
+id = "conan-main"
+game = "conan"
+root = "/opt/conan"
+"#;
+
+    let cfg: corrosion_host_agent::config::ConfigFile =
+        toml::from_str(toml).expect("TOML should parse");
+
+    assert!(cfg.instances[0].rcon.is_none(), "absent rcon should be None");
+}
+
+#[test]
+fn resolved_kind_infers_from_game_name() {
+    use corrosion_host_agent::rcon::{RconConfig, RconKind};
+
+    let cfg_no_kind = RconConfig { kind: None, port: 28016, password: "x".to_string() };
+    assert_eq!(cfg_no_kind.resolved_kind("rust"), RconKind::WebRcon);
+    assert_eq!(cfg_no_kind.resolved_kind("conan"), RconKind::Source);
+    assert_eq!(cfg_no_kind.resolved_kind("soulmask"), RconKind::Source);
+    assert_eq!(cfg_no_kind.resolved_kind("dune"), RconKind::WebRcon); // fallback
+
+    // Explicit kind always wins.
+    let cfg_source = RconConfig { kind: Some(RconKind::Source), ..cfg_no_kind.clone() };
+    assert_eq!(cfg_source.resolved_kind("rust"), RconKind::Source);
+
+    let cfg_webrcon = RconConfig { kind: Some(RconKind::WebRcon), ..cfg_no_kind };
+    assert_eq!(cfg_webrcon.resolved_kind("conan"), RconKind::WebRcon);
+}
diff --git a/corrosion-host-agent/tests/supervisor.rs b/corrosion-host-agent/tests/supervisor.rs
index db2b94a..9e7db4f 100644
--- a/corrosion-host-agent/tests/supervisor.rs
+++ b/corrosion-host-agent/tests/supervisor.rs
@@ -19,6 +19,7 @@ fn managed_instance(executable: &str, args: &[&str]) -> InstanceConfig {
         executable: Some(PathBuf::from(executable)),
         args: args.iter().map(|s| s.to_string()).collect(),
         working_dir: None,
+        rcon: None,
     }
 }