Migration 022 adds agent_hosts / game_instances / instance_clusters / instance_stats (named agent_hosts to avoid the existing B2B hosts table). HostAgentConsumerService now parses the full v2 heartbeat and upserts an agent_hosts row (host metrics: cpu/mem/disk/agent version, keyed by license_id+hostname until enrollment) plus one game_instances row per heartbeat instance entry (state + uptime, the billing unit). Legacy server_connections write retained so the current panel keeps working — additive migration, nothing breaks. Staleness sweep + offline beacon now flip agent_hosts to offline too. cluster_id FK reserved for Soulmask/Dune. Migration applied to live DB; tsc green. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
262 lines
9.7 KiB
TypeScript
262 lines
9.7 KiB
TypeScript
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
|
|
import { Interval } from '@nestjs/schedule';
|
|
import { InjectRepository } from '@nestjs/typeorm';
|
|
import { Repository } from 'typeorm';
|
|
import { NatsService } from './nats.service';
|
|
import { ServerConnection } from '../entities/server-connection.entity';
|
|
import { License } from '../entities/license.entity';
|
|
import { AgentHost, AgentHostDisk } from '../entities/agent-host.entity';
|
|
import { GameInstance } from '../entities/game-instance.entity';
|
|
|
|
/**
|
|
* Consumes Corrosion wire protocol v2 host-agent subjects
|
|
* (corrosion-host-agent/PROTOCOL.md) and keeps the fleet model truthful.
|
|
*
|
|
* Writes the License → Host → Instance model (agent_hosts + game_instances) from
|
|
* each heartbeat, AND maintains the legacy single-server `server_connections`
|
|
* row so the current panel keeps working during the fleet UI transition.
|
|
*
|
|
* Host identity: until enrollment issues a stable host id, a host is keyed by
|
|
* (license_id, hostname). One agent = one host today; the schema is already
|
|
* multi-host-ready.
|
|
*/
|
|
/** `agent` object of a v2 heartbeat: build identity of the reporting agent. */
interface HeartbeatAgent {
  version?: string;
  commit?: string;
  os?: string;
  arch?: string;
}

/** `host` object of a v2 heartbeat: machine-level metrics. */
interface HeartbeatHost {
  /** Used (with the license id) as the host key; may be null/absent. */
  hostname?: string | null;
  cpu_percent?: number;
  cpu_cores?: number;
  /** Memory figures — presumably megabytes per the `_mb` suffix; confirm against PROTOCOL.md. */
  mem_total_mb?: number;
  mem_used_mb?: number;
  uptime_seconds?: number;
  disks?: AgentHostDisk[];
}

/** One `instances[]` entry; the consumer upserts one game_instances row per entry. */
interface HeartbeatInstance {
  id: string;
  game: string;
  label?: string | null;
  state?: string;
  uptime_seconds?: number;
}

/**
 * Body of a `corrosion.<licenseId>.host.heartbeat` message as read by the
 * consumer. NOTE: the body arrives as untrusted JSON and is cast (not
 * validated) to this type, so any field may in practice be missing or malformed.
 */
interface HeartbeatPayload {
  schema?: number;
  timestamp?: string;
  agent?: HeartbeatAgent;
  host?: HeartbeatHost;
  instances?: HeartbeatInstance[];
}
|
|
|
|
/**
 * NestJS consumer for the Corrosion protocol v2 host-agent subjects
 * `corrosion.<licenseId>.host.heartbeat` and `corrosion.<licenseId>.host.going_offline`.
 *
 * Each accepted heartbeat refreshes three stores: the legacy single-server
 * `server_connections` row, the fleet `agent_hosts` row (keyed by
 * license_id + hostname), and one `game_instances` row per reported instance.
 * A sweep every 60 s marks connections/hosts offline once heartbeats have been
 * missing for OFFLINE_AFTER_MS.
 *
 * Every message is gated by `isValidTenant`, so a publisher cannot make this
 * service write rows for a license id it invented.
 */
