feat(api): fleet data model Phase A — License -> Host -> Instance
All checks were successful
CI / backend-types (push) Successful in 14s
CI / frontend-build (push) Successful in 16s
CI / agent-tests (push) Successful in 42s
CI / integration (push) Successful in 22s

Migration 022 adds agent_hosts / game_instances / instance_clusters /
instance_stats (named agent_hosts to avoid the existing B2B hosts
table). HostAgentConsumerService now parses the full v2 heartbeat and
upserts an agent_hosts row (host metrics: cpu/mem/disk/agent version,
keyed by license_id+hostname until enrollment) plus one game_instances
row per heartbeat instance entry (state + uptime, the billing unit).
Legacy server_connections write retained so the current panel keeps
working — additive migration, nothing breaks. Staleness sweep + offline
beacon now flip agent_hosts too. cluster_id FK reserved for Soulmask/
Dune. Migration applied to live DB; tsc green.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Vantz Stockwell
2026-06-11 12:00:52 -04:00
parent 700dc2254d
commit 930f655bf5
7 changed files with 452 additions and 32 deletions

View File

@@ -52,6 +52,8 @@ import { NatsBridgeService } from './services/nats-bridge.service';
import { HostAgentConsumerService } from './services/host-agent-consumer.service';
import { ServerConnection } from './entities/server-connection.entity';
import { License } from './entities/license.entity';
import { AgentHost } from './entities/agent-host.entity';
import { GameInstance } from './entities/game-instance.entity';
import { SteamService } from './services/steam.service';
// Gateway
@@ -95,7 +97,7 @@ import { NatsBridgeGateway } from './gateways/nats-bridge.gateway';
ScheduleModule.forRoot(),
// Repositories for app-level shared services (host-agent consumer)
TypeOrmModule.forFeature([ServerConnection, License]),
TypeOrmModule.forFeature([ServerConnection, License, AgentHost, GameInstance]),
// Feature Modules
AuthModule,

View File

@@ -0,0 +1,74 @@
import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, JoinColumn, Check, Unique } from 'typeorm';
import { License } from './license.entity';
export interface AgentHostDisk {
mount: string;
total_mb: number;
free_mb: number;
}
/**
* One Corrosion host agent / one machine. Owns the machine-level facts.
*
* NOTE: distinct from the B2B `hosts` table (hosting-partner companies). This
* is `agent_hosts` — the physical/virtual box a customer runs the agent on.
*/
@Entity('agent_hosts')
@Unique(['license_id', 'hostname'])
@Check(`"status" IN ('connected', 'degraded', 'offline')`)
export class AgentHost {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ type: 'uuid' })
license_id: string;
@Column({ type: 'varchar', length: 255, default: '' })
hostname: string;
@Column({ type: 'varchar', length: 64, nullable: true })
agent_version: string | null;
@Column({ type: 'varchar', length: 64, nullable: true })
agent_commit: string | null;
@Column({ type: 'varchar', length: 32, nullable: true })
os: string | null;
@Column({ type: 'varchar', length: 32, nullable: true })
arch: string | null;
@Column({ type: 'varchar', length: 20, default: 'offline' })
status: string;
@Column({ type: 'timestamptz', nullable: true })
last_heartbeat_at: Date | null;
@Column({ type: 'double precision', nullable: true })
cpu_percent: number | null;
@Column({ type: 'integer', nullable: true })
cpu_cores: number | null;
@Column({ type: 'bigint', nullable: true })
mem_total_mb: number | null;
@Column({ type: 'bigint', nullable: true })
mem_used_mb: number | null;
@Column({ type: 'bigint', nullable: true })
uptime_seconds: number | null;
@Column({ type: 'jsonb', nullable: true })
disks: AgentHostDisk[] | null;
@Column({ type: 'timestamptz', default: () => 'NOW()' })
created_at: Date;
@Column({ type: 'timestamptz', default: () => 'NOW()' })
updated_at: Date;
@ManyToOne(() => License, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'license_id' })
license: License;
}

View File

