import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { NatsService } from './nats.service';
import { ServerConnection } from '../entities/server-connection.entity';
import { License } from '../entities/license.entity';

/**
 * Consumes Corrosion wire protocol v2 host-agent subjects
 * (corrosion-host-agent/PROTOCOL.md) and keeps server_connections truthful.
 *
 * Before this service existed, NOTHING persisted agent heartbeats:
 * companion_last_seen was written once at setup and connection_status stayed
 * 'connected' forever. Now:
 * - heartbeat -> companion_last_seen + 'connected' (row auto-created on first contact),
 * - going_offline beacon -> 'offline',
 * - a staleness sweep marks hosts 'offline' when heartbeats stop arriving.
 *
 * The license id is the second token of the subject
 * (`corrosion.<licenseId>.host.<event>`) and is validated before any write.
 */
@Injectable()
export class HostAgentConsumerService implements OnApplicationBootstrap {
  private readonly logger = new Logger(HostAgentConsumerService.name);

  /** licenseId -> cache expiry epoch-ms. Present and unexpired = exists; absent = unknown. */
  private readonly knownLicenses = new Map<string, number>();

  /**
   * Unknown/garbage license ids we already warned about (anti log-spam).
   * Bounded by MAX_WARNED_UNKNOWN: these ids come from subjects any publisher
   * can invent, so an unbounded set would grow without limit. When full, the
   * oldest entry is evicted, so a long-forgotten id may be warned about again.
   */
  private readonly warnedUnknown = new Set<string>();

  private static readonly UUID_RE =
    /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
  private static readonly LICENSE_CACHE_TTL_MS = 5 * 60_000;
  /** 3x the agent's default 60s heartbeat (which jitters to max 72s). */
  private static readonly OFFLINE_AFTER_MS = 180_000;
  /** Maximum number of invalid ids remembered in warnedUnknown. */
  private static readonly MAX_WARNED_UNKNOWN = 1_000;
  /** Invalid ids are publisher-controlled; cap how much of one is echoed into logs. */
  private static readonly MAX_LOGGED_ID_CHARS = 64;

  constructor(
    private readonly nats: NatsService,
    @InjectRepository(ServerConnection)
    private readonly connectionRepository: Repository<ServerConnection>,
    @InjectRepository(License)
    private readonly licenseRepository: Repository<License>,
  ) {}

  /**
   * Registers the heartbeat and going_offline subscriptions.
   *
   * Bootstrap, not module-init: subscriptions registered before NatsService
   * finished connecting silently no-op (see NatsBridgeService note).
   * Handler failures are logged; they are never rethrown into the NATS callback.
   */
  onApplicationBootstrap(): void {
    this.nats.subscribe('corrosion.*.host.heartbeat', (_data, subject) => {
      // Payload telemetry is bridged to the browser; persistence here is liveness only.
      const licenseId = HostAgentConsumerService.licenseIdFromSubject(subject);
      void this.onHeartbeat(licenseId).catch((err: unknown) =>
        this.logHandlerFailure('heartbeat', licenseId, err),
      );
    });

    this.nats.subscribe('corrosion.*.host.going_offline', (_data, subject) => {
      const licenseId = HostAgentConsumerService.licenseIdFromSubject(subject);
      void this.onGoingOffline(licenseId).catch((err: unknown) =>
        this.logHandlerFailure('going_offline', licenseId, err),
      );
    });

    this.logger.log('Host agent (protocol v2) consumer subscriptions initialized');
  }

  /**
   * Extracts the license-id token from `corrosion.<licenseId>.host.<event>`.
   * NATS `*` matches exactly one token, so index 1 exists for subscribed
   * subjects; the '' fallback only satisfies the type checker and fails validation.
   */
  private static licenseIdFromSubject(subject: string): string {
    return subject.split('.')[1] ?? '';
  }

  /** Logs a failed handler, tolerating rejection values that are not Error instances. */
  private logHandlerFailure(event: string, licenseId: string, err: unknown): void {
    const message = err instanceof Error ? err.message : String(err);
    const stack = err instanceof Error ? err.stack : undefined;
    this.logger.error(`${event} handling failed for ${licenseId}: ${message}`, stack);
  }

