import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { NatsService } from './nats.service';
import { ServerConnection } from '../entities/server-connection.entity';
import { License } from '../entities/license.entity';
import { AgentHost, AgentHostDisk } from '../entities/agent-host.entity';
import { GameInstance } from '../entities/game-instance.entity';

/**
 * Heartbeat body as read by this consumer (Corrosion wire protocol v2,
 * corrosion-host-agent/PROTOCOL.md). Every field is optional because the
 * payload is cast from the raw NATS message without schema validation; the
 * handlers below check for the pieces they need.
 */
interface HeartbeatPayload {
  schema?: number;
  timestamp?: string;
  agent?: { version?: string; commit?: string; os?: string; arch?: string };
  host?: {
    hostname?: string | null;
    cpu_percent?: number;
    cpu_cores?: number;
    mem_total_mb?: number;
    mem_used_mb?: number;
    uptime_seconds?: number;
    disks?: AgentHostDisk[];
  };
  instances?: Array<{
    id: string;
    game: string;
    label?: string | null;
    state?: string;
    uptime_seconds?: number;
  }>;
}

/**
 * Consumes Corrosion wire protocol v2 host-agent subjects
 * (corrosion-host-agent/PROTOCOL.md) and keeps the fleet model truthful.
 *
 * Writes the License → Host → Instance model (hosts + game_instances) from
 * each heartbeat, AND maintains the legacy single-server `server_connections`
 * row so the current panel keeps working during the fleet UI transition.
 *
 * Host identity: until enrollment issues a stable host id, a host is keyed by
 * (license_id, hostname). One agent = one host today; the schema is already
 * multi-host-ready.
 */
@Injectable()
export class HostAgentConsumerService implements OnApplicationBootstrap {
  private readonly logger = new Logger(HostAgentConsumerService.name);

  /** License id → epoch-ms until which its existence is considered verified. */
  private readonly knownLicenses = new Map<string, number>();

  /** Rejected license ids that have already produced a warning. */
  private readonly warnedUnknown = new Set<string>();

  private static readonly UUID_RE =
    /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

  /** How long a positive license lookup is trusted before re-querying. */
  private static readonly LICENSE_CACHE_TTL_MS = 5 * 60_000;

  /** A 'connected' row whose last heartbeat is older than this is swept to 'offline'. */
  private static readonly OFFLINE_AFTER_MS = 180_000;

  /**
   * Upper bound on `warnedUnknown`. Its keys come from subject tokens that any
   * publisher can invent, so without a cap the set could grow without limit.
   */
  private static readonly MAX_WARNED_UNKNOWN = 10_000;

  constructor(
    private readonly nats: NatsService,
    @InjectRepository(ServerConnection)
    private readonly connectionRepository: Repository<ServerConnection>,
    @InjectRepository(License)
    private readonly licenseRepository: Repository<License>,
    @InjectRepository(AgentHost)
    private readonly hostRepository: Repository<AgentHost>,
    @InjectRepository(GameInstance)
    private readonly instanceRepository: Repository<GameInstance>,
  ) {}

  /**
   * Registers the heartbeat and going_offline subscriptions.
   *
   * Bootstrap, not module-init: subscriptions registered before NatsService
   * finished connecting silently no-op (see NatsBridgeService note).
   */
  onApplicationBootstrap() {
    this.nats.subscribe('corrosion.*.host.heartbeat', (data, subject) => {
      const licenseId = HostAgentConsumerService.licenseIdFromSubject(subject);
      // Fire-and-forget: a failing handler is logged, never propagated to NATS.
      void this.onHeartbeat(licenseId, data as HeartbeatPayload).catch((err: unknown) =>
        this.logHandlerFailure('heartbeat', licenseId, err),
      );
    });

    this.nats.subscribe('corrosion.*.host.going_offline', (_data, subject) => {
      const licenseId = HostAgentConsumerService.licenseIdFromSubject(subject);
      void this.onGoingOffline(licenseId).catch((err: unknown) =>
        this.logHandlerFailure('going_offline', licenseId, err),
      );
    });

    this.logger.log('Host agent (protocol v2) consumer subscriptions initialized');
  }

  /**
   * Extracts the license segment (second dot-separated token) from a subject.
   * Returns '' when the token is missing, which the UUID check then rejects.
   */
  private static licenseIdFromSubject(subject: string): string {
    return subject.split('.')[1] ?? '';
  }

