afrokick 4 жил өмнө
parent
commit
051a6ba76e

+ 16 - 1
dist/src/services/webSocketServer/index.js

@@ -10,16 +10,19 @@ const enums_1 = require("../../enums");
 const client_1 = require("../../models/client");
 const WS_PATH = 'peerjs';
 class WebSocketServer extends events_1.default {
-    constructor({ server, realm, config }) {
+    constructor({ server, realm, config, messagesTransport }) {
+        var _a;
         super();
         this.setMaxListeners(0);
         this.realm = realm;
         this.config = config;
+        this.messagesTransport = messagesTransport;
         const path = this.config.path;
         this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`;
         this.socketServer = new ws_1.default.Server({ path: this.path, server });
         this.socketServer.on("connection", (socket, req) => this._onSocketConnection(socket, req));
         this.socketServer.on("error", (error) => this._onSocketError(error));
+        (_a = this.messagesTransport) === null || _a === void 0 ? void 0 : _a.registerHanadler((message) => this._handleMessage(message));
     }
     _onSocketConnection(socket, req) {
         var _a;
@@ -74,6 +77,10 @@ class WebSocketServer extends events_1.default {
             try {
                 const message = JSON.parse(data);
                 message.src = client.getId();
+                if (message.type !== "HEARTBEAT" && this.messagesTransport) {
+                    this.messagesTransport.sendMessage(message);
+                    return;
+                }
                 this.emit("message", client, message);
             }
             catch (e) {
@@ -89,5 +96,13 @@ class WebSocketServer extends events_1.default {
         }));
         socket.close();
     }
+    _handleMessage(message) {
+        const clientId = message.dst;
+        const client = clientId ? this.realm.getClientById(clientId) : undefined;
+        if (!client)
+            return false;
+        this.emit("message", client, message);
+        return true;
+    }
 }
 exports.WebSocketServer = WebSocketServer;

+ 35 - 2
src/services/webSocketServer/index.ts

@@ -20,22 +20,37 @@ interface IAuthParams {
 
 type CustomConfig = Pick<IConfig, 'path' | 'key' | 'concurrent_limit'>;
 
+type MessagesTransport = {
+  registerHanadler(handler: (message: { type: string; dst?: string; } & any) => boolean): void;
+  sendMessage(message: { type: string; src: string; } & any): void;
+};
+
 const WS_PATH = 'peerjs';
 
+type Dependencies = {
+  server: any;
+  realm: IRealm;
+  config: CustomConfig;
+  messagesTransport?: MessagesTransport;
+};
+
 export class WebSocketServer extends EventEmitter implements IWebSocketServer {
 
   public readonly path: string;
+  public readonly socketServer: WebSocketLib.Server;
+
   private readonly realm: IRealm;
   private readonly config: CustomConfig;
-  public readonly socketServer: WebSocketLib.Server;
+  private readonly messagesTransport?: MessagesTransport;
 
-  constructor({ server, realm, config }: { server: any; realm: IRealm; config: CustomConfig; }) {
+  constructor({ server, realm, config, messagesTransport }: Dependencies) {
     super();
 
     this.setMaxListeners(0);
 
     this.realm = realm;
     this.config = config;
+    this.messagesTransport = messagesTransport;
 
     const path = this.config.path;
     this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`;
@@ -44,6 +59,8 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer {
 
     this.socketServer.on("connection", (socket: MyWebSocket, req) => this._onSocketConnection(socket, req));
     this.socketServer.on("error", (error: Error) => this._onSocketError(error));
+
+    this.messagesTransport?.registerHanadler((message) => this._handleMessage(message));
   }
 
   private _onSocketConnection(socket: MyWebSocket, req: IncomingMessage): void {
@@ -121,6 +138,11 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer {
 
         message.src = client.getId();
 
+        if (message.type !== "HEARTBEAT" && this.messagesTransport) {
+          this.messagesTransport.sendMessage(message);
+          return;
+        }
+
         this.emit("message", client, message);
       } catch (e) {
         this.emit("error", e);
@@ -140,4 +162,15 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer {
 
     socket.close();
   }
+
+  private _handleMessage(message: any) {
+    const clientId = message.dst;
+    const client = clientId ? this.realm.getClientById(clientId) : undefined;
+
+    if (!client) return false;
+
+    this.emit("message", client, message);
+
+    return true;
+  }
 }