@@ -0,0 +1,59 @@
import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, JoinColumn, Unique } from 'typeorm';
import { License } from './license.entity';
import { AgentHost } from './agent-host.entity';
/**
* One game server process / orchestrated unit (a Rust server, a Conan world,
* a Dune battlegroup). The billing unit — plans count instances.
* `agent_instance_id` is the agent's slug and the NATS subject segment.
*/
@Entity('game_instances')
@Unique(['license_id', 'agent_instance_id'])
export class GameInstance {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ type: 'uuid' })
license_id: string;
@Column({ type: 'uuid', nullable: true })
host_id: string | null;
@Column({ type: 'uuid', nullable: true })
cluster_id: string | null;
@Column({ type: 'varchar', length: 64 })
agent_instance_id: string;
@Column({ type: 'varchar', length: 32 })
game: string;
@Column({ type: 'varchar', length: 255, nullable: true })
label: string | null;
@Column({ type: 'varchar', length: 32, default: 'unknown' })
state: string;
@Column({ type: 'text', nullable: true })
root_path: string | null;
@Column({ type: 'bigint', default: 0 })
uptime_seconds: number;
@Column({ type: 'timestamptz', nullable: true })
last_seen_at: Date | null;
@Column({ type: 'timestamptz', default: () => 'NOW()' })
created_at: Date;
@Column({ type: 'timestamptz', default: () => 'NOW()' })
updated_at: Date;
@ManyToOne(() => License, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'license_id' })
license: License;
@ManyToOne(() => AgentHost, { onDelete: 'SET NULL', nullable: true })
@JoinColumn({ name: 'host_id' })
host: AgentHost | null;
}

View File

@@ -0,0 +1,38 @@
import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, JoinColumn } from 'typeorm';
import { License } from './license.entity';
/**
* Optional grouping of instances for games with linked topologies:
* Soulmask main/child clusters, Dune BattleGroup → Sietches. Reserved now;
* cluster orchestration ships with those game adapters.
*/
@Entity('instance_clusters')
export class InstanceCluster {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ type: 'uuid' })
license_id: string;
@Column({ type: 'varchar', length: 32 })
game: string;
@Column({ type: 'varchar', length: 255 })
name: string;
@Column({ type: 'varchar', length: 32, nullable: true })
topology: string | null;
@Column({ type: 'jsonb', nullable: true })
config: Record<string, unknown> | null;
@Column({ type: 'timestamptz', default: () => 'NOW()' })
created_at: Date;
@Column({ type: 'timestamptz', default: () => 'NOW()' })
updated_at: Date;
@ManyToOne(() => License, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'license_id' })
license: License;
}

View File

@@ -0,0 +1,38 @@
import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, JoinColumn } from 'typeorm';
import { GameInstance } from './game-instance.entity';
/**
* Per-instance time-series game metrics (player count, FPS, …). Populated once
* game-level telemetry is collected via RCON/plugin — the host heartbeat
* carries host metrics, not game metrics, so this stays empty in Phase A.
*/
@Entity('instance_stats')
export class InstanceStats {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ type: 'uuid' })
instance_id: string;
@Column({ type: 'uuid' })
license_id: string;
@Column({ type: 'integer', default: 0 })
player_count: number;
@Column({ type: 'integer', default: 0 })
max_players: number;
@Column({ type: 'double precision', default: 0 })
fps: number;
@Column({ type: 'integer', default: 0 })
memory_usage_mb: number;
@Column({ type: 'timestamptz', default: () => 'NOW()' })
recorded_at: Date;
@ManyToOne(() => GameInstance, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'instance_id' })
instance: GameInstance;
}

View File