  /**
   * Logs a rejected handler promise. The rejection reason is narrowed before
   * use because a promise can reject with any value, not only an Error.
   */
  private logHandlerFailure(action: string, licenseId: string, err: unknown): void {
    const message = err instanceof Error ? err.message : String(err);
    const stack = err instanceof Error ? err.stack : undefined;
    this.logger.error(`${action} handling failed for ${licenseId}: ${message}`, stack);
  }

  /**
   * Handles one heartbeat: validates the tenant, then refreshes the legacy
   * connection row, the host row and the instance rows, in that order.
   */
  private async onHeartbeat(licenseId: string, payload: HeartbeatPayload): Promise<void> {
    if (!(await this.isValidTenant(licenseId))) return;

    // A well-formed v2 heartbeat always carries a host block. Reject malformed
    // payloads so a stray/empty publish can't create a phantom host row.
    if (!payload || typeof payload.host !== 'object' || payload.host === null) {
      this.logger.warn(`ignoring malformed heartbeat for license ${licenseId} (no host block)`);
      return;
    }

    const now = new Date();
    await this.updateLegacyConnection(licenseId, now);
    const host = await this.upsertHost(licenseId, payload, now);
    await this.upsertInstances(licenseId, host, payload, now);
  }

  /** Legacy single-server row — keeps the current panel working. */
  private async updateLegacyConnection(licenseId: string, now: Date): Promise<void> {
    const existing = await this.connectionRepository.findOne({
      where: { license_id: licenseId },
    });

    if (existing) {
      await this.connectionRepository.update(
        { id: existing.id },
        { companion_last_seen: now, connection_status: 'connected', updated_at: now },
      );
      return;
    }

    await this.connectionRepository.save(
      this.connectionRepository.create({
        license_id: licenseId,
        connection_type: 'bare_metal',
        connection_status: 'connected',
        companion_last_seen: now,
      }),
    );
  }

  /**
   * Upsert the fleet host row, keyed by (license_id, hostname). A missing or
   * null hostname is stored as the empty string.
   *
   * @returns the host row as it stands after the write.
   */
  private async upsertHost(
    licenseId: string,
    payload: HeartbeatPayload,
    now: Date,
  ): Promise<AgentHost> {
    const hostname = payload.host?.hostname ?? '';
    // Absent metrics are written as null rather than left at their old value.
    const fields = {
      agent_version: payload.agent?.version ?? null,
      agent_commit: payload.agent?.commit ?? null,
      os: payload.agent?.os ?? null,
      arch: payload.agent?.arch ?? null,
      status: 'connected',
      last_heartbeat_at: now,
      cpu_percent: payload.host?.cpu_percent ?? null,
      cpu_cores: payload.host?.cpu_cores ?? null,
      mem_total_mb: payload.host?.mem_total_mb ?? null,
      mem_used_mb: payload.host?.mem_used_mb ?? null,
      uptime_seconds: payload.host?.uptime_seconds ?? null,
      disks: payload.host?.disks ?? null,
      updated_at: now,
    };

    const existing = await this.hostRepository.findOne({
      where: { license_id: licenseId, hostname },
    });

    if (existing) {
      await this.hostRepository.update({ id: existing.id }, fields);
      // update() does not return the row, so merge the written fields locally.
      return { ...existing, ...fields } as AgentHost;
    }

    const created = await this.hostRepository.save(
      this.hostRepository.create({ license_id: licenseId, hostname, ...fields }),
    );
    this.logger.log(
      `host registered for license ${licenseId} (hostname '${hostname || 'unknown'}')`,
    );
    return created;
  }

  /**
   * Upsert one game_instances row per heartbeat instance entry, keyed by
   * (license_id, agent_instance_id). Entries without an id or game are skipped.
   */
  private async upsertInstances(
    licenseId: string,
    host: AgentHost,
    payload: HeartbeatPayload,
    now: Date,
  ): Promise<void> {
    for (const inst of payload.instances ?? []) {
      if (!inst?.id || !inst?.game) continue;

      const fields = {
        host_id: host.id,
        game: inst.game,
        label: inst.label ?? null,
        state: inst.state ?? 'unknown',
        uptime_seconds: inst.uptime_seconds ?? 0,
        last_seen_at: now,
        updated_at: now,
      };

      const existing = await this.instanceRepository.findOne({
        where: { license_id: licenseId, agent_instance_id: inst.id },
      });

      if (existing) {
        await this.instanceRepository.update({ id: existing.id }, fields);
        continue;
      }

      await this.instanceRepository.save(
        this.instanceRepository.create({
          license_id: licenseId,
          agent_instance_id: inst.id,
          ...fields,
        }),
      );
      this.logger.log(`instance '${inst.id}' (${inst.game}) registered for license ${licenseId}`);
    }
  }

