import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { connect, NatsConnection, StringCodec, Subscription } from 'nats';
import { createHmac, randomUUID } from 'crypto';

/** NATS connection details handed to a license's host agent. */
export interface AgentCredentials {
  license_id: string;
  nats_user: string;
  nats_password: string;
  nats_url: string;
}

/**
 * Thin wrapper around a single NATS connection.
 *
 * If the broker cannot be reached at startup the service stays in "offline
 * mode": `publish`, `request` and `subscribe` log what they would have done
 * and return without error, while `requestScoped` throws.
 */
@Injectable()
export class NatsService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(NatsService.name);
  private nc: NatsConnection | null = null;
  private readonly sc = StringCodec();

  constructor(private readonly config: ConfigService) {}

  /**
   * Open the broker connection. Failures are logged and swallowed on purpose
   * so the application can still boot without NATS (offline mode).
   */
  async onModuleInit(): Promise<void> {
    try {
      const url = this.config.get<string>('nats.url') || 'nats://localhost:4222';
      const user = this.config.get<string>('nats.internalUser');
      const pass = this.config.get<string>('nats.internalPassword');
      // Authenticate with the privileged internal user when configured;
      // otherwise connect anonymously (broker hasn't enforced auth yet).
      const authenticated = Boolean(user && pass);
      const opts = authenticated ? { servers: url, user, pass } : { servers: url };
      this.nc = await connect(opts);
      // Report the identity actually used: a user without a password still
      // results in an anonymous connection.
      this.logger.log(`Connected to NATS at ${url}${authenticated ? ` as ${user}` : ' (anonymous)'}`);
    } catch (err) {
      this.logger.warn(`NATS connection failed — running in offline mode: ${this.errorMessage(err)}`);
    }
  }

  /** Drain the connection (if any) when the module shuts down. */
  async onModuleDestroy(): Promise<void> {
    const nc = this.nc;
    if (!nc) {
      return;
    }
    // Drop the reference first so calls made during/after shutdown take the
    // offline path instead of hitting a draining connection.
    this.nc = null;
    await nc.drain();
  }

  /**
   * Publish `data` as JSON on `subject`. Fire-and-forget: there is no
   * delivery confirmation. In offline mode the message is only logged.
   */
  async publish(subject: string, data: Record<string, unknown>): Promise<void> {
    if (!this.nc) {
      this.logger.debug(`[OFFLINE] Would publish to ${subject}: ${JSON.stringify(data)}`);
      return;
    }
    this.nc.publish(subject, this.sc.encode(JSON.stringify(data)));
  }

  /**
   * Request-reply using the client's default reply inbox.
   *
   * @param timeout Maximum wait for the reply, in milliseconds.
   * @returns The JSON-parsed reply, or `null` in offline mode. `T` is an
   *   unchecked assertion about the reply shape — validate before trusting it.
   * @throws If the underlying request fails or the reply is not valid JSON.
   */
  async request<T = unknown>(
    subject: string,
    data: Record<string, unknown>,
    timeout = 5000,
  ): Promise<T | null> {
    if (!this.nc) {
      this.logger.debug(`[OFFLINE] Would request ${subject}: ${JSON.stringify(data)}`);
      return null;
    }
    const msg = await this.nc.request(subject, this.sc.encode(JSON.stringify(data)), { timeout });
    return JSON.parse(this.sc.decode(msg.data)) as T;
  }

  /**
   * Subscribe to `subject` and invoke `callback` once per message with the
   * JSON-parsed payload, or with the raw string when the payload is not JSON.
   *
   * @returns The subscription handle, or `null` in offline mode.
   */
  subscribe(subject: string, callback: (data: unknown, subject: string) => void): Subscription | null {
    if (!this.nc) {
      this.logger.debug(`[OFFLINE] Would subscribe to ${subject}`);
      return null;
    }
    const sub = this.nc.subscribe(subject);
    // Consumed in the background; consume() handles its own errors, so the
    // promise is intentionally not awaited.
    void this.consume(sub, subject, callback);
    return sub;
  }

  /**
   * Request-reply to a host-agent subject with a LICENSE-SCOPED reply subject.
   *
   * Per-license agent users are confined to corrosion.{license}.> and have no
   * _INBOX permission, so the agent cannot publish a reply to the default
   * global inbox. The reply must live inside the license namespace
   * (corrosion.{license}.reply.<uuid>); the privileged backend subscribes there.
   * See corrosion-host-agent/PROTOCOL.md ("Reply-subject rule").
   *
   * @param timeoutMs Maximum wait for the reply, in milliseconds.
   * @returns The JSON-parsed reply, or the raw string if it is not JSON. `T`
   *   is an unchecked assertion about the reply shape.
   * @throws If NATS is unavailable, the publish fails, or the reply
   *   subscription reports an error (including no reply within `timeoutMs`).
   */
  async requestScoped<T = unknown>(
    licenseId: string,
    subject: string,
    payload: Record<string, unknown>,
    timeoutMs = 8000,
  ): Promise<T> {
    if (!this.nc) {
      throw new Error('NATS unavailable — agent is not reachable');
    }
    const replySubject = `corrosion.${licenseId}.reply.${randomUUID()}`;
    const nc = this.nc;
    return new Promise<T>((resolve, reject) => {
      // Subscribe before publishing so the reply cannot arrive unobserved.
      const sub = nc.subscribe(replySubject, {
        max: 1,
        timeout: timeoutMs,
        callback: (err, msg) => {
          if (err) {
            reject(new Error(`agent did not respond within ${timeoutMs}ms`));
            return;
          }
          resolve(this.decodePayload(msg.data) as T);
        },
      });
      try {
        nc.publish(subject, this.sc.encode(JSON.stringify(payload)), { reply: replySubject });
      } catch (err) {
        // The request never left: release the reply subscription right away
        // instead of leaving it open until the timeout fires.
        sub.unsubscribe();
        reject(err);
      }
    });
  }

  /**
   * Derive a license's agent NATS credentials. The password is HMAC-SHA256
   * keyed with NATS_TOKEN_SECRET over the license id, hex-encoded — it must
   * match the broker config generated by scripts/generate-nats-auth.mjs.
   *
   * @returns The credentials, or `null` if the secret isn't configured
   *   (broker not yet enforcing auth).
   */
  getAgentCredentials(licenseId: string): AgentCredentials | null {
    const secret = this.config.get<string>('nats.tokenSecret');
    if (!secret) {
      return null;
    }
    const password = createHmac('sha256', secret).update(licenseId).digest('hex');
    return {
      license_id: licenseId,
      nats_user: licenseId,
      nats_password: password,
      nats_url: this.config.get<string>('nats.publicUrl') || 'nats://nats.corrosionmgmt.com:4222',
    };
  }

  /**
   * Publish a command to a specific license's server.
   *
   * `payload` is spread after `action`, so a payload key named `action`
   * overrides the argument; `timestamp` is always set by this method.
   */
  async sendServerCommand(
    licenseId: string,
    action: string,
    payload: Record<string, unknown> = {},
  ): Promise<void> {
    await this.publish(`corrosion.${licenseId}.cmd.server`, {
      action,
      ...payload,
      timestamp: new Date().toISOString(),
    });
  }

  /** Publish a deploy command to a specific license's companion agent */
  async sendDeployCommand(licenseId: string, config: Record<string, unknown>): Promise<void> {
    await this.publish(`corrosion.${licenseId}.cmd.deploy`, {
      action: 'deploy',
      config,
      timestamp: new Date().toISOString(),
    });
  }

  /** Publish an Oxide install command to a specific license's companion agent */
  async sendOxideInstallCommand(licenseId: string): Promise<void> {
    await this.publish(`corrosion.${licenseId}.cmd.oxide`, {
      action: 'install_oxide',
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Pump messages from `sub` into `callback` until the subscription ends.
   * A throwing callback is logged and does not stop consumption, and it is
   * never re-invoked for the same message.
   */
  private async consume(
    sub: Subscription,
    subject: string,
    callback: (data: unknown, subject: string) => void,
  ): Promise<void> {
    try {
      for await (const msg of sub) {
        const data = this.decodePayload(msg.data);
        try {
          callback(data, msg.subject);
        } catch (err) {
          this.logger.error(`Subscriber callback for ${msg.subject} threw: ${this.errorMessage(err)}`);
        }
      }
    } catch (err) {
      this.logger.error(`Subscription to ${subject} terminated: ${this.errorMessage(err)}`);
    }
  }

  /** Decode a message body as JSON, falling back to the raw string. */
  private decodePayload(raw: Uint8Array): unknown {
    const text = this.sc.decode(raw);
    try {
      return JSON.parse(text);
    } catch {
      return text;
    }
  }

  /** Extract a printable message from an arbitrary thrown value. */
  private errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }
}