import { WebSocketGateway, WebSocketServer, OnGatewayConnection, OnGatewayDisconnect, SubscribeMessage, MessageBody, ConnectedSocket, } from '@nestjs/websockets'; import { Logger } from '@nestjs/common'; import { IncomingMessage } from 'http'; import WebSocket, { Server } from 'ws'; import { JwtService } from '@nestjs/jwt'; import { ConfigService } from '@nestjs/config'; import { NatsBridgeService } from '../services/nats-bridge.service'; import { NatsService } from '../services/nats.service'; interface ClientMeta { userId: string; licenseId: string; email: string; } @WebSocketGateway({ path: '/api/ws' }) export class NatsBridgeGateway implements OnGatewayConnection, OnGatewayDisconnect { private readonly logger = new Logger(NatsBridgeGateway.name); @WebSocketServer() server!: Server; // Client metadata and listener tracking (native WS has no .data or .join()) private clientMeta = new Map(); private licenseClients = new Map>(); private clientListeners = new Map void>(); constructor( private jwtService: JwtService, private configService: ConfigService, private natsBridge: NatsBridgeService, private natsService: NatsService, ) {} async handleConnection(client: WebSocket, request: IncomingMessage) { try { // Parse token from query string const url = new URL(request.url || '/', `http://${request.headers.host}`); const token = url.searchParams.get('token'); if (!token) { client.send(JSON.stringify({ type: 'error', message: 'Authentication required' })); client.close(4001, 'Authentication required'); return; } const secret = this.configService.get('jwt.secret'); const payload = this.jwtService.verify(token, { secret }); const meta: ClientMeta = { userId: payload.sub, licenseId: payload.license_id, email: payload.email, }; this.clientMeta.set(client, meta); // Track client by license for broadcasting if (payload.license_id) { if (!this.licenseClients.has(payload.license_id)) { this.licenseClients.set(payload.license_id, new Set()); } this.licenseClients.get(payload.license_id)!.add(client); // Subscribe to NATS events for this license const listener = (event: string, data: unknown) => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify({ type: 'event', license_id: payload.license_id, event, data, })); } }; this.natsBridge.addListener(payload.license_id, listener); this.clientListeners.set(client, listener); } client.send(JSON.stringify({ type: 'connected', license_id: payload.license_id })); this.logger.log(`Client connected: ${payload.email} (license: ${payload.license_id})`); } catch { client.send(JSON.stringify({ type: 'error', message: 'Invalid token' })); client.close(4002, 'Invalid token'); } } handleDisconnect(client: WebSocket) { const meta = this.clientMeta.get(client); if (meta?.licenseId) { // Remove NATS listener const listener = this.clientListeners.get(client); if (listener) { this.natsBridge.removeListener(meta.licenseId, listener); this.clientListeners.delete(client); } // Remove from license client set this.licenseClients.get(meta.licenseId)?.delete(client); if (this.licenseClients.get(meta.licenseId)?.size === 0) { this.licenseClients.delete(meta.licenseId); } } this.clientMeta.delete(client); } @SubscribeMessage('console_input') async handleConsoleInput( @ConnectedSocket() client: WebSocket, @MessageBody() data: { command: string }, ) { const meta = this.clientMeta.get(client); if (!meta?.licenseId) return; await this.natsService.sendServerCommand(meta.licenseId, 'command', { command: data.command }); } sendToLicense(licenseId: string, event: string, data: unknown): void { const clients = this.licenseClients.get(licenseId); if (!clients) return; const message = JSON.stringify({ type: 'event', license_id: licenseId, event, data, }); for (const client of clients) { if (client.readyState === WebSocket.OPEN) { client.send(message); } } } }