  /**
   * Handles the graceful goodbye beacon: marks the legacy connection and the
   * license's host rows offline.
   *
   * NOTE(review): the beacon payload is not read, so every host row of the
   * license is flipped, not only the sender's — confirm this is intended once
   * a license can have more than one host.
   */
  private async onGoingOffline(licenseId: string): Promise<void> {
    if (!(await this.isValidTenant(licenseId))) return;

    const now = new Date();
    await this.connectionRepository.update(
      { license_id: licenseId },
      { connection_status: 'offline', updated_at: now },
    );
    await this.hostRepository.update(
      { license_id: licenseId },
      { status: 'offline', updated_at: now },
    );
    this.logger.log(`host(s) for license ${licenseId} went offline (graceful beacon)`);
  }

  /**
   * Heartbeats stopping must flip the panel to offline — an agent that
   * crashes or loses network never sends the goodbye beacon. Sweeps both the
   * legacy connection and fleet hosts.
   */
  @Interval(60_000)
  async sweepStaleConnections(): Promise<void> {
    const threshold = new Date(Date.now() - HostAgentConsumerService.OFFLINE_AFTER_MS);

    const conn = await this.connectionRepository
      .createQueryBuilder()
      .update(ServerConnection)
      .set({ connection_status: 'offline', updated_at: () => 'NOW()' })
      .where('connection_status = :connected', { connected: 'connected' })
      .andWhere('companion_last_seen IS NOT NULL')
      .andWhere('companion_last_seen < :threshold', { threshold })
      .execute();

    const hosts = await this.hostRepository
      .createQueryBuilder()
      .update(AgentHost)
      .set({ status: 'offline', updated_at: () => 'NOW()' })
      .where('status = :connected', { connected: 'connected' })
      .andWhere('last_heartbeat_at IS NOT NULL')
      .andWhere('last_heartbeat_at < :threshold', { threshold })
      .execute();

    const affected = (conn.affected ?? 0) + (hosts.affected ?? 0);
    if (affected) {
      this.logger.warn(`marked ${affected} stale connection/host record(s) offline`);
    }
  }

  /**
   * Tenant validation: the subject segment must be a real license UUID.
   * NATS consumers must never write rows for subjects an arbitrary publisher
   * invented. Existence is cached to avoid a query per heartbeat.
   *
   * Only positive lookups are cached; an unknown license is re-queried on
   * every message.
   */
  private async isValidTenant(licenseId: string): Promise<boolean> {
    if (!HostAgentConsumerService.UUID_RE.test(licenseId)) {
      this.warnUnknownOnce(licenseId, 'not a UUID');
      return false;
    }

    const cachedUntil = this.knownLicenses.get(licenseId);
    if (cachedUntil && cachedUntil > Date.now()) return true;

    const exists = await this.licenseRepository.exist({ where: { id: licenseId } });
    if (!exists) {
      this.warnUnknownOnce(licenseId, 'no such license');
      return false;
    }

    this.knownLicenses.set(licenseId, Date.now() + HostAgentConsumerService.LICENSE_CACHE_TTL_MS);
    return true;
  }

  /**
   * Warns about a rejected license id the first time it is seen. The dedupe
   * set is reset when it reaches MAX_WARNED_UNKNOWN, so after a reset an id
   * may be warned about once more.
   */
  private warnUnknownOnce(licenseId: string, reason: string): void {
    if (this.warnedUnknown.has(licenseId)) return;
    if (this.warnedUnknown.size >= HostAgentConsumerService.MAX_WARNED_UNKNOWN) {
      this.warnedUnknown.clear();
    }
    this.warnedUnknown.add(licenseId);
    this.logger.warn(`ignoring host-agent traffic for invalid license '${licenseId}' (${reason})`);
  }
}