// Full-pipeline contract test: Rust host agent → NATS → NestJS consumer → Postgres.
//
// Proves the wire protocol v2 chain end to end against a REAL backend and DB:
//   1. agent heartbeat arrives with schema 2 + measured telemetry
//   2. backend auto-registers the server_connections row and marks it connected
//   3. instance command channel round-trips (start/status/stop) with push events
//   4. graceful agent shutdown publishes the offline beacon and the row flips offline
//
// Required env:
//   LICENSE_ID    — existing license uuid (CI: from the admin seed)
//   DATABASE_URL  — postgres connection string for assertions
//   NATS_URL      — broker both agent and backend use (default nats://localhost:4222)
//   AGENT_BIN     — path to the corrosion-host-agent binary
//                   (default <repo>/corrosion-host-agent/target/debug/corrosion-host-agent)
//
// Exit codes: 0 = contract passed, 1 = a check failed or the run crashed,
// 2 = required environment variables are missing.
//
// Uses the backend's own node_modules (nats, pg) so the client libs under test
// are exactly what production runs.

import { createRequire } from 'node:module';
import { spawn } from 'node:child_process';
import { writeFileSync, mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';

const repoRoot = join(dirname(fileURLToPath(import.meta.url)), '..');
// Resolve third-party modules relative to backend-nest/node_modules; the
// 'x.js' file name is only an anchor path for createRequire.
const require = createRequire(join(repoRoot, 'backend-nest', 'node_modules', 'x.js'));
const { connect, StringCodec } = require('nats');
const { Client: PgClient } = require('pg');

const LICENSE = process.env.LICENSE_ID;
const NATS_URL = process.env.NATS_URL ?? 'nats://localhost:4222';
const DATABASE_URL = process.env.DATABASE_URL;
const AGENT_BIN =
  process.env.AGENT_BIN ??
  join(repoRoot, 'corrosion-host-agent', 'target', 'debug', 'corrosion-host-agent');

if (!LICENSE || !DATABASE_URL) {
  console.error('LICENSE_ID and DATABASE_URL are required');
  process.exit(2);
}

const sc = StringCodec();

// Failed checks are collected here and reported together at the end, so one
// failed expectation does not hide the ones that follow it.
const errs = [];

/** Records `msg` as a failure when `cond` is falsy. */
const check = (cond, msg) => {
  if (!cond) errs.push(msg);
};

/** Resolves after `ms` milliseconds. */
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));

/**
 * Polls the server_connections rows for LICENSE once per second until
 * `predicate(rows)` is truthy or `timeoutMs` has elapsed.
 *
 * On timeout a failure is recorded in `errs` (it does not throw) and the
 * last rows that were read are returned.
 *
 * @param {object} pg - connected pg Client
 * @param {(rows: object[]) => boolean} predicate - success condition
 * @param {string} label - prefix for the timeout failure message
 * @param {number} [timeoutMs=30000] - polling budget in milliseconds
 * @returns {Promise<object[]>} rows from the final query
 */
async function pollDb(pg, predicate, label, timeoutMs = 30_000) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const { rows } = await pg.query(
      'SELECT connection_type, connection_status, companion_last_seen FROM server_connections WHERE license_id = $1',
      [LICENSE],
    );
    if (predicate(rows)) return rows;
    if (Date.now() > deadline) {
      errs.push(`${label}: timeout after ${timeoutMs}ms — rows: ${JSON.stringify(rows)}`);
      return rows;
    }
    await sleep(1000);
  }
}

/**
 * Subscribes to `subject` and appends every JSON-decoded message to `sink`.
 *
 * The consumer loop runs in the background. A message that is not valid JSON
 * is recorded as a failure and skipped; any other loop error is recorded too,
 * so nothing surfaces as an unhandled rejection.
 *
 * @param {object} nc - NATS connection
 * @param {string} subject - subject to subscribe to
 * @param {object[]} sink - array that receives the decoded payloads
 */
function collect(nc, subject, sink) {
  const sub = nc.subscribe(subject);
  (async () => {
    for await (const m of sub) {
      try {
        sink.push(JSON.parse(sc.decode(m.data)));
      } catch (e) {
        errs.push(`${subject}: undecodable message: ${e.message}`);
      }
    }
  })().catch((e) => errs.push(`${subject}: subscription failed: ${e.message}`));
}

/**
 * Runs the four contract stages. Every resource acquired along the way
 * registers a release callback in `cleanups`, so the caller can tear
 * everything down even when a stage throws.
 *
 * @param {Array<() => unknown>} cleanups - release callbacks, in acquisition order
 */