@Injectable()
export class HostAgentConsumerService implements OnApplicationBootstrap {
  private readonly logger = new Logger(HostAgentConsumerService.name);

  /** licenseId → epoch ms until which the license is cached as existing. */
  private readonly knownLicenses = new Map<string, number>();

  /**
   * License ids already warned about by warnUnknownOnce. Capped at
   * MAX_WARNED_UNKNOWN (oldest evicted first): the ids come straight from NATS
   * subjects, so an unbounded set would let any publisher grow memory forever.
   */
  private readonly warnedUnknown = new Set<string>();

  private static readonly UUID_RE =
    /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
  private static readonly LICENSE_CACHE_TTL_MS = 5 * 60_000;
  private static readonly OFFLINE_AFTER_MS = 180_000;
  private static readonly MAX_WARNED_UNKNOWN = 1_000;

  constructor(
    private readonly nats: NatsService,
    @InjectRepository(ServerConnection)
    private readonly connectionRepository: Repository<ServerConnection>,
    @InjectRepository(License)
    private readonly licenseRepository: Repository<License>,
    @InjectRepository(AgentHost)
    private readonly hostRepository: Repository<AgentHost>,
    @InjectRepository(GameInstance)
    private readonly instanceRepository: Repository<GameInstance>,
  ) {}

  // Bootstrap, not module-init: subscriptions registered before NatsService
  // finished connecting silently no-op (see NatsBridgeService note).
  onApplicationBootstrap() {
    this.nats.subscribe('corrosion.*.host.heartbeat', (data, subject) => {
      const licenseId = HostAgentConsumerService.licenseIdFromSubject(subject);
      // Handlers run detached; every rejection must be handled here or it
      // surfaces as an unhandled promise rejection.
      void this.onHeartbeat(licenseId, data as HeartbeatPayload).catch((err: unknown) =>
        this.logFailure('heartbeat', licenseId, err),
      );
    });

    this.nats.subscribe('corrosion.*.host.going_offline', (_data, subject) => {
      const licenseId = HostAgentConsumerService.licenseIdFromSubject(subject);
      void this.onGoingOffline(licenseId).catch((err: unknown) =>
        this.logFailure('going_offline', licenseId, err),
      );
    });

    this.logger.log('Host agent (protocol v2) consumer subscriptions initialized');
  }

  /**
   * Extracts the license segment from `corrosion.<licenseId>.host.<event>`.
   * Returns '' when the segment is missing; '' then fails the UUID check.
   */
  private static licenseIdFromSubject(subject: string): string {
    return subject.split('.')[1] ?? '';
  }

  /** Logs a failed detached handler without assuming the rejection reason is an Error. */
  private logFailure(event: string, licenseId: string, err: unknown): void {
    if (err instanceof Error) {
      this.logger.error(`${event} handling failed for ${licenseId}: ${err.message}`, err.stack);
    } else {
      this.logger.error(`${event} handling failed for ${licenseId}: ${String(err)}`);
    }
  }

  /**
   * Handles one heartbeat: refreshes the legacy connection row, then the host
   * row, then the instance rows that reference that host.
   */
  private async onHeartbeat(licenseId: string, payload: HeartbeatPayload): Promise<void> {
    if (!(await this.isValidTenant(licenseId))) return;
    // The body is untrusted JSON that was cast, not validated. Reject a
    // non-object body before any write rather than throwing halfway through
    // and leaving the legacy row and the host row out of sync.
    if (typeof payload !== 'object' || payload === null || Array.isArray(payload)) {
      this.logger.warn(`ignoring malformed heartbeat for license ${licenseId}`);
      return;
    }
    const now = new Date();

    await this.updateLegacyConnection(licenseId, now);
    const host = await this.upsertHost(licenseId, payload, now);
    await this.upsertInstances(licenseId, host, payload, now);
  }

