Production bug caught live: provider onModuleInit order put bridge/ consumer subscription hooks BEFORE NatsService finished connecting, so every subscribe() hit the [OFFLINE] no-op path — the WS bridge has been dead-on-boot in every production build, and the new v2 consumer never saw a heartbeat (server_connections stayed empty under a live agent). onApplicationBootstrap is guaranteed to run after all module inits, including the awaited NATS connect. The new CI contract suite fails on exactly this class of bug. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
86 lines
3.1 KiB
TypeScript
86 lines
3.1 KiB
TypeScript
import { Injectable, OnApplicationBootstrap, Logger } from '@nestjs/common';
|
|
import { NatsService } from './nats.service';
|
|
|
|
@Injectable()
|
|
export class NatsBridgeService implements OnApplicationBootstrap {
|
|
private readonly logger = new Logger(NatsBridgeService.name);
|
|
private listeners: Map<string, Set<(event: string, data: unknown) => void>> = new Map();
|
|
|
|
constructor(private nats: NatsService) {}
|
|
|
|
// Subscriptions MUST happen in onApplicationBootstrap, not onModuleInit:
|
|
// provider onModuleInit order is not guaranteed, and these hooks once ran
|
|
// before NatsService connected — every subscribe() silently no-oped and the
|
|
// WS bridge was dead from boot. Bootstrap runs after ALL module inits
|
|
// (including the awaited NATS connect) complete.
|
|
onApplicationBootstrap() {
|
|
this.nats.subscribe('corrosion.*.companion.heartbeat', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'heartbeat', data);
|
|
});
|
|
|
|
this.nats.subscribe('corrosion.*.console.output', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'console_output', data);
|
|
});
|
|
|
|
this.nats.subscribe('corrosion.*.files.response', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'files_response', data);
|
|
});
|
|
|
|
this.nats.subscribe('corrosion.*.wipe.status', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'wipe_status', data);
|
|
});
|
|
|
|
this.nats.subscribe('corrosion.*.server.status', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'server_status', data);
|
|
});
|
|
|
|
this.nats.subscribe('corrosion.*.deploy.status', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'deploy_status', data);
|
|
});
|
|
|
|
this.nats.subscribe('corrosion.*.oxide.status', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'oxide_status', data);
|
|
});
|
|
|
|
// Wire protocol v2 (corrosion-host-agent) — host-level telemetry
|
|
this.nats.subscribe('corrosion.*.host.heartbeat', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'host_heartbeat', data);
|
|
});
|
|
|
|
this.nats.subscribe('corrosion.*.host.going_offline', (data, subject) => {
|
|
const licenseId = subject.split('.')[1];
|
|
this.emit(licenseId, 'host_going_offline', data);
|
|
});
|
|
|
|
this.logger.log('NATS bridge subscriptions initialized');
|
|
}
|
|
|
|
addListener(licenseId: string, callback: (event: string, data: unknown) => void): void {
|
|
if (!this.listeners.has(licenseId)) {
|
|
this.listeners.set(licenseId, new Set());
|
|
}
|
|
this.listeners.get(licenseId)!.add(callback);
|
|
}
|
|
|
|
removeListener(licenseId: string, callback: (event: string, data: unknown) => void): void {
|
|
this.listeners.get(licenseId)?.delete(callback);
|
|
}
|
|
|
|
private emit(licenseId: string, event: string, data: unknown): void {
|
|
const callbacks = this.listeners.get(licenseId);
|
|
if (callbacks) {
|
|
for (const cb of callbacks) {
|
|
cb(event, data);
|
|
}
|
|
}
|
|
}
|
|
}
|