fix: Replace socket.io with native WS adapter — fixes WebSocket 1006
All checks were successful
Test Asgard Runner / test (push) Successful in 3s
All checks were successful
Test Asgard Runner / test (push) Successful in 3s
Frontend uses native WebSocket API, backend was using socket.io which speaks an incompatible protocol. Switched to @nestjs/platform-ws so both sides speak native WebSocket. Also fixed JWT TTL override in docker-compose.yml (was hardcoded to 900s, now 14400s/4h). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,32 +4,35 @@ import {
|
||||
OnGatewayConnection,
|
||||
OnGatewayDisconnect,
|
||||
SubscribeMessage,
|
||||
MessageBody,
|
||||
ConnectedSocket,
|
||||
} from '@nestjs/websockets';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { Server, Socket } from 'socket.io';
|
||||
import { IncomingMessage } from 'http';
|
||||
import WebSocket, { Server } from 'ws';
|
||||
import { JwtService } from '@nestjs/jwt';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { NatsBridgeService } from '../services/nats-bridge.service';
|
||||
import { NatsService } from '../services/nats.service';
|
||||
|
||||
interface AuthenticatedSocket extends Socket {
|
||||
data: {
|
||||
userId: string;
|
||||
licenseId: string;
|
||||
email: string;
|
||||
};
|
||||
interface ClientMeta {
|
||||
userId: string;
|
||||
licenseId: string;
|
||||
email: string;
|
||||
}
|
||||
|
||||
@WebSocketGateway({
|
||||
namespace: '/ws',
|
||||
cors: { origin: '*' },
|
||||
})
|
||||
@WebSocketGateway({ path: '/api/ws' })
|
||||
export class NatsBridgeGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
private readonly logger = new Logger(NatsBridgeGateway.name);
|
||||
|
||||
@WebSocketServer()
|
||||
server!: Server;
|
||||
|
||||
// Client metadata and listener tracking (native WS has no .data or .join())
|
||||
private clientMeta = new Map<WebSocket, ClientMeta>();
|
||||
private licenseClients = new Map<string, Set<WebSocket>>();
|
||||
private clientListeners = new Map<WebSocket, (event: string, data: unknown) => void>();
|
||||
|
||||
constructor(
|
||||
private jwtService: JwtService,
|
||||
private configService: ConfigService,
|
||||
@@ -37,70 +40,101 @@ export class NatsBridgeGateway implements OnGatewayConnection, OnGatewayDisconne
|
||||
private natsService: NatsService,
|
||||
) {}
|
||||
|
||||
async handleConnection(client: AuthenticatedSocket) {
|
||||
async handleConnection(client: WebSocket, request: IncomingMessage) {
|
||||
try {
|
||||
const token = client.handshake.query.token as string;
|
||||
// Parse token from query string
|
||||
const url = new URL(request.url || '/', `http://${request.headers.host}`);
|
||||
const token = url.searchParams.get('token');
|
||||
|
||||
if (!token) {
|
||||
client.emit('error', { message: 'Authentication required' });
|
||||
client.disconnect();
|
||||
client.send(JSON.stringify({ type: 'error', message: 'Authentication required' }));
|
||||
client.close(4001, 'Authentication required');
|
||||
return;
|
||||
}
|
||||
|
||||
const secret = this.configService.get<string>('jwt.secret');
|
||||
const payload = this.jwtService.verify(token, { secret });
|
||||
|
||||
client.data = {
|
||||
const meta: ClientMeta = {
|
||||
userId: payload.sub,
|
||||
licenseId: payload.license_id,
|
||||
email: payload.email,
|
||||
};
|
||||
this.clientMeta.set(client, meta);
|
||||
|
||||
// Track client by license for broadcasting
|
||||
if (payload.license_id) {
|
||||
await client.join(`license:${payload.license_id}`);
|
||||
}
|
||||
if (!this.licenseClients.has(payload.license_id)) {
|
||||
this.licenseClients.set(payload.license_id, new Set());
|
||||
}
|
||||
this.licenseClients.get(payload.license_id)!.add(client);
|
||||
|
||||
if (payload.license_id) {
|
||||
// Subscribe to NATS events for this license
|
||||
const listener = (event: string, data: unknown) => {
|
||||
client.emit('event', {
|
||||
type: 'event',
|
||||
license_id: payload.license_id,
|
||||
event,
|
||||
data,
|
||||
});
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(JSON.stringify({
|
||||
type: 'event',
|
||||
license_id: payload.license_id,
|
||||
event,
|
||||
data,
|
||||
}));
|
||||
}
|
||||
};
|
||||
this.natsBridge.addListener(payload.license_id, listener);
|
||||
(client as Socket & { _natsListener?: typeof listener })._natsListener = listener;
|
||||
this.clientListeners.set(client, listener);
|
||||
}
|
||||
|
||||
client.emit('connected', { type: 'connected', license_id: payload.license_id });
|
||||
client.send(JSON.stringify({ type: 'connected', license_id: payload.license_id }));
|
||||
this.logger.log(`Client connected: ${payload.email} (license: ${payload.license_id})`);
|
||||
} catch {
|
||||
client.emit('error', { message: 'Invalid token' });
|
||||
client.disconnect();
|
||||
client.send(JSON.stringify({ type: 'error', message: 'Invalid token' }));
|
||||
client.close(4002, 'Invalid token');
|
||||
}
|
||||
}
|
||||
|
||||
handleDisconnect(client: AuthenticatedSocket) {
|
||||
if (client.data?.licenseId) {
|
||||
const listener = (client as Socket & { _natsListener?: (event: string, data: unknown) => void })._natsListener;
|
||||
handleDisconnect(client: WebSocket) {
|
||||
const meta = this.clientMeta.get(client);
|
||||
if (meta?.licenseId) {
|
||||
// Remove NATS listener
|
||||
const listener = this.clientListeners.get(client);
|
||||
if (listener) {
|
||||
this.natsBridge.removeListener(client.data.licenseId, listener);
|
||||
this.natsBridge.removeListener(meta.licenseId, listener);
|
||||
this.clientListeners.delete(client);
|
||||
}
|
||||
// Remove from license client set
|
||||
this.licenseClients.get(meta.licenseId)?.delete(client);
|
||||
if (this.licenseClients.get(meta.licenseId)?.size === 0) {
|
||||
this.licenseClients.delete(meta.licenseId);
|
||||
}
|
||||
}
|
||||
this.clientMeta.delete(client);
|
||||
}
|
||||
|
||||
@SubscribeMessage('console_input')
|
||||
async handleConsoleInput(client: AuthenticatedSocket, data: { command: string }) {
|
||||
if (!client.data?.licenseId) return;
|
||||
await this.natsService.sendServerCommand(client.data.licenseId, 'command', { command: data.command });
|
||||
async handleConsoleInput(
|
||||
@ConnectedSocket() client: WebSocket,
|
||||
@MessageBody() data: { command: string },
|
||||
) {
|
||||
const meta = this.clientMeta.get(client);
|
||||
if (!meta?.licenseId) return;
|
||||
await this.natsService.sendServerCommand(meta.licenseId, 'command', { command: data.command });
|
||||
}
|
||||
|
||||
sendToLicense(licenseId: string, event: string, data: unknown): void {
|
||||
this.server.to(`license:${licenseId}`).emit(event, {
|
||||
const clients = this.licenseClients.get(licenseId);
|
||||
if (!clients) return;
|
||||
|
||||
const message = JSON.stringify({
|
||||
type: 'event',
|
||||
license_id: licenseId,
|
||||
event,
|
||||
data,
|
||||
});
|
||||
|
||||
for (const client of clients) {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user