From 47fa72763ceab5e62ef2cae64b9fc884e3123add Mon Sep 17 00:00:00 2001
From: Vantz Stockwell
Date: Thu, 11 Jun 2026 10:35:58 -0400
Subject: [PATCH] =?UTF-8?q?feat(api):=20host-agent=20protocol=20v2=20consu?=
 =?UTF-8?q?mer=20=E2=80=94=20heartbeat=20persistence,=20auto-register,=20s?=
 =?UTF-8?q?taleness=20sweep?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Nothing persisted agent heartbeats before: companion_last_seen was written
once at setup and connection_status stayed 'connected' forever.

HostAgentConsumerService now consumes corrosion.*.host.heartbeat (updates
last_seen + status, auto-creates the bare_metal connection row on first
contact), host.going_offline (graceful offline), and sweeps connections
offline after 180s of heartbeat silence. License-existence tenant
validation with caching per NATS-consumer doctrine. WS bridge forwards
host_heartbeat/host_going_offline to the panel.

Contract-verified against production NATS with the backend's own nats lib:
v2 subjects, schema 2, real telemetry, offline beacon.

Co-Authored-By: Claude Fable 5
---
Review notes (this section is not part of the commit message):
 - Line structure and generic type arguments restored in the added file.
 - onGoingOffline logs only when a connection row was actually updated.
 - warnedUnknown is capped (MAX_WARNED_UNKNOWN) so invented subject
   segments cannot grow it without bound.
 - The blob id on the new-file index line predates these amendments.

 backend-nest/src/app.module.ts                |   7 +
 .../services/host-agent-consumer.service.ts   | 176 ++++++++++++++++++
 backend-nest/src/services/index.ts            |   1 +
 .../src/services/nats-bridge.service.ts       |  11 ++
 4 files changed, 195 insertions(+)
 create mode 100644 backend-nest/src/services/host-agent-consumer.service.ts

diff --git a/backend-nest/src/app.module.ts b/backend-nest/src/app.module.ts
index 018dc41..c5eea73 100644
--- a/backend-nest/src/app.module.ts
+++ b/backend-nest/src/app.module.ts
@@ -49,6 +49,9 @@ import { EarlyAccessModule } from './modules/early-access/early-access.module';
 // Shared Services
 import { NatsService } from './services/nats.service';
 import { NatsBridgeService } from './services/nats-bridge.service';
+import { HostAgentConsumerService } from './services/host-agent-consumer.service';
+import { ServerConnection } from './entities/server-connection.entity';
+import { License } from './entities/license.entity';
 import { SteamService } from './services/steam.service';
 
 // Gateway
@@ -91,6 +94,9 @@ import { NatsBridgeGateway } from './gateways/nats-bridge.gateway';
     // Scheduler
     ScheduleModule.forRoot(),
 
+    // Repositories for app-level shared services (host-agent consumer)
+    TypeOrmModule.forFeature([ServerConnection, License]),
+
     // Feature Modules
     AuthModule,
     UsersModule,
@@ -134,6 +140,7 @@ import { NatsBridgeGateway } from './gateways/nats-bridge.gateway';
     // Shared services
     NatsService,
     NatsBridgeService,
+    HostAgentConsumerService,
     SteamService,
 
     // WebSocket gateway