  /**
   * Records a heartbeat. It stamps companion_last_seen/updated_at and sets
   * connection_status to 'connected'. On first contact it creates a
   * 'bare_metal' row instead. Ignored for license ids rejected by isValidTenant.
   */
  private async onHeartbeat(licenseId: string): Promise<void> {
    if (!(await this.isValidTenant(licenseId))) return;
    const now = new Date();
    // NOTE(review): findOne-then-save is not atomic; two concurrent first
    // heartbeats for the same license could both take the create path.
    const existing = await this.connectionRepository.findOne({
      where: { license_id: licenseId },
    });

    if (existing) {
      await this.connectionRepository.update(
        { id: existing.id },
        { companion_last_seen: now, connection_status: 'connected', updated_at: now },
      );
      if (existing.connection_status !== 'connected') {
        this.logger.log(`host agent for license ${licenseId} is back online`);
      }
    } else {
      // First contact from a host agent: auto-register the connection so the
      // panel lights up without a manual setup step.
      await this.connectionRepository.save(
        this.connectionRepository.create({
          license_id: licenseId,
          connection_type: 'bare_metal',
          connection_status: 'connected',
          companion_last_seen: now,
        }),
      );
      this.logger.log(`host agent registered for license ${licenseId} (first heartbeat)`);
    }
  }

  /**
   * Handles the graceful going_offline beacon. Every server_connections row
   * for the license is set to 'offline'. Ignored for license ids rejected by
   * isValidTenant.
   */
  private async onGoingOffline(licenseId: string): Promise<void> {
    if (!(await this.isValidTenant(licenseId))) return;
    await this.connectionRepository.update(
      { license_id: licenseId },
      { connection_status: 'offline', updated_at: new Date() },
    );
    this.logger.log(`host agent for license ${licenseId} went offline (graceful beacon)`);
  }

  /**
   * Heartbeats stopping must flip the panel to offline: an agent that
   * crashes or loses network never sends the goodbye beacon.
   *
   * Runs every 60s. It flips 'connected' rows whose companion_last_seen is
   * older than OFFLINE_AFTER_MS to 'offline'. Rows with a NULL
   * companion_last_seen are left untouched.
   */
  @Interval(60_000)
  async sweepStaleConnections(): Promise<void> {
    const threshold = new Date(Date.now() - HostAgentConsumerService.OFFLINE_AFTER_MS);
    const result = await this.connectionRepository
      .createQueryBuilder()
      .update(ServerConnection)
      .set({ connection_status: 'offline', updated_at: () => 'NOW()' })
      .where('connection_status = :connected', { connected: 'connected' })
      .andWhere('companion_last_seen IS NOT NULL')
      .andWhere('companion_last_seen < :threshold', { threshold })
      .execute();
    if (result.affected) {
      this.logger.warn(`marked ${result.affected} stale host connection(s) offline`);
    }
  }

  /**
   * Tenant validation: the subject segment must be a real license UUID.
   * NATS consumers must never write rows for subjects an arbitrary publisher
   * invented. Positive existence is cached for LICENSE_CACHE_TTL_MS to avoid
   * a query per heartbeat. Negative results are not cached, so an unknown
   * UUID is looked up on every message.
   *
   * @returns true when licenseId is a UUID that exists in the license table.
   */
  private async isValidTenant(licenseId: string): Promise<boolean> {
    if (!HostAgentConsumerService.UUID_RE.test(licenseId)) {
      this.warnUnknownOnce(licenseId, 'not a UUID');
      return false;
    }

    const cachedUntil = this.knownLicenses.get(licenseId);
    if (cachedUntil && cachedUntil > Date.now()) return true;

    const exists = await this.licenseRepository.exist({ where: { id: licenseId } });
    if (!exists) {
      this.warnUnknownOnce(licenseId, 'no such license');
      return false;
    }
    this.knownLicenses.set(licenseId, Date.now() + HostAgentConsumerService.LICENSE_CACHE_TTL_MS);
    return true;
  }

  /**
   * Warns about an invalid license id at most once while it is remembered.
   * Memory is bounded: when MAX_WARNED_UNKNOWN ids are held, the oldest is evicted.
   */
  private warnUnknownOnce(licenseId: string, reason: string): void {
    if (this.warnedUnknown.has(licenseId)) return;
    if (this.warnedUnknown.size >= HostAgentConsumerService.MAX_WARNED_UNKNOWN) {
      // Set iteration follows insertion order, so the first value is the oldest entry.
      const oldest = this.warnedUnknown.values().next().value;
      if (oldest !== undefined) this.warnedUnknown.delete(oldest);
    }
    this.warnedUnknown.add(licenseId);

    const maxChars = HostAgentConsumerService.MAX_LOGGED_ID_CHARS;
    const shown = licenseId.length > maxChars ? `${licenseId.slice(0, maxChars)}…` : licenseId;
    this.logger.warn(`ignoring host-agent traffic for invalid license '${shown}' (${reason})`);
  }
}