async function runContract(cleanups) {
  const pg = new PgClient({ connectionString: DATABASE_URL });
  await pg.connect();
  cleanups.push(() => pg.end());

  const nc = await connect({ servers: NATS_URL });
  cleanups.push(() => nc.close());

  // Subscribe before the agent starts so its first heartbeat is not missed.
  const heartbeats = [];
  const statusEvents = [];
  collect(nc, `corrosion.${LICENSE}.host.heartbeat`, heartbeats);
  collect(nc, `corrosion.${LICENSE}.ci-instance.status`, statusEvents);

  // --- spawn the real agent ---
  const dir = mkdtempSync(join(tmpdir(), 'cha-contract-'));
  cleanups.push(() => rmSync(dir, { recursive: true, force: true }));
  const cfgPath = join(dir, 'agent.toml');
  writeFileSync(cfgPath, `
[agent]
license_id = "${LICENSE}"
nats_url = "${NATS_URL}"
heartbeat_seconds = 10
log_level = "info"

[[instance]]
id = "ci-instance"
game = "rust"
root = "/tmp"
label = "Contract CI"
executable = "/bin/sleep"
args = ["300"]
`);

  const agent = spawn(AGENT_BIN, ['--config', cfgPath], { stdio: ['ignore', 'inherit', 'inherit'] });
  let agentDone = false;
  const agentExited = new Promise((resolve) => {
    const settle = () => {
      agentDone = true;
      resolve();
    };
    agent.once('exit', settle);
    // Without an 'error' listener a failed spawn (e.g. missing binary) would
    // surface as an uncaught exception instead of a reported failure.
    agent.once('error', (e) => {
      errs.push(`agent process error: ${e.message}`);
      settle();
    });
  });
  cleanups.push(async () => {
    // Never leave the agent running after the test, whatever happened above.
    if (agentDone) return;
    agent.kill('SIGKILL');
    await Promise.race([agentExited, sleep(2000)]);
  });

  // --- 1. heartbeat shape + real telemetry ---
  const hbDeadline = Date.now() + 20_000;
  while (heartbeats.length === 0 && !agentDone && Date.now() < hbDeadline) await sleep(500);
  check(heartbeats.length > 0, 'no heartbeat within 20s');
  if (heartbeats.length) {
    const hb = heartbeats[0];
    check(hb.schema === 2, `schema != 2: ${hb.schema}`);
    check(typeof hb.host?.cpu_percent === 'number', 'missing host.cpu_percent');
    check(hb.host?.mem_total_mb > 0, 'mem_total_mb not measured');
    check(Array.isArray(hb.host?.disks) && hb.host.disks.length > 0, 'no disks reported');
    check(hb.instances?.[0]?.id === 'ci-instance', 'instance missing from heartbeat');
    check(!!hb.agent?.version && !!hb.agent?.commit, 'agent version/commit missing');
  }
  // With no agent process the remaining stages can only burn their timeouts.
  if (agentDone) throw new Error('agent exited before the contract checks could run');

  // --- 2. backend auto-registers + connects ---
  const rows = await pollDb(
    pg,
    (r) => r.length === 1 && r[0].connection_status === 'connected',
    'auto-register connected',
  );
  if (rows.length === 1) {
    check(rows[0].connection_type === 'bare_metal', `connection_type: ${rows[0].connection_type}`);
    check(rows[0].companion_last_seen !== null, 'companion_last_seen not set');
  }

  // --- 3. instance command channel ---
  // Request/reply helper: sends a JSON command and returns the parsed reply.
  // Rejects if no reply arrives within 8s.
  const cmd = async (payload) => {
    const reply = await nc.request(
      `corrosion.${LICENSE}.ci-instance.cmd`,
      sc.encode(JSON.stringify(payload)),
      { timeout: 8000 },
    );
    return JSON.parse(sc.decode(reply.data));
  };

  const st0 = await cmd({ func: 'status' });
  check(st0.state?.state === 'stopped', `initial state: ${JSON.stringify(st0.state)}`);

  const start = await cmd({ func: 'start' });
  check(start.status === 'success', `start: ${JSON.stringify(start)}`);
  await sleep(1000);

  const st1 = await cmd({ func: 'status' });
  check(st1.state?.state === 'running', `post-start state: ${JSON.stringify(st1.state)}`);

  check((await cmd({ func: 'start' })).status === 'error', 'double start must error');
  check((await cmd({ func: 'bogus' })).status === 'error', 'unknown func must error');

  const stop = await cmd({ func: 'stop' });
  check(stop.status === 'success', `stop: ${JSON.stringify(stop)}`);
  await sleep(1000);

  const seq = statusEvents.map((e) => e.event?.state);
  check(seq.includes('running') && seq.includes('stopped'), `status events incomplete: ${seq.join(',')}`);

  // --- 4. graceful shutdown → offline beacon → DB flips offline ---
  agent.kill('SIGTERM');
  await Promise.race([agentExited, sleep(8000)]);
  if (!agentDone) {
    // Best-effort wait only: the contract is asserted on the DB row below.
    console.error('warning: agent still running 8s after SIGTERM; it will be force-killed during cleanup');
  }
  await pollDb(
    pg,
    (r) => r.length === 1 && r[0].connection_status === 'offline',
    'beacon offline',
    20_000,
  );
}

/**
 * Entry point: runs the contract, always releases resources (agent process,
 * temp dir, NATS and Postgres connections), then reports and sets the exit code.
 */
const main = async () => {
  const cleanups = [];
  let crashed = false;
  try {
    await runContract(cleanups);
  } catch (e) {
    crashed = true;
    console.error('contract test crashed:', e);
  } finally {
    // Release in reverse acquisition order: agent, temp dir, NATS, Postgres.
    for (const release of cleanups.reverse()) {
      try {
        await release();
      } catch (e) {
        errs.push(`cleanup failed: ${e.message}`);
      }
    }
  }

  if (errs.length) {
    console.error('\nCONTRACT FAIL:');
    errs.forEach((e) => console.error(' -', e));
  }
  if (crashed || errs.length) process.exit(1);

  console.log('\nCONTRACT PASS: heartbeat shape, auto-register, connected/offline lifecycle, instance command channel, push events');
  process.exit(0);
};

main().catch((e) => {
  console.error('contract test crashed:', e);
  process.exit(1);
});