  /** Legacy single-server row — keeps the current panel working. */
  private async updateLegacyConnection(licenseId: string, now: Date): Promise<void> {
    // NOTE(review): find-then-insert is not atomic; two concurrent first
    // heartbeats for a license could both insert unless a unique constraint
    // on license_id exists — confirm in the schema.
    const existing = await this.connectionRepository.findOne({ where: { license_id: licenseId } });
    if (existing) {
      await this.connectionRepository.update(
        { id: existing.id },
        { companion_last_seen: now, connection_status: 'connected', updated_at: now },
      );
    } else {
      await this.connectionRepository.save(
        this.connectionRepository.create({
          license_id: licenseId,
          connection_type: 'bare_metal',
          connection_status: 'connected',
          companion_last_seen: now,
        }),
      );
    }
  }

  /**
   * Upserts the fleet host row, keyed by (license_id, hostname). A missing
   * hostname is stored as ''. Absent metrics are written as null, so each
   * heartbeat fully replaces the previous snapshot.
   *
   * @returns the row as it stands after this heartbeat (its `id` is used by upsertInstances).
   */
  private async upsertHost(licenseId: string, payload: HeartbeatPayload, now: Date): Promise<AgentHost> {
    const hostname = payload.host?.hostname ?? '';
    const fields = {
      agent_version: payload.agent?.version ?? null,
      agent_commit: payload.agent?.commit ?? null,
      os: payload.agent?.os ?? null,
      arch: payload.agent?.arch ?? null,
      status: 'connected',
      last_heartbeat_at: now,
      cpu_percent: payload.host?.cpu_percent ?? null,
      cpu_cores: payload.host?.cpu_cores ?? null,
      mem_total_mb: payload.host?.mem_total_mb ?? null,
      mem_used_mb: payload.host?.mem_used_mb ?? null,
      uptime_seconds: payload.host?.uptime_seconds ?? null,
      disks: payload.host?.disks ?? null,
      updated_at: now,
    };

    const existing = await this.hostRepository.findOne({
      where: { license_id: licenseId, hostname },
    });
    if (existing) {
      await this.hostRepository.update({ id: existing.id }, fields);
      return { ...existing, ...fields } as AgentHost;
    }
    const created = await this.hostRepository.save(
      this.hostRepository.create({ license_id: licenseId, hostname, ...fields }),
    );
    this.logger.log(`host registered for license ${licenseId} (hostname '${hostname || 'unknown'}')`);
    return created;
  }

  /**
   * Upserts one game_instances row per heartbeat instance entry, keyed by
   * (license_id, agent_instance_id). Entries missing an id or game are skipped.
   */
  private async upsertInstances(
    licenseId: string,
    host: AgentHost,
    payload: HeartbeatPayload,
    now: Date,
  ): Promise<void> {
    const reported = payload.instances ?? [];
    // Iterating a non-array (e.g. a plain object) would throw after the host
    // row was already written; reject it explicitly instead.
    if (!Array.isArray(reported)) {
      this.logger.warn(`ignoring non-array 'instances' in heartbeat for license ${licenseId}`);
      return;
    }
    for (const inst of reported) {
      if (!inst?.id || !inst?.game) continue;
      const fields = {
        host_id: host.id,
        game: inst.game,
        label: inst.label ?? null,
        state: inst.state ?? 'unknown',
        uptime_seconds: inst.uptime_seconds ?? 0,
        last_seen_at: now,
        updated_at: now,
      };
      const existing = await this.instanceRepository.findOne({
        where: { license_id: licenseId, agent_instance_id: inst.id },
      });
      if (existing) {
        await this.instanceRepository.update({ id: existing.id }, fields);
      } else {
        await this.instanceRepository.save(
          this.instanceRepository.create({
            license_id: licenseId,
            agent_instance_id: inst.id,
            ...fields,
          }),
        );
        this.logger.log(`instance '${inst.id}' (${inst.game}) registered for license ${licenseId}`);
      }
    }
  }