@@ -5,30 +5,53 @@ import { Repository } from 'typeorm';
import { NatsService } from './nats.service';
import { ServerConnection } from '../entities/server-connection.entity';
import { License } from '../entities/license.entity';
import { AgentHost, AgentHostDisk } from '../entities/agent-host.entity';
import { GameInstance } from '../entities/game-instance.entity';
/**
* Consumes Corrosion wire protocol v2 host-agent subjects
* (corrosion-host-agent/PROTOCOL.md) and keeps server_connections truthful.
* (corrosion-host-agent/PROTOCOL.md) and keeps the fleet model truthful.
*
* Before this service existed, NOTHING persisted agent heartbeats:
* companion_last_seen was written once at setup and connection_status stayed
* 'connected' forever. Now: heartbeat -> last_seen + connected (row
* auto-created on first contact), going_offline beacon -> offline, and a
* staleness sweep marks hosts offline when heartbeats stop arriving.
* Writes the License → Host → Instance model (hosts + game_instances) from
* each heartbeat, AND maintains the legacy single-server `server_connections`
* row so the current panel keeps working during the fleet UI transition.
*
* Host identity: until enrollment issues a stable host id, a host is keyed by
* (license_id, hostname). One agent = one host today; the schema is already
* multi-host-ready.
*/
interface HeartbeatPayload {
schema?: number;
timestamp?: string;
agent?: { version?: string; commit?: string; os?: string; arch?: string };
host?: {
hostname?: string | null;
cpu_percent?: number;
cpu_cores?: number;
mem_total_mb?: number;
mem_used_mb?: number;
uptime_seconds?: number;
disks?: AgentHostDisk[];
};
instances?: Array<{
id: string;
game: string;
label?: string | null;
state?: string;
uptime_seconds?: number;
}>;
}
@Injectable()
export class HostAgentConsumerService implements OnApplicationBootstrap {
private readonly logger = new Logger(HostAgentConsumerService.name);
/** licenseId -> cache expiry epoch-ms. Positive = exists, absent = unknown. */
private knownLicenses = new Map<string, number>();
/** Unknown/garbage license ids we already warned about (anti log-spam). */
private warnedUnknown = new Set<string>();
private static readonly UUID_RE =
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
private static readonly LICENSE_CACHE_TTL_MS = 5 * 60_000;
/** 3x the agent's default 60s heartbeat (which jitters to max 72s). */
private static readonly OFFLINE_AFTER_MS = 180_000;
constructor(
@@ -37,6 +60,10 @@ export class HostAgentConsumerService implements OnApplicationBootstrap {
private readonly connectionRepository: Repository<ServerConnection>,
@InjectRepository(License)
private readonly licenseRepository: Repository<License>,
@InjectRepository(AgentHost)
private readonly hostRepository: Repository<AgentHost>,
@InjectRepository(GameInstance)
private readonly instanceRepository: Repository<GameInstance>,
) {}
// Bootstrap, not module-init: subscriptions registered before NatsService
@@ -44,10 +71,9 @@ export class HostAgentConsumerService implements OnApplicationBootstrap {
onApplicationBootstrap() {
this.nats.subscribe('corrosion.*.host.heartbeat', (data, subject) => {
const licenseId = subject.split('.')[1];
void this.onHeartbeat(licenseId).catch((err) =>
void this.onHeartbeat(licenseId, data as HeartbeatPayload).catch((err) =>
this.logger.error(`heartbeat handling failed for ${licenseId}: ${err.message}`, err.stack),
);
void data; // payload telemetry is bridged to the browser; persistence here is liveness only
});
this.nats.subscribe('corrosion.*.host.going_offline', (_data, subject) => {
@@ -60,25 +86,24 @@ export class HostAgentConsumerService implements OnApplicationBootstrap {
this.logger.log('Host agent (protocol v2) consumer subscriptions initialized');
}
private async onHeartbeat(licenseId: string): Promise<void> {
private async onHeartbeat(licenseId: string, payload: HeartbeatPayload): Promise<void> {
if (!(await this.isValidTenant(licenseId))) return;
const now = new Date();
const existing = await this.connectionRepository.findOne({
where: { license_id: licenseId },
});
await this.updateLegacyConnection(licenseId, now);
const host = await this.upsertHost(licenseId, payload, now);
await this.upsertInstances(licenseId, host, payload, now);
}
/** Legacy single-server row — keeps the current panel working. */
private async updateLegacyConnection(licenseId: string, now: Date): Promise<void> {
const existing = await this.connectionRepository.findOne({ where: { license_id: licenseId } });
if (existing) {
await this.connectionRepository.update(
{ id: existing.id },
{ companion_last_seen: now, connection_status: 'connected', updated_at: now },
);
if (existing.connection_status !== 'connected') {
this.logger.log(`host agent for license ${licenseId} is back online`);
}
} else {
// First contact from a host agent: auto-register the connection so the
// panel lights up without a manual setup step.
await this.connectionRepository.save(
this.connectionRepository.create({
license_id: licenseId,
@@ -87,28 +112,102 @@ export class HostAgentConsumerService implements OnApplicationBootstrap {
companion_last_seen: now,
}),
);
this.logger.log(`host agent registered for license ${licenseId} (first heartbeat)`);
}
}
/** Upsert the fleet host row, keyed by (license_id, hostname). */
private async upsertHost(licenseId: string, payload: HeartbeatPayload, now: Date): Promise<AgentHost> {
const hostname = payload.host?.hostname ?? '';
const fields = {
agent_version: payload.agent?.version ?? null,
agent_commit: payload.agent?.commit ?? null,
os: payload.agent?.os ?? null,
arch: payload.agent?.arch ?? null,
status: 'connected',
last_heartbeat_at: now,
cpu_percent: payload.host?.cpu_percent ?? null,
cpu_cores: payload.host?.cpu_cores ?? null,
mem_total_mb: payload.host?.mem_total_mb ?? null,
mem_used_mb: payload.host?.mem_used_mb ?? null,
uptime_seconds: payload.host?.uptime_seconds ?? null,
disks: payload.host?.disks ?? null,
updated_at: now,
};
const existing = await this.hostRepository.findOne({
where: { license_id: licenseId, hostname },
});
if (existing) {
await this.hostRepository.update({ id: existing.id }, fields);
return { ...existing, ...fields } as AgentHost;
}
const created = await this.hostRepository.save(
this.hostRepository.create({ license_id: licenseId, hostname, ...fields }),
);
this.logger.log(`host registered for license ${licenseId} (hostname '${hostname || 'unknown'}')`);
return created;
}
/** Upsert one game_instances row per heartbeat instance entry. */
private async upsertInstances(
licenseId: string,
host: AgentHost,
payload: HeartbeatPayload,
now: Date,
): Promise<void> {
for (const inst of payload.instances ?? []) {
if (!inst?.id || !inst?.game) continue;
const fields = {
host_id: host.id,
game: inst.game,
label: inst.label ?? null,
state: inst.state ?? 'unknown',
uptime_seconds: inst.uptime_seconds ?? 0,
last_seen_at: now,
updated_at: now,
};
const existing = await this.instanceRepository.findOne({
where: { license_id: licenseId, agent_instance_id: inst.id },
});
if (existing) {
await this.instanceRepository.update({ id: existing.id }, fields);
} else {
await this.instanceRepository.save(
this.instanceRepository.create({
license_id: licenseId,
agent_instance_id: inst.id,
...fields,
}),
);
this.logger.log(`instance '${inst.id}' (${inst.game}) registered for license ${licenseId}`);
}
}
}
private async onGoingOffline(licenseId: string): Promise<void> {
if (!(await this.isValidTenant(licenseId))) return;
const now = new Date();
await this.connectionRepository.update(
{ license_id: licenseId },
{ connection_status: 'offline', updated_at: new Date() },
{ connection_status: 'offline', updated_at: now },
);
this.logger.log(`host agent for license ${licenseId} went offline (graceful beacon)`);
await this.hostRepository.update(
{ license_id: licenseId },
{ status: 'offline', updated_at: now },
);
this.logger.log(`host(s) for license ${licenseId} went offline (graceful beacon)`);
}
/**
* Heartbeats stopping must flip the panel to offline — an agent that
* crashes or loses network never sends the goodbye beacon.
* crashes or loses network never sends the goodbye beacon. Sweeps both the
* legacy connection and fleet hosts.
*/
@Interval(60_000)
async sweepStaleConnections(): Promise<void> {
const threshold = new Date(Date.now() - HostAgentConsumerService.OFFLINE_AFTER_MS);
const result = await this.connectionRepository
const conn = await this.connectionRepository
.createQueryBuilder()
.update(ServerConnection)
.set({ connection_status: 'offline', updated_at: () => 'NOW()' })
@@ -117,8 +216,18 @@ export class HostAgentConsumerService implements OnApplicationBootstrap {
.andWhere('companion_last_seen < :threshold', { threshold })
.execute();
if (result.affected) {
this.logger.warn(`marked ${result.affected} stale host connection(s) offline`);
const hosts = await this.hostRepository
.createQueryBuilder()
.update(AgentHost)
.set({ status: 'offline', updated_at: () => 'NOW()' })
.where('status = :connected', { connected: 'connected' })
.andWhere('last_heartbeat_at IS NOT NULL')
.andWhere('last_heartbeat_at < :threshold', { threshold })
.execute();
const affected = (conn.affected ?? 0) + (hosts.affected ?? 0);
if (affected) {
this.logger.warn(`marked ${affected} stale connection/host record(s) offline`);
}
}
@@ -132,7 +241,6 @@ export class HostAgentConsumerService implements OnApplicationBootstrap {
this.warnUnknownOnce(licenseId, 'not a UUID');
return false;
}
const cachedUntil = this.knownLicenses.get(licenseId);
if (cachedUntil && cachedUntil > Date.now()) return true;
@@ -141,7 +249,6 @@ export class HostAgentConsumerService implements OnApplicationBootstrap {
this.warnUnknownOnce(licenseId, 'no such license');
return false;
}
this.knownLicenses.set(licenseId, Date.now() + HostAgentConsumerService.LICENSE_CACHE_TTL_MS);
return true;
}

View File

@@ -0,0 +1,102 @@
-- Fleet data model — License → Host → Instance (with optional Cluster)
--
-- ADDITIVE: existing server_connections / server_config / server_stats are
-- left untouched so the current single-server panel keeps working. The
-- host-agent consumer writes BOTH the legacy connection row and these fleet
-- tables during the transition; the panel migrates to the fleet tables in a
-- later phase.
--
-- Shape mirrors the host agent's wire protocol v2 heartbeat:
-- host{} block → agent_hosts
-- instances[] entries → game_instances
-- Host metrics (CPU/RAM/disk) live on the HOST, not duplicated per instance.
--
-- Named `agent_hosts` (not `hosts`) to avoid collision with the existing B2B
-- `hosts` table (hosting-partner companies) — different concept entirely.
-----------------------------------------------------------
-- AGENT_HOSTS — one Corrosion host agent / one machine
-----------------------------------------------------------
CREATE TABLE IF NOT EXISTS agent_hosts (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
license_id UUID NOT NULL REFERENCES licenses(id) ON DELETE CASCADE,
-- Natural key until enrollment issues a stable host identity.
hostname VARCHAR(255) NOT NULL DEFAULT '',
agent_version VARCHAR(64),
agent_commit VARCHAR(64),
os VARCHAR(32),
arch VARCHAR(32),
status VARCHAR(20) NOT NULL DEFAULT 'offline'
CHECK (status IN ('connected', 'degraded', 'offline')),
last_heartbeat_at TIMESTAMPTZ,
cpu_percent DOUBLE PRECISION,
cpu_cores INTEGER,
mem_total_mb BIGINT,
mem_used_mb BIGINT,
uptime_seconds BIGINT,
disks JSONB, -- [{ "mount": "/", "total_mb": n, "free_mb": n }]
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (license_id, hostname)
);
CREATE INDEX IF NOT EXISTS idx_agent_hosts_license ON agent_hosts(license_id);
-----------------------------------------------------------
-- INSTANCE CLUSTERS — optional grouping (Soulmask main/child, Dune battlegroup)
-- Reserved now; cluster logic ships with those game adapters.
-----------------------------------------------------------
CREATE TABLE IF NOT EXISTS instance_clusters (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
license_id UUID NOT NULL REFERENCES licenses(id) ON DELETE CASCADE,
game VARCHAR(32) NOT NULL,
name VARCHAR(255) NOT NULL,
topology VARCHAR(32), -- main_client | battlegroup
config JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_clusters_license ON instance_clusters(license_id);
-----------------------------------------------------------
-- GAME INSTANCES — one game server process / orchestrated unit.
-- The billing unit (plans count instances).
-----------------------------------------------------------
CREATE TABLE IF NOT EXISTS game_instances (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
license_id UUID NOT NULL REFERENCES licenses(id) ON DELETE CASCADE,
host_id UUID REFERENCES agent_hosts(id) ON DELETE SET NULL,
cluster_id UUID REFERENCES instance_clusters(id) ON DELETE SET NULL,
-- The agent's instance slug; the NATS subject segment.
agent_instance_id VARCHAR(64) NOT NULL,
game VARCHAR(32) NOT NULL,
label VARCHAR(255),
-- running | stopped | starting | stopping | crashed
-- | configured | missing_root | unmanaged | unknown
state VARCHAR(32) NOT NULL DEFAULT 'unknown',
root_path TEXT,
uptime_seconds BIGINT NOT NULL DEFAULT 0,
last_seen_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (license_id, agent_instance_id)
);
CREATE INDEX IF NOT EXISTS idx_instances_license ON game_instances(license_id);
CREATE INDEX IF NOT EXISTS idx_instances_host ON game_instances(host_id);
-----------------------------------------------------------
-- INSTANCE STATS — per-instance time series (game metrics).
-- Populated once game-level telemetry (player count/FPS via RCON/plugin) is
-- collected; the host heartbeat carries host metrics, not game metrics.
-----------------------------------------------------------
CREATE TABLE IF NOT EXISTS instance_stats (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
instance_id UUID NOT NULL REFERENCES game_instances(id) ON DELETE CASCADE,
license_id UUID NOT NULL REFERENCES licenses(id) ON DELETE CASCADE,
player_count INTEGER NOT NULL DEFAULT 0,
max_players INTEGER NOT NULL DEFAULT 0,
fps DOUBLE PRECISION NOT NULL DEFAULT 0,
memory_usage_mb INTEGER NOT NULL DEFAULT 0,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_instance_stats_instance
ON instance_stats(instance_id, recorded_at DESC);