Forráskód Böngészése

Merge pull request #139 from peers/feature/brokenConnectionDetection

add checkConnections service
afrokick 5 éve
szülő
commit
eda26cecd7

+ 5 - 0
bin/peerjs

@@ -20,6 +20,11 @@ const opts = require('optimist')
       description: 'concurrent limit',
       description: 'concurrent limit',
       default: 5000
       default: 5000
     },
     },
+    alive_timeout: {
+      demand: false,
+      description: 'broken connection check timeout (milliseconds)',
+      default: 60000
+    },
     key: {
     key: {
       demand: false,
       demand: false,
       alias: 'k',
       alias: 'k',

+ 1 - 0
config/index.js

@@ -1,6 +1,7 @@
 module.exports = {
 module.exports = {
   port: 9000,
   port: 9000,
   expire_timeout: 5000,
   expire_timeout: 5000,
+  alive_timeout: 60000,
   key: 'peerjs',
   key: 'peerjs',
   path: '/myapp',
   path: '/myapp',
   concurrent_limit: 5000,
   concurrent_limit: 5000,

+ 1 - 1
package.json

@@ -13,7 +13,7 @@
   "author": "Michelle Bu, Eric Zhang",
   "author": "Michelle Bu, Eric Zhang",
   "license": "MIT",
   "license": "MIT",
   "scripts": {
   "scripts": {
-    "test": "eslint . && mocha test/**/*.js",
+    "test": "eslint . && mocha \"test/**/*.js\"",
     "start": "bin/peerjs --port ${PORT:=9000}"
     "start": "bin/peerjs --port ${PORT:=9000}"
   },
   },
   "dependencies": {
   "dependencies": {

+ 8 - 0
src/index.js

@@ -11,7 +11,13 @@ const init = ({ app, server, options }) => {
   const realm = new Realm();
   const realm = new Realm();
   const messageHandler = require('./messageHandler')({ realm });
   const messageHandler = require('./messageHandler')({ realm });
   const api = require('./api')({ config, realm, messageHandler });
   const api = require('./api')({ config, realm, messageHandler });
+
   const { startMessagesExpiration } = require('./services/messagesExpire')({ realm, config, messageHandler });
   const { startMessagesExpiration } = require('./services/messagesExpire')({ realm, config, messageHandler });
+  const checkBrokenConnections = require('./services/checkBrokenConnections')({
+    realm, config, onClose: (client) => {
+      app.emit('disconnect', client);
+    }
+  });
 
 
   app.use(options.path, api);
   app.use(options.path, api);
 
 
@@ -52,6 +58,8 @@ const init = ({ app, server, options }) => {
   });
   });
 
 
   startMessagesExpiration();
   startMessagesExpiration();
+
+  checkBrokenConnections.start();
 };
 };
 
 
 function ExpressPeerServer(server, options) {
 function ExpressPeerServer(server, options) {

+ 4 - 0
src/messageHandler/handlers/heartbeat/index.js

@@ -0,0 +1,4 @@
+module.exports = (client) => {
+  const nowTime = new Date().getTime();
+  client.setLastPing(nowTime);
+};

+ 5 - 6
src/messageHandler/index.js

@@ -1,15 +1,15 @@
 const { MessageType } = require('../enums');
 const { MessageType } = require('../enums');
 
 
 class MessageHandlers {
 class MessageHandlers {
-  constructor () {
+  constructor() {
     this.handlers = {};
     this.handlers = {};
   }
   }
 
 
-  registerHandler (messageType, handler) {
+  registerHandler(messageType, handler) {
     this.handlers[messageType] = handler;
     this.handlers[messageType] = handler;
   }
   }
 
 
-  handle (client, message) {
+  handle(client, message) {
     const { type } = message;
     const { type } = message;
 
 
     const handler = this.handlers[type];
     const handler = this.handlers[type];
@@ -23,6 +23,7 @@ class MessageHandlers {
 }
 }
 module.exports = ({ realm }) => {
 module.exports = ({ realm }) => {
   const transmissionHandler = require('./handlers/transmission')({ realm });
   const transmissionHandler = require('./handlers/transmission')({ realm });
+  const heartbeatHandler = require('./handlers/heartbeat');
 
 
   const messageHandlers = new MessageHandlers();
   const messageHandlers = new MessageHandlers();
 
 
@@ -35,9 +36,7 @@ module.exports = ({ realm }) => {
     });
     });
   };
   };
 
 
-  const handleHeartbeat = () => {
-
-  };
+  const handleHeartbeat = (client) => heartbeatHandler(client);
 
 
   messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
   messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
   messageHandlers.registerHandler(MessageType.OFFER, handleTransmission);
   messageHandlers.registerHandler(MessageType.OFFER, handleTransmission);

+ 18 - 5
src/models/client.js

@@ -1,23 +1,36 @@
 class Client {
 class Client {
-  constructor ({ id, token }) {
+  constructor({ id, token }) {
     this.id = id;
     this.id = id;
     this.token = token;
     this.token = token;
     this.socket = null;
     this.socket = null;
+    this.lastPing = new Date().getTime();
   }
   }
 
 
-  getId () {
+  getId() {
     return this.id;
     return this.id;
   }
   }
 
 
-  getToken () {
+  getToken() {
     return this.token;
     return this.token;
   }
   }
 
 
-  setSocket (socket) {
+  getSocket() {
+    return this.socket;
+  }
+
+  setSocket(socket) {
     this.socket = socket;
     this.socket = socket;
   }
   }
 
 
-  send (data) {
+  getLastPing() {
+    return this.lastPing;
+  }
+
+  setLastPing(lastPing) {
+    this.lastPing = lastPing;
+  }
+
+  send(data) {
     this.socket.send(JSON.stringify(data));
     this.socket.send(JSON.stringify(data));
   }
   }
 }
 }

+ 57 - 0
src/services/checkBrokenConnections/index.js

@@ -0,0 +1,57 @@
+const DEFAULT_CHECK_INTERVAL = 300;
+
+module.exports = ({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose = () => { } }) => {
+  const checkConnections = () => {
+    const clientsIds = realm.getClientsIds();
+
+    const now = new Date().getTime();
+    const aliveTimeout = config.alive_timeout;
+
+    for (const clientId of clientsIds) {
+      const client = realm.getClientById(clientId);
+      const timeSinceLastPing = now - client.getLastPing();
+
+      if (timeSinceLastPing < aliveTimeout) continue;
+
+      try {
+        client.getSocket().close();
+        // eslint-disable-next-line no-empty
+      } catch (e) { } finally {
+        realm.clearMessageQueue(clientId);
+        realm.removeClientById(clientId);
+        client.setSocket(null);
+
+        if (onClose) onClose(client);
+      }
+    }
+  };
+
+  let timeoutId;
+
+  const start = () => {
+    if (timeoutId) {
+      clearTimeout(timeoutId);
+    }
+
+    timeoutId = setTimeout(() => {
+      checkConnections();
+
+      timeoutId = null;
+
+      start();
+    }, checkInterval);
+  };
+
+  const stop = () => {
+    if (timeoutId) {
+      clearTimeout(timeoutId);
+      timeoutId = null;
+    }
+  };
+
+  return {
+    start,
+    stop,
+    CHECK_INTERVAL: checkInterval
+  };
+};

+ 16 - 0
test/messageHandler/handlers/heartbeat/index.js

@@ -0,0 +1,16 @@
+const { expect } = require('chai');
+const Client = require('../../../../src/models/client');
+const heartbeatHandler = require('../../../../src/messageHandler/handlers/heartbeat');
+
+describe('Heartbeat handler', () => {
+    it('should update last ping time', () => {
+        const client = new Client({ id: 'id', token: '' });
+        client.setLastPing(0);
+
+        const nowTime = new Date().getTime();
+
+        heartbeatHandler(client);
+
+        expect(client.getLastPing()).to.be.closeTo(nowTime, 2);
+    });
+});

+ 43 - 0
test/services/checkBrokenConnections/index.js

@@ -0,0 +1,43 @@
+const { expect } = require('chai');
+const Client = require('../../../src/models/client');
+const Realm = require('../../../src/models/realm');
+const checkBrokenConnectionsBuilder = require('../../../src/services/checkBrokenConnections');
+
+describe('checkBrokenConnections service', () => {
+    it('should remove client after 2 checks', (done) => {
+        const realm = new Realm();
+        const doubleCheckTime = 55;//~ equals to checkBrokenConnections.CHECK_INTERVAL * 2
+        const checkBrokenConnections = checkBrokenConnectionsBuilder({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
+        const client = new Client({ id: 'id', token: '' });
+        realm.setClient(client, 'id');
+
+        checkBrokenConnections.start();
+
+        setTimeout(() => {
+            expect(realm.getClientById('id')).to.be.undefined;
+            checkBrokenConnections.stop();
+            done();
+        }, checkBrokenConnections.CHECK_INTERVAL * 2 + 3);
+    });
+
+    it('should remove client after 1 ping', (done) => {
+        const realm = new Realm();
+        const doubleCheckTime = 55;//~ equals to checkBrokenConnections.CHECK_INTERVAL * 2
+        const checkBrokenConnections = checkBrokenConnectionsBuilder({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
+        const client = new Client({ id: 'id', token: '' });
+        realm.setClient(client, 'id');
+
+        checkBrokenConnections.start();
+
+        //set ping after first check
+        setTimeout(() => {
+            client.setLastPing(new Date().getTime());
+
+            setTimeout(() => {
+                expect(realm.getClientById('id')).to.be.undefined;
+                checkBrokenConnections.stop();
+                done();
+            }, checkBrokenConnections.CHECK_INTERVAL * 2 + 10);
+        }, checkBrokenConnections.CHECK_INTERVAL);
+    });
+});