  /** Graceful shutdown beacon: marks the legacy row and the license's hosts offline. */
  private async onGoingOffline(licenseId: string): Promise<void> {
    if (!(await this.isValidTenant(licenseId))) return;
    const now = new Date();
    await this.connectionRepository.update(
      { license_id: licenseId },
      { connection_status: 'offline', updated_at: now },
    );
    // NOTE(review): this flips EVERY host of the license, not just the one
    // that sent the beacon — fine while one agent = one host, but needs
    // scoping (e.g. by hostname from the beacon body) once multi-host ships.
    await this.hostRepository.update(
      { license_id: licenseId },
      { status: 'offline', updated_at: now },
    );
    this.logger.log(`host(s) for license ${licenseId} went offline (graceful beacon)`);
  }

  /**
   * Heartbeats stopping must flip the panel to offline — an agent that
   * crashes or loses network never sends the goodbye beacon. Sweeps both the
   * legacy connection and fleet hosts whose last heartbeat is older than
   * OFFLINE_AFTER_MS.
   */
  @Interval(60_000)
  async sweepStaleConnections(): Promise<void> {
    const threshold = new Date(Date.now() - HostAgentConsumerService.OFFLINE_AFTER_MS);

    const conn = await this.connectionRepository
      .createQueryBuilder()
      .update(ServerConnection)
      .set({ connection_status: 'offline', updated_at: () => 'NOW()' })
      .where('connection_status = :connected', { connected: 'connected' })
      .andWhere('companion_last_seen IS NOT NULL')
      .andWhere('companion_last_seen < :threshold', { threshold })
      .execute();

    const hosts = await this.hostRepository
      .createQueryBuilder()
      .update(AgentHost)
      .set({ status: 'offline', updated_at: () => 'NOW()' })
      .where('status = :connected', { connected: 'connected' })
      .andWhere('last_heartbeat_at IS NOT NULL')
      .andWhere('last_heartbeat_at < :threshold', { threshold })
      .execute();

    const affected = (conn.affected ?? 0) + (hosts.affected ?? 0);
    if (affected) {
      this.logger.warn(`marked ${affected} stale connection/host record(s) offline`);
    }
  }

  /**
   * Tenant validation: the subject segment must be a real license UUID.
   * NATS consumers must never write rows for subjects an arbitrary publisher
   * invented. Positive results are cached for LICENSE_CACHE_TTL_MS to avoid a
   * query per heartbeat; negative results are not cached.
   */
  private async isValidTenant(licenseId: string): Promise<boolean> {
    if (!HostAgentConsumerService.UUID_RE.test(licenseId)) {
      this.warnUnknownOnce(licenseId, 'not a UUID');
      return false;
    }
    const cachedUntil = this.knownLicenses.get(licenseId);
    if (cachedUntil && cachedUntil > Date.now()) return true;

    const exists = await this.licenseRepository.exist({ where: { id: licenseId } });
    if (!exists) {
      this.warnUnknownOnce(licenseId, 'no such license');
      return false;
    }
    this.knownLicenses.set(licenseId, Date.now() + HostAgentConsumerService.LICENSE_CACHE_TTL_MS);
    return true;
  }

  /**
   * Warns at most once per invalid license id (best effort: once the bounded
   * set evicts an id, a later message for it warns again).
   */
  private warnUnknownOnce(licenseId: string, reason: string): void {
    if (this.warnedUnknown.has(licenseId)) return;
    if (this.warnedUnknown.size >= HostAgentConsumerService.MAX_WARNED_UNKNOWN) {
      // Sets iterate in insertion order, so the first value is the oldest.
      const oldest = this.warnedUnknown.values().next().value;
      if (oldest !== undefined) this.warnedUnknown.delete(oldest);
    }
    this.warnedUnknown.add(licenseId);
    this.logger.warn(`ignoring host-agent traffic for invalid license '${licenseId}' (${reason})`);
  }
}
|