//! Host-level command handler: request-reply on `corrosion.{license}.host.cmd`. //! //! One subscriber; each message handled in its own task so a slow command //! never blocks the dispatch loop. Phase 0 commands: ping, probe, sysinfo. use futures::StreamExt; use serde::Deserialize; use serde_json::json; use std::sync::Arc; use sysinfo::System; use crate::agent::Agent; use crate::prober; use crate::subjects; use crate::telemetry; use crate::update; use crate::version; #[derive(Debug, Deserialize)] struct HostCommand { func: String, /// Signed-update artifact URL (for func = "update"). #[serde(default)] url: Option, } pub async fn run(agent: Arc) -> anyhow::Result<()> { let subject = subjects::host_cmd(&agent.cfg.license_id); let mut sub = agent.nats.subscribe(subject.clone()).await?; tracing::info!("host command handler listening on {subject}"); let cancel = agent.shutdown.clone(); loop { tokio::select! { msg = sub.next() => { match msg { Some(msg) => { let agent = agent.clone(); tokio::spawn(async move { handle(agent, msg).await }); } None => { tracing::warn!("host command subscription ended"); break; } } } _ = cancel.cancelled() => { tracing::info!("host command handler stopping"); break; } } } Ok(()) } async fn handle(agent: Arc, msg: async_nats::Message) { let Some(reply) = msg.reply.clone() else { tracing::warn!("host command without reply subject ignored"); return; }; let cmd = match serde_json::from_slice::(&msg.payload) { Ok(cmd) => cmd, Err(e) => { publish(&agent, &reply, json!({ "status": "error", "message": format!("invalid command payload: {e}") })).await; return; } }; // Self-update is special: it must reply BEFORE relaunching, because the // relaunch replaces this process and nothing after it would run. if cmd.func == "update" { let Some(url) = cmd.url else { publish(&agent, &reply, json!({ "status": "error", "message": "update requires a 'url'" })).await; return; }; match update::download_verify_swap(&url).await { Ok(_) => { publish(&agent, &reply, json!({ "status": "success", "func": "update", "message": "verified and swapped; relaunching" })).await; let _ = agent.nats.flush().await; update::relaunch_and_exit(); } Err(e) => { publish(&agent, &reply, json!({ "status": "error", "func": "update", "message": format!("{e:#}") })).await; } } return; } let response = dispatch(&agent, &cmd.func).await; publish(&agent, &reply, response).await; } async fn publish(agent: &Arc, reply: &async_nats::Subject, value: serde_json::Value) { match serde_json::to_vec(&value) { Ok(bytes) => { if let Err(e) = agent.nats.publish(reply.clone(), bytes.into()).await { tracing::warn!("response publish failed: {e}"); } } Err(e) => tracing::error!("response serialize failed: {e}"), } } async fn dispatch(agent: &Arc, func: &str) -> serde_json::Value { match func { "ping" => json!({ "status": "success", "func": "ping", "version": version::VERSION, "commit": version::GIT_HASH, "uptime_seconds": agent.started.elapsed().as_secs(), }), "probe" => { let report = prober::run_probe(&agent.cfg.probe_targets).await; *agent.last_probe.write().await = Some(report.clone()); match serde_json::to_value(&report) { Ok(report_json) => json!({ "status": "success", "func": "probe", "report": report_json, }), Err(e) => json!({ "status": "error", "message": format!("probe serialize: {e}") }), } } "sysinfo" => { let mut sys = System::new(); sys.refresh_cpu_usage(); tokio::time::sleep(std::time::Duration::from_millis(250)).await; let payload = telemetry::collect(agent, &mut sys).await; match serde_json::to_value(&payload) { Ok(snapshot) => json!({ "status": "success", "func": "sysinfo", "snapshot": snapshot, }), Err(e) => json!({ "status": "error", "message": format!("sysinfo serialize: {e}") }), } } other => json!({ "status": "error", "message": format!("unknown func '{other}' (supported: ping, probe, sysinfo)"), }), } }