diff --git a/backend-nest/src/services/host-agent-consumer.service.ts b/backend-nest/src/services/host-agent-consumer.service.ts
new file mode 100644
index 0000000..f6cf56e
--- /dev/null
+++ b/backend-nest/src/services/host-agent-consumer.service.ts
@@ -0,0 +1,176 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { Interval } from '@nestjs/schedule';
+import { InjectRepository } from '@nestjs/typeorm';
+import { Repository } from 'typeorm';
+import { NatsService } from './nats.service';
+import { ServerConnection } from '../entities/server-connection.entity';
+import { License } from '../entities/license.entity';
+
+/**
+ * Consumes Corrosion wire protocol v2 host-agent subjects
+ * (corrosion-host-agent/PROTOCOL.md) and keeps server_connections truthful.
+ *
+ * Before this service existed, NOTHING persisted agent heartbeats:
+ * companion_last_seen was written once at setup and connection_status stayed
+ * 'connected' forever. Now: heartbeat -> last_seen + connected (row
+ * auto-created on first contact), going_offline beacon -> offline, and a
+ * staleness sweep marks hosts offline when heartbeats stop arriving.
+ */
+@Injectable()
+export class HostAgentConsumerService implements OnModuleInit {
+  private readonly logger = new Logger(HostAgentConsumerService.name);
+
+  /** licenseId -> cache expiry epoch-ms. Positive = exists, absent = unknown. */
+  private readonly knownLicenses = new Map<string, number>();
+  /** Unknown/garbage license ids we already warned about (anti log-spam). */
+  private readonly warnedUnknown = new Set<string>();
+
+  private static readonly UUID_RE =
+    /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
+  private static readonly LICENSE_CACHE_TTL_MS = 5 * 60_000;
+  /** 3x the agent's default 60s heartbeat (which jitters to max 72s). */
+  private static readonly OFFLINE_AFTER_MS = 180_000;
+  /** Upper bound on warnedUnknown; the set is reset once it reaches this size. */
+  private static readonly MAX_WARNED_UNKNOWN = 1_000;
+
+  constructor(
+    private readonly nats: NatsService,
+    @InjectRepository(ServerConnection)
+    private readonly connectionRepository: Repository<ServerConnection>,
+    @InjectRepository(License)
+    private readonly licenseRepository: Repository<License>,
+  ) {}
+
+  /**
+   * Subscribes to the heartbeat and going_offline subjects. The license id is
+   * the second dot-separated segment of the subject. Handler rejections are
+   * logged rather than left unhandled.
+   */
+  onModuleInit() {
+    this.nats.subscribe('corrosion.*.host.heartbeat', (data, subject) => {
+      const licenseId = subject.split('.')[1];
+      void this.onHeartbeat(licenseId).catch((err) =>
+        this.logger.error(`heartbeat handling failed for ${licenseId}: ${err.message}`, err.stack),
+      );
+      void data; // payload telemetry is bridged to the browser; persistence here is liveness only
+    });
+
+    this.nats.subscribe('corrosion.*.host.going_offline', (_data, subject) => {
+      const licenseId = subject.split('.')[1];
+      void this.onGoingOffline(licenseId).catch((err) =>
+        this.logger.error(`going_offline handling failed for ${licenseId}: ${err.message}`, err.stack),
+      );
+    });
+
+    this.logger.log('Host agent (protocol v2) consumer subscriptions initialized');
+  }
+
+  /**
+   * Records a heartbeat for a validated license: refreshes companion_last_seen
+   * and marks the existing connection row 'connected', or creates a
+   * 'bare_metal' row when none exists yet.
+   */
+  private async onHeartbeat(licenseId: string): Promise<void> {
+    if (!(await this.isValidTenant(licenseId))) return;
+
+    const now = new Date();
+    const existing = await this.connectionRepository.findOne({
+      where: { license_id: licenseId },
+    });
+
+    if (existing) {
+      await this.connectionRepository.update(
+        { id: existing.id },
+        { companion_last_seen: now, connection_status: 'connected', updated_at: now },
+      );
+      if (existing.connection_status !== 'connected') {
+        this.logger.log(`host agent for license ${licenseId} is back online`);
+      }
+    } else {
+      // First contact from a host agent: auto-register the connection so the
+      // panel lights up without a manual setup step.
+      await this.connectionRepository.save(
+        this.connectionRepository.create({
+          license_id: licenseId,
+          connection_type: 'bare_metal',
+          connection_status: 'connected',
+          companion_last_seen: now,
+        }),
+      );
+      this.logger.log(`host agent registered for license ${licenseId} (first heartbeat)`);
+    }
+  }
+
+  /** Marks a validated license's connection row(s) 'offline' on the goodbye beacon. */
+  private async onGoingOffline(licenseId: string): Promise<void> {
+    if (!(await this.isValidTenant(licenseId))) return;
+
+    const result = await this.connectionRepository.update(
+      { license_id: licenseId },
+      { connection_status: 'offline', updated_at: new Date() },
+    );
+    // A beacon for a license with no connection row updates nothing; only
+    // report the transition when a row actually changed.
+    if (result.affected) {
+      this.logger.log(`host agent for license ${licenseId} went offline (graceful beacon)`);
+    }
+  }
+
+  /**
+   * Heartbeats stopping must flip the panel to offline — an agent that
+   * crashes or loses network never sends the goodbye beacon.
+   */
+  @Interval(60_000)
+  async sweepStaleConnections(): Promise<void> {
+    const threshold = new Date(Date.now() - HostAgentConsumerService.OFFLINE_AFTER_MS);
+    const result = await this.connectionRepository
+      .createQueryBuilder()
+      .update(ServerConnection)
+      .set({ connection_status: 'offline', updated_at: () => 'NOW()' })
+      .where('connection_status = :connected', { connected: 'connected' })
+      .andWhere('companion_last_seen IS NOT NULL')
+      .andWhere('companion_last_seen < :threshold', { threshold })
+      .execute();
+
+    if (result.affected) {
+      this.logger.warn(`marked ${result.affected} stale host connection(s) offline`);
+    }
+  }
+
+  /**
+   * Tenant validation: the subject segment must be a real license UUID.
+   * NATS consumers must never write rows for subjects an arbitrary publisher
+   * invented. Existence is cached to avoid a query per heartbeat.
+   */
+  private async isValidTenant(licenseId: string): Promise<boolean> {
+    if (!HostAgentConsumerService.UUID_RE.test(licenseId)) {
+      this.warnUnknownOnce(licenseId, 'not a UUID');
+      return false;
+    }
+
+    const cachedUntil = this.knownLicenses.get(licenseId);
+    if (cachedUntil && cachedUntil > Date.now()) return true;
+
+    const exists = await this.licenseRepository.exist({ where: { id: licenseId } });
+    if (!exists) {
+      this.warnUnknownOnce(licenseId, 'no such license');
+      return false;
+    }
+
+    this.knownLicenses.set(licenseId, Date.now() + HostAgentConsumerService.LICENSE_CACHE_TTL_MS);
+    return true;
+  }
+
+  /** Warns once per distinct invalid license id while it remains in warnedUnknown. */
+  private warnUnknownOnce(licenseId: string, reason: string): void {
+    if (this.warnedUnknown.has(licenseId)) return;
+    // Subject segments are publisher-controlled: reset the set at the cap so a
+    // flood of invented ids cannot grow memory without bound. The cost is that
+    // a warning may repeat after a reset.
+    if (this.warnedUnknown.size >= HostAgentConsumerService.MAX_WARNED_UNKNOWN) {
+      this.warnedUnknown.clear();
+    }
+    this.warnedUnknown.add(licenseId);
+    this.logger.warn(`ignoring host-agent traffic for invalid license '${licenseId}' (${reason})`);
+  }
+}
diff --git a/backend-nest/src/services/index.ts b/backend-nest/src/services/index.ts
index 5339a47..e0dda8e 100644
--- a/backend-nest/src/services/index.ts
+++ b/backend-nest/src/services/index.ts
@@ -1,3 +1,4 @@
 export { NatsService } from './nats.service';
 export { NatsBridgeService } from './nats-bridge.service';
+export { HostAgentConsumerService } from './host-agent-consumer.service';
 export { SteamService } from './steam.service';
diff --git a/backend-nest/src/services/nats-bridge.service.ts b/backend-nest/src/services/nats-bridge.service.ts
index 801c471..c36a673 100644
--- a/backend-nest/src/services/nats-bridge.service.ts
+++ b/backend-nest/src/services/nats-bridge.service.ts
@@ -44,6 +44,17 @@ export class NatsBridgeService implements OnModuleInit {
       this.emit(licenseId, 'oxide_status', data);
     });
 
+    // Wire protocol v2 (corrosion-host-agent) — host-level telemetry
+    this.nats.subscribe('corrosion.*.host.heartbeat', (data, subject) => {
+      const licenseId = subject.split('.')[1];
+      this.emit(licenseId, 'host_heartbeat', data);
+    });
+
+    this.nats.subscribe('corrosion.*.host.going_offline', (data, subject) => {
+      const licenseId = subject.split('.')[1];
+      this.emit(licenseId, 'host_going_offline', data);
+    });
+
     this.logger.log('NATS bridge subscriptions initialized');
   }
 