Просмотр исходного кода

fix https://github.com/peers/peerjs-server/issues/75

afrokick 6 лет назад
Родитель
Сommit
9b15b4b4f3

+ 6 - 5
config/schema.js

@@ -40,10 +40,11 @@ module.exports = convict({
     env: 'PORT',
     arg: 'port'
   },
-  timeout: {
-    doc: '',
+  expire_timeout: {
+    doc: 'The timeout before EXPIRE message send',
     format: 'duration',
-    default: 5000
+    default: 5000,
+    arg: 'expireTimeout'
   },
   key: {
     doc: 'The key to check incoming clients',
@@ -79,9 +80,9 @@ module.exports = convict({
     arg: 'proxied'
   },
   cleanup_out_msgs: {
-    doc: '',
+    doc: 'The period in ms to check expired messages',
     format: 'duration',
-    default: 5000
+    default: 1000
   },
   ssl: {
     key_path: {

+ 10 - 35
src/index.js

@@ -6,10 +6,10 @@ const fs = require('fs');
 const config = require('../config');
 const WebSocketServer = require('./services/webSocketServer');
 const logger = require('./services/logger');
+const realm = require('./services/realm');
+const { startMessagesExpiration } = require('./services/messagesExpire');
 const api = require('./api');
 const messageHandler = require('./messageHandler');
-const realm = require('./services/realm');
-const { MessageType } = require('./enums');
 
 process.on('uncaughtException', (e) => {
   logger.error('Error: ' + e);
@@ -53,10 +53,13 @@ app.use(path, api);
 const wss = new WebSocketServer(server, app.mountpath);
 
 wss.on('connection', client => {
-  const messages = realm.getMessageQueueById(client.getId());
+  const messageQueue = realm.getMessageQueueById(client.getId());
 
-  if (messages) {
-    messages.forEach(message => messageHandler(client, message));
+  if (messageQueue) {
+    let message;
+    while (message = messageQueue.readMessage()) {
+      messageHandler(client, message);
+    }
     realm.clearMessageQueue(client.getId());
   }
 
@@ -86,34 +89,6 @@ server.listen(port, host, () => {
     'Started PeerServer on %s, port: %s',
     host, port
   );
-});
-
-const pruneOutstanding = () => {
-  const destinationClientsIds = realm.messageQueue.keys();
-
-  for (const destinationClientId of destinationClientsIds) {
-    const messages = realm.getMessageQueueById(destinationClientId);
-
-    const seen = {};
 
-    for (const message of messages) {
-      if (!seen[message.src]) {
-        messageHandler(null, {
-          type: MessageType.EXPIRE,
-          src: message.dst,
-          dst: message.src
-        });
-        seen[message.src] = true;
-      }
-    }
-  }
-
-  realm.messageQueue.clear();
-
-  logger.debug(`message queue was cleared`);
-};
-
-// Clean up outstanding messages
-setInterval(() => {
-  pruneOutstanding();
-}, config.get('cleanup_out_msgs'));
+  startMessagesExpiration();
+});

+ 1 - 1
src/messageHandler/index.js

@@ -5,7 +5,7 @@ const transmissionHandler = require('./handlers/transmission');
 const handlers = {};
 
 const registerHandler = (messageType, handler) => {
-  logger.info(`[MSGHANDLER] register handler for ${messageType}`);
+  logger.debug(`[MSGHANDLER] register handler for ${messageType}`);
   handlers[messageType] = handler;
 };
 

+ 30 - 0
src/models/messageQueue.js

@@ -0,0 +1,30 @@
+class MessageQueue {
+  constructor (id) {
+    this._id = id;
+    this._lastReadAt = new Date().getTime();
+    this._messages = [];
+  }
+
+  getLastReadAt () {
+    return this._lastReadAt;
+  }
+
+  addMessage (message) {
+    this._messages.push(message);
+  }
+
+  readMessage () {
+    if (this._messages.length > 0) {
+      this._lastReadAt = new Date().getTime();
+      return this._messages.shift();
+    }
+
+    return null;
+  }
+
+  getMessages () {
+    return this._messages;
+  }
+}
+
+module.exports = MessageQueue;

+ 12 - 10
src/models/realm.js

@@ -1,19 +1,21 @@
+const MessageQueue = require('./messageQueue');
+
 class Realm {
   constructor () {
-    this.clients = new Map();
-    this.messageQueue = new Map();
+    this._clients = new Map();
+    this._messageQueues = new Map();
   }
 
   getClientsIds () {
-    return [...this.clients.keys()];
+    return [...this._clients.keys()];
   }
 
   getClientById (clientId) {
-    return this.clients.get(clientId);
+    return this._clients.get(clientId);
   }
 
   setClient (client, id) {
-    this.clients.set(id, client);
+    this._clients.set(id, client);
   }
 
   removeClientById (id) {
@@ -21,23 +23,23 @@ class Realm {
 
     if (!client) return false;
 
-    this.clients.delete(id);
+    this._clients.delete(id);
   }
 
   getMessageQueueById (id) {
-    return this.messageQueue.get(id);
+    return this._messageQueues.get(id);
   }
 
   addMessageToQueue (id, message) {
     if (!this.getMessageQueueById(id)) {
-      this.messageQueue.set(id, []);
+      this._messageQueues.set(id, new MessageQueue(id));
     }
 
-    this.getMessageQueueById(id).push(message);
+    this.getMessageQueueById(id).addMessage(message);
   }
 
   clearMessageQueue (id) {
-    this.messageQueue.delete(id);
+    this._messageQueues.delete(id);
   }
 
   generateClientId () {

+ 67 - 0
src/services/messagesExpire/index.js

@@ -0,0 +1,67 @@
+const config = require('../../../config');
+const messageHandler = require('../../messageHandler');
+const { MessageType } = require('../../enums');
+const realm = require('../realm');
+const logger = require('../logger');
+
+const pruneOutstanding = () => {
+  const destinationClientsIds = realm._messageQueues.keys();
+
+  const now = new Date().getTime();
+  const maxDiff = config.get('expire_timeout');
+
+  const seen = {};
+
+  for (const destinationClientId of destinationClientsIds) {
+    const messageQueue = realm.getMessageQueueById(destinationClientId);
+    const lastReadDiff = now - messageQueue.getLastReadAt();
+
+    if (lastReadDiff < maxDiff) continue;
+
+    const messages = messageQueue.getMessages();
+
+    for (const message of messages) {
+      if (!seen[message.src]) {
+        messageHandler(null, {
+          type: MessageType.EXPIRE,
+          src: message.dst,
+          dst: message.src
+        });
+        seen[message.src] = true;
+      }
+    }
+
+    realm.clearMessageQueue(destinationClientId);
+
+    logger.trace(`[MSGSEXPIRE] mq ${destinationClientId} was cleared`);
+  }
+};
+
+let timeoutId;
+
+const startMessagesExpiration = () => {
+  if (timeoutId) {
+    clearTimeout(timeoutId);
+  }
+
+  // Clean up outstanding messages
+  timeoutId = setTimeout(() => {
+    pruneOutstanding();
+
+    timeoutId = null;
+
+    startMessagesExpiration();
+  }, config.get('cleanup_out_msgs'));
+};
+
+const stopMessagesExpiration = () => {
+  if (timeoutId) {
+    clearTimeout(timeoutId);
+    timeoutId = null;
+  }
+};
+
+module.exports = {
+  startMessagesExpiration,
+  stopMessagesExpiration
+};

+ 0 - 2
src/services/webSocketServer/index.js

@@ -24,8 +24,6 @@ class WebSocketServer extends EventEmitter {
   }
 
   _onSocketConnection (socket, req) {
-    logger.debug(`[WSS] on new connection:${req}`);
-
     const { query = {} } = url.parse(req.url, true);
 
     const { id, token, key } = query;