Browse Source

Merge pull request #120 from peers/refactoring/convertToLib

Refactoring: convert to lib
afrokick 6 years ago
parent
commit
2a607c25a5

+ 74 - 8
README.md

@@ -19,8 +19,16 @@ npm install
 ```
 
 3. Run the server:
+
 ```bash
-npm run start
+$> peerjs --port 9000 --key peerjs --path /mypapp
+```
+
+Or, create a custom server:
+
+```javascript
+const PeerServer = require('peer').PeerServer;
+const server = PeerServer({port: 9000, path: '/myapp'});
 ```
 
 Connecting to the server from PeerJS:
@@ -31,21 +39,79 @@ Connecting to the server from PeerJS:
 </script>
 ```
 
-Using HTTPS: Simply pass in paths to PEM-encoded certificate and key.
+Using HTTPS: Simply pass in PEM-encoded certificate and key.
 
-```bash
-node ./src/index.js --port 9000 --path /myapp --sslKeyPath /path/to/your/ssl/key/here.key --sslCertPath /path/to/your/ssl/certificate/here.crt
+```javascript
+const fs = require('fs');
+const PeerServer = require('peer').PeerServer;
+
+const server = PeerServer({
+  port: 9000,
+  ssl: {
+    key: fs.readFileSync('/path/to/your/ssl/key/here.key'),
+    cert: fs.readFileSync('/path/to/your/ssl/certificate/here.crt')
+  }
+});
 ```
 
 #### Running PeerServer behind a reverse proxy
 
-Make sure to set the `proxied` option.
+Make sure to set the `proxied` option, otherwise IP based limiting will fail.
 The option is passed verbatim to the
 [expressjs `trust proxy` setting](http://expressjs.com/4x/api.html#app-settings)
 if it is truthy.
 
-```bash
-node ./src/index.js --port 9000 --path /myapp --proxied true
+```javascript
+const PeerServer = require('peer').PeerServer;
+const server = PeerServer({port: 9000, path: '/myapp', proxied: true});
+```
+
+### Combining with existing express app
+
+```javascript
+const express = require('express');
+const app = express();
+const ExpressPeerServer = require('peer').ExpressPeerServer;
+
+app.get('/', (req, res, next) => { res.send('Hello world!'); });
+
+// =======
+
+const server = app.listen(9000);
+
+const options = {
+    debug: true
+}
+
+const peerserver = ExpressPeerServer(server, options);
+
+app.use('/api', peerserver);
+
+// == OR ==
+
+const server = require('http').createServer(app);
+const peerserver = ExpressPeerServer(server, options);
+
+app.use('/peerjs', peerserver);
+
+server.listen(9000);
+
+// ========
+```
+
+### Events
+
+The `'connection'` event is emitted when a peer connects to the server.
+
+```javascript
+peerserver.on('connection', (client) => { ... });
+```
+
+The `'disconnect'` event is emitted when a peer disconnects from the server or
+when the peer can no longer be reached.
+
+```javascript
+peerserver.on('disconnect', (client) => { ... });
 ```
 
 ## Running tests
@@ -73,4 +139,4 @@ This will start a peerjs server on port 9000 exposed on port 9000.
 Discuss PeerJS on our Google Group:
 https://groups.google.com/forum/?fromgroups#!forum/peerjs
 
-Please post any bugs as a Github issue.
+Please post any bugs as a Github issue.

+ 104 - 0
bin/peerjs

@@ -0,0 +1,104 @@
+#!/usr/bin/env node
+
+const path = require('path');
+const pkg = require('../package.json');
+const fs = require('fs');
+const version = pkg.version;
+const PeerServer = require('../src').PeerServer;
+const opts = require('optimist')
+  .usage('Usage: $0')
+  .options({
+    debug: {
+      demand: false,
+      alias: 'd',
+      description: 'debug',
+      default: false
+    },
+    timeout: {
+      demand: false,
+      alias: 't',
+      description: 'timeout (milliseconds)',
+      default: 5000
+    },
+    ip_limit: {
+      demand: false,
+      alias: 'i',
+      description: 'IP limit',
+      default: 5000
+    },
+    concurrent_limit: {
+      demand: false,
+      alias: 'c',
+      description: 'concurrent limit',
+      default: 5000
+    },
+    key: {
+      demand: false,
+      alias: 'k',
+      description: 'connection key',
+      default: 'peerjs'
+    },
+    sslkey: {
+      demand: false,
+      description: 'path to SSL key'
+    },
+    sslcert: {
+      demand: false,
+      description: 'path to SSL certificate'
+    },
+    port: {
+      demand: true,
+      alias: 'p',
+      description: 'port'
+    },
+    path: {
+      demand: false,
+      description: 'custom path',
+      default: '/'
+    },
+    allow_discovery: {
+      demand: false,
+      description: 'allow discovery of peers'
+    }
+  })
+  .boolean('allow_discovery')
+  .argv;
+
+process.on('uncaughtException', function (e) {
+  console.error('Error: ' + e);
+});
+
+if (opts.sslkey || opts.sslcert) {
+  if (opts.sslkey && opts.sslcert) {
+    opts.ssl = {
+      key: fs.readFileSync(path.resolve(opts.sslkey)),
+      cert: fs.readFileSync(path.resolve(opts.sslcert))
+    };
+
+    delete opts.sslkey;
+    delete opts.sslcert;
+  } else {
+    console.error('Warning: PeerServer will not run because either ' +
+      'the key or the certificate has not been provided.');
+    process.exit(1);
+  }
+}
+
+const userPath = opts.path;
+const server = PeerServer(opts, server => {
+  var host = server.address().address;
+  var port = server.address().port;
+
+  console.log(
+    'Started PeerServer on %s, port: %s, path: %s (v. %s)',
+    host, port, userPath || '/', version
+  );
+});
+
+server.on('connection', client => {
+  console.log(`Client connected: ${client.getId()}`);
+});
+
+server.on('disconnect', client => {
+  console.log(`Client disconnected: ${client.getId()}`);
+});

+ 15 - 5
config/index.js

@@ -1,5 +1,15 @@
-const config = require('./schema');
-
-config.validate({ allowed: 'strict' });
-
-module.exports = config;
+module.exports = {
+  host: '0.0.0.0',
+  port: 9000,
+  expire_timeout: 5000,
+  key: 'peerjs',
+  path: '/myapp',
+  concurrent_limit: 5000,
+  allow_discovery: false,
+  proxied: false,
+  cleanup_out_msgs: 1000,
+  ssl: {
+    key: '',
+    cert: ''
+  }
+};

+ 0 - 101
config/schema.js

@@ -1,101 +0,0 @@
-const convict = require('convict');
-
-module.exports = convict({
-  logger: {
-    level: {
-      doc: 'The log level. See log4js',
-      format: [
-        'ALL',
-        'MARK',
-        'TRACE',
-        'DEBUG',
-        'INFO',
-        'WARN',
-        'ERROR',
-        'FATAL',
-        'OFF'
-      ],
-      default: 'ERROR',
-      env: 'LOG_LEVEL',
-      arg: 'logLevel'
-    }
-  },
-  env: {
-    doc: 'The application environment.',
-    format: ['prod', 'dev', 'test'],
-    default: 'dev',
-    env: 'NODE_ENV'
-  },
-  host: {
-    doc: 'The host to bind.',
-    format: '*',
-    default: '0.0.0.0',
-    env: 'HOST',
-    arg: 'host'
-  },
-  port: {
-    doc: 'The port to bind.',
-    format: 'port',
-    default: 9000,
-    env: 'PORT',
-    arg: 'port'
-  },
-  expire_timeout: {
-    doc: 'The timeout before EXPIRE message send',
-    format: 'duration',
-    default: 5000,
-    arg: 'expireTimeout'
-  },
-  key: {
-    doc: 'The key to check incoming clients',
-    format: String,
-    default: 'peerjs',
-    env: 'APP_KEY',
-    arg: 'key'
-  },
-  path: {
-    doc: '',
-    format: String,
-    default: '/myapp',
-    env: 'APP_PATH',
-    arg: 'path'
-  },
-  concurrent_limit: {
-    doc: 'Max connections',
-    format: 'duration',
-    default: 5000,
-    arg: 'concurrentLimit'
-  },
-  allow_discovery: {
-    doc: 'Allow discovery of peers',
-    format: Boolean,
-    default: false,
-    arg: 'allowDiscovery'
-  },
-  proxied: {
-    doc: 'Set true if server running behind proxy',
-    format: Boolean,
-    default: false,
-    env: 'APP_PROXIED',
-    arg: 'proxied'
-  },
-  cleanup_out_msgs: {
-    doc: 'The period in ms to check expired messages',
-    format: 'duration',
-    default: 1000
-  },
-  ssl: {
-    key_path: {
-      doc: 'The path to the private key file',
-      format: String,
-      default: '',
-      arg: 'sslKeyPath'
-    },
-    cert_path: {
-      doc: 'The path to the cert file',
-      format: String,
-      default: '',
-      arg: 'sslCertPath'
-    }
-  }
-});

+ 7 - 5
package.json

@@ -3,6 +3,9 @@
   "version": "0.2.9",
   "description": "PeerJS server component",
   "main": "src/index.js",
+  "bin": {
+    "peerjs": "./bin/peerjs"
+  },
   "repository": {
     "type": "git",
     "url": "git://github.com/peers/peerjs-server.git"
@@ -11,19 +14,18 @@
   "license": "MIT",
   "scripts": {
     "test": "mocha test/**/*.js",
-    "start": "node ./src/index.js"
+    "start": "bin/peerjs --port ${PORT:=9000}"
   },
   "dependencies": {
     "body-parser": "^1.18.3",
-    "convict": "^4.4.1",
     "cors": "~2.8.4",
     "express": "^4.16.3",
-    "log4js": "^4.1.0",
-    "ws": "6.0.0"
+    "ws": "6.0.0",
+    "optimist": "~0.6.1"
   },
   "devDependencies": {
-    "mocha": "^6.0.2",
     "chai": "^4.2.0",
+    "mocha": "^6.1.2",
     "semistandard": "^13.0.1",
     "sinon": "^7.3.1"
   },

+ 14 - 9
src/api/index.js

@@ -1,18 +1,23 @@
 const express = require('express');
 const cors = require('cors');
 const bodyParser = require('body-parser');
-const authMiddleware = require('./middleware/auth');
 const publicContent = require('../../app.json');
 
-const app = module.exports = express.Router();
+module.exports = ({ config, realm, messageHandler }) => {
+  const authMiddleware = require('./middleware/auth')({ config, realm });
 
-const jsonParser = bodyParser.json();
+  const app = express.Router();
 
-app.use(cors());
+  const jsonParser = bodyParser.json();
 
-app.get('/', (req, res, next) => {
-  res.send(publicContent);
-});
+  app.use(cors());
 
-app.use('/:key', require('./v1/public'));
-app.use('/:key/:id/:token', authMiddleware, jsonParser, require('./v1/calls'));
+  app.get('/', (req, res, next) => {
+    res.send(publicContent);
+  });
+
+  app.use('/:key', require('./v1/public')({ config, realm }));
+  app.use('/:key/:id/:token', authMiddleware, jsonParser, require('./v1/calls')({ realm, messageHandler }));
+
+  return app;
+};

+ 2 - 4
src/api/middleware/auth/index.js

@@ -1,11 +1,9 @@
-const config = require('../../../../config');
-const realm = require('../../../services/realm');
 const { Errors } = require('../../../enums');
 
-module.exports = (req, res, next) => {
+module.exports = ({ config, realm }) => (req, res, next) => {
   const { id, token, key } = req.params;
 
-  if (key !== config.get('key')) {
+  if (key !== config.key) {
     return res.status(401).send(Errors.INVALID_KEY);
   }
 

+ 23 - 21
src/api/v1/calls/index.js

@@ -1,34 +1,36 @@
 const express = require('express');
-const realm = require('../../../services/realm');
-const messageHandler = require('../../../messageHandler');
 
-const app = module.exports = express.Router();
+module.exports = ({ realm, messageHandler }) => {
+  const app = express.Router();
 
-const handle = (req, res, next) => {
-  const { id } = req.params;
+  const handle = (req, res, next) => {
+    const { id } = req.params;
 
-  if (!id) return next();
+    if (!id) return next();
 
-  const client = realm.getClientById(id);
+    const client = realm.getClientById(id);
 
-  const { type, dst, payload } = req.body;
+    const { type, dst, payload } = req.body;
 
-  const message = {
-    type,
-    src: id,
-    dst,
-    payload
-  };
+    const message = {
+      type,
+      src: id,
+      dst,
+      payload
+    };
 
-  messageHandler(client, message);
+    messageHandler(client, message);
 
-  res.sendStatus(200);
-};
+    res.sendStatus(200);
+  };
 
-app.post('/offer', handle);
+  app.post('/offer', handle);
 
-app.post('/candidate', handle);
+  app.post('/candidate', handle);
 
-app.post('/answer', handle);
+  app.post('/answer', handle);
 
-app.post('/leave', handle);
+  app.post('/leave', handle);
+
+  return app;
+};

+ 18 - 16
src/api/v1/public/index.js

@@ -1,22 +1,24 @@
 const express = require('express');
-const realm = require('../../../services/realm');
-const config = require('../../../../config');
 
-const app = module.exports = express.Router();
+module.exports = ({ config, realm }) => {
+  const app = express.Router();
 
-// Retrieve guaranteed random ID.
-app.get('/id', (req, res, next) => {
-  res.contentType = 'text/html';
-  res.send(realm.generateClientId());
-});
+  // Retrieve guaranteed random ID.
+  app.get('/id', (req, res, next) => {
+    res.contentType = 'text/html';
+    res.send(realm.generateClientId());
+  });
 
-// Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
-app.get('/peers', (req, res, next) => {
-  if (config.get('allow_discovery')) {
-    const clientsIds = realm.getClientsIds();
+  // Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
+  app.get('/peers', (req, res, next) => {
+    if (config.allow_discovery) {
+      const clientsIds = realm.getClientsIds();
 
-    return res.send(clientsIds);
-  }
+      return res.send(clientsIds);
+    }
 
-  res.sendStatus(401);
-});
+    res.sendStatus(401);
+  });
+
+  return app;
+};

+ 98 - 64
src/index.js

@@ -1,94 +1,128 @@
 const express = require('express');
 const http = require('http');
 const https = require('https');
-const fs = require('fs');
 
 const config = require('../config');
 const WebSocketServer = require('./services/webSocketServer');
-const logger = require('./services/logger');
-const realm = require('./services/realm');
-const { startMessagesExpiration } = require('./services/messagesExpire');
-const api = require('./api');
-const messageHandler = require('./messageHandler');
+const Realm = require('./models/realm');
+
+const init = ({ app, server, options }) => {
+  const config = options;
+  const realm = new Realm();
+  const messageHandler = require('./messageHandler')({ realm });
+  const api = require('./api')({ config, realm, messageHandler });
+  const { startMessagesExpiration } = require('./services/messagesExpire')({ realm, config });
+
+  app.use(options.path, api);
+
+  const wss = new WebSocketServer({
+    server,
+    realm,
+    config: {
+      ...config,
+      path: app.mountpath
+    }
+  });
 
-process.on('uncaughtException', (e) => {
-  logger.error('Error: ' + e);
-});
+  wss.on('connection', client => {
+    const messageQueue = realm.getMessageQueueById(client.getId());
 
-// parse config
-let path = config.get('path');
+    if (messageQueue) {
+      let message;
+      while (message = messageQueue.readMessage()) {
+        messageHandler(client, message);
+      }
+      realm.clearMessageQueue(client.getId());
+    }
 
-if (path[0] !== '/') {
-  path = '/' + path;
-}
+    app.emit('connection', client);
+  });
 
-if (path[path.length - 1] !== '/') {
-  path += '/';
-}
+  wss.on('message', (client, message) => {
+    app.emit('message', client, message);
+    messageHandler(client, message);
+  });
 
-const app = express();
+  wss.on('close', client => {
+    app.emit('disconnect', client);
+  });
 
-if (config.get('proxied')) {
-  app.set('trust proxy', config.get('proxied'));
-}
+  wss.on('error', error => {
+    app.emit('error', error);
+  });
 
-let server;
+  startMessagesExpiration();
+};
 
-if (config.get('ssl.key_path') && config.get('ssl.cert_path')) {
-  const keyPath = config.get('ssl.key_path');
-  const certPath = config.get('ssl.cert_path');
+function ExpressPeerServer (server, options) {
+  const app = express();
 
-  const opts = {
-    key: fs.readFileSync(path.resolve(keyPath)),
-    cert: fs.readFileSync(path.resolve(certPath))
+  options = {
+    ...config,
+    ...options
   };
 
-  server = https.createServer(opts, app);
-} else {
-  server = http.createServer(app);
+  if (options.proxied) {
+    app.set('trust proxy', options.proxied);
+  }
+
+  app.on('mount', () => {
+    if (!server) {
+      throw new Error('Server is not passed to constructor - ' +
+        'can\'t start PeerServer');
+    }
+
+    init({ app, server, options });
+  });
+
+  return app;
 }
 
-app.use(path, api);
+function PeerServer (options = {}, callback) {
+  const app = express();
 
-const wss = new WebSocketServer(server, app.mountpath);
+  options = {
+    ...config,
+    ...options
+  };
 
-wss.on('connection', client => {
-  const messageQueue = realm.getMessageQueueById(client.getId());
+  let path = options.path;
+  const port = options.port;
 
-  if (messageQueue) {
-    let message;
-    while (message = messageQueue.readMessage()) {
-      messageHandler(client, message);
-    }
-    realm.clearMessageQueue(client.getId());
-  }
+  delete options.path;
 
-  logger.info(`client ${client.getId()} was connected`);
-});
+  if (path[0] !== '/') {
+    path = '/' + path;
+  }
 
-wss.on('message', (client, message) => {
-  messageHandler(client, message);
-});
+  if (path[path.length - 1] !== '/') {
+    path += '/';
+  }
 
-wss.on('close', client => {
-  logger.info(`client ${client.getId()} was disconnected`);
-});
+  let server;
 
-wss.on('error', error => {
-  logger.error(error);
-});
+  if (options.ssl && options.ssl.key && options.ssl.cert) {
+    server = https.createServer(options.ssl, app);
+    delete options.ssl;
+  } else {
+    server = http.createServer(app);
+  }
 
-const port = config.get('port');
-const host = config.get('host');
+  const peerjs = ExpressPeerServer(server, options);
+  app.use(path, peerjs);
 
-server.listen(port, host, () => {
-  const host = server.address().address;
-  const port = server.address().port;
+  if (callback) {
+    server.listen(port, () => {
+      callback(server);
+    });
+  } else {
+    server.listen(port);
+  }
 
-  logger.info(
-    'Started PeerServer on %s, port: %s',
-    host, port
-  );
+  return peerjs;
+}
 
-  startMessagesExpiration();
-});
+exports = module.exports = {
+  ExpressPeerServer: ExpressPeerServer,
+  PeerServer: PeerServer
+};

+ 2 - 11
src/messageHandler/handlers/transmission/index.js

@@ -1,8 +1,6 @@
-const realm = require('../../../services/realm');
-const logger = require('../../../services/logger');
 const { MessageType } = require('../../../enums');
 
-const handler = (client, message) => {
+module.exports = ({ realm }) => (client, message) => {
   const type = message.type;
   const srcId = message.src;
   const dstId = message.dst;
@@ -12,8 +10,6 @@ const handler = (client, message) => {
   // User is connected!
   if (destinationClient) {
     try {
-      logger.debug(type, 'from', srcId, 'to', dstId);
-
       if (destinationClient.socket) {
         const data = JSON.stringify(message);
 
@@ -23,7 +19,6 @@ const handler = (client, message) => {
         throw new Error('Peer dead');
       }
     } catch (e) {
-      logger.error(e);
       // This happens when a peer disconnects without closing connections and
       // the associated WebSocket has not closed.
       // Tell other side to stop trying.
@@ -33,7 +28,7 @@ const handler = (client, message) => {
         realm.removeClientById(destinationClient.getId());
       }
 
-      handler(client, {
+      module.exports({ realm })(client, {
         type: MessageType.LEAVE,
         src: dstId,
         dst: srcId
@@ -43,10 +38,8 @@ const handler = (client, message) => {
     // Wait for this client to connect/reconnect (XHR) for important
     // messages.
     if (type !== MessageType.LEAVE && type !== MessageType.EXPIRE && dstId) {
-      logger.debug(`[transmission] dst client ${dstId} not found, add msg ${type} to queue`);
       realm.addMessageToQueue(dstId, message);
     } else if (type === MessageType.LEAVE && !dstId) {
-      logger.debug(`[transmission] remove client ${srcId}`);
       realm.removeClientById(srcId);
     } else {
       // Unavailable destination specified with message LEAVE or EXPIRE
@@ -54,5 +47,3 @@ const handler = (client, message) => {
     }
   }
 };
-
-module.exports = handler;

+ 38 - 30
src/messageHandler/index.js

@@ -1,42 +1,50 @@
-const logger = require('../services/logger');
 const { MessageType } = require('../enums');
-const transmissionHandler = require('./handlers/transmission');
 
-const handlers = {};
+class MessageHandlers {
+  constructor ({ realm }) {
+    this.handlers = {};
+  }
 
-const registerHandler = (messageType, handler) => {
-  logger.debug(`[MSGHANDLER] register handler for ${messageType}`);
-  handlers[messageType] = handler;
-};
+  registerHandler (messageType, handler) {
+    this.handlers[messageType] = handler;
+  }
 
-module.exports = (client, message) => {
-  const { type } = message;
+  handle (client, message) {
+    const { type } = message;
 
-  const handler = handlers[type];
+    const handler = this.handlers[type];
 
-  if (!handler) {
-    return logger.error(`[MSGHANDLER] Message unrecognized:${type}`);
+    if (!handler) {
+      return;
+    }
+
+    handler(client, message);
   }
+}
+module.exports = ({ realm }) => {
+  const transmissionHandler = require('./handlers/transmission')({ realm });
 
-  handler(client, message);
-};
+  const messageHandlers = new MessageHandlers({ realm });
 
-const handleTransmission = (client, message) => {
-  transmissionHandler(client, {
-    type: message.type,
-    src: message.src,
-    dst: message.dst,
-    payload: message.payload
-  });
-};
+  const handleTransmission = (client, message) => {
+    transmissionHandler(client, {
+      type: message.type,
+      src: message.src,
+      dst: message.dst,
+      payload: message.payload
+    });
+  };
 
-const handleHeartbeat = (client, message) => {
+  const handleHeartbeat = (client, message) => {
 
-};
+  };
 
-registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
-registerHandler(MessageType.OFFER, handleTransmission);
-registerHandler(MessageType.ANSWER, handleTransmission);
-registerHandler(MessageType.CANDIDATE, handleTransmission);
-registerHandler(MessageType.LEAVE, handleTransmission);
-registerHandler(MessageType.EXPIRE, handleTransmission);
+  messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
+  messageHandlers.registerHandler(MessageType.OFFER, handleTransmission);
+  messageHandlers.registerHandler(MessageType.ANSWER, handleTransmission);
+  messageHandlers.registerHandler(MessageType.CANDIDATE, handleTransmission);
+  messageHandlers.registerHandler(MessageType.LEAVE, handleTransmission);
+  messageHandlers.registerHandler(MessageType.EXPIRE, handleTransmission);
+
+  return (client, message) => messageHandlers.handle(client, message);
+};

+ 0 - 8
src/services/logger/index.js

@@ -1,8 +0,0 @@
-const log4js = require('log4js');
-const config = require('../../../config');
-
-const logger = log4js.getLogger();
-
-logger.level = config.get('logger.level');
-
-module.exports = logger;

+ 45 - 48
src/services/messagesExpire/index.js

@@ -1,67 +1,64 @@
-const config = require('../../../config');
 const messageHandler = require('../../messageHandler');
 const { MessageType } = require('../../enums');
-const realm = require('../realm');
-const logger = require('../logger');
 
-const pruneOutstanding = () => {
-  const destinationClientsIds = realm._messageQueues.keys();
+module.exports = ({ realm, config }) => {
+  const pruneOutstanding = () => {
+    const destinationClientsIds = realm._messageQueues.keys();
 
-  const now = new Date().getTime();
-  const maxDiff = config.get('expire_timeout');
+    const now = new Date().getTime();
+    const maxDiff = config.expire_timeout;
 
-  const seen = {};
+    const seen = {};
 
-  for (const destinationClientId of destinationClientsIds) {
-    const messageQueue = realm.getMessageQueueById(destinationClientId);
-    const lastReadDiff = now - messageQueue.getLastReadAt();
+    for (const destinationClientId of destinationClientsIds) {
+      const messageQueue = realm.getMessageQueueById(destinationClientId);
+      const lastReadDiff = now - messageQueue.getLastReadAt();
 
-    if (lastReadDiff < maxDiff) continue;
+      if (lastReadDiff < maxDiff) continue;
 
-    const messages = messageQueue.getMessages();
+      const messages = messageQueue.getMessages();
 
-    for (const message of messages) {
-      if (!seen[message.src]) {
-        messageHandler(null, {
-          type: MessageType.EXPIRE,
-          src: message.dst,
-          dst: message.src
-        });
-        seen[message.src] = true;
+      for (const message of messages) {
+        if (!seen[message.src]) {
+          messageHandler(null, {
+            type: MessageType.EXPIRE,
+            src: message.dst,
+            dst: message.src
+          });
+          seen[message.src] = true;
+        }
       }
-    }
-
-    realm.clearMessageQueue(destinationClientId);
 
-    logger.trace(`[MSGSEXPIRE] mq ${destinationClientId} was cleared`);
-  }
-};
+      realm.clearMessageQueue(destinationClientId);
+    }
+  };
 
-let timeoutId;
+  let timeoutId;
 
-const startMessagesExpiration = () => {
-  if (timeoutId) {
-    clearTimeout(timeoutId);
-  }
+  const startMessagesExpiration = () => {
+    if (timeoutId) {
+      clearTimeout(timeoutId);
+    }
 
-  // Clean up outstanding messages
-  timeoutId = setTimeout(() => {
-    pruneOutstanding();
+    // Clean up outstanding messages
+    timeoutId = setTimeout(() => {
+      pruneOutstanding();
 
-    timeoutId = null;
+      timeoutId = null;
 
-    startMessagesExpiration();
-  }, config.get('cleanup_out_msgs'));
-};
+      startMessagesExpiration();
+    }, config.cleanup_out_msgs);
+  };
 
-const stopMessagesExpiration = () => {
-  if (timeoutId) {
-    clearTimeout(timeoutId);
-    timeoutId = null;
-  }
-};
+  const stopMessagesExpiration = () => {
+    if (timeoutId) {
+      clearTimeout(timeoutId);
+      timeoutId = null;
+    }
+  };
 
-module.exports = {
-  startMessagesExpiration,
-  stopMessagesExpiration
+  return {
+    startMessagesExpiration,
+    stopMessagesExpiration
+  };
 };

+ 0 - 3
src/services/realm/index.js

@@ -1,3 +0,0 @@
-const Realm = require('../../models/realm');
-
-module.exports = new Realm();

+ 10 - 17
src/services/webSocketServer/index.js

@@ -1,22 +1,19 @@
 const WSS = require('ws').Server;
 const url = require('url');
 const EventEmitter = require('events');
-const logger = require('../logger');
 const { MessageType, Errors } = require('../../enums');
-const config = require('../../../config');
-const realm = require('../realm');
 const Client = require('../../models/client');
 
 class WebSocketServer extends EventEmitter {
-  constructor (server) {
+  constructor ({ server, realm, config }) {
     super();
     this.setMaxListeners(0);
+    this.realm = realm;
+    this.config = config;
 
-    let path = config.get('path');
+    let path = this.config.path;
     path = path + (path[path.length - 1] !== '/' ? '/' : '') + 'peerjs';
 
-    logger.info(`ws opened on path:${path}`);
-
     this._wss = new WSS({ path, server });
 
     this._wss.on('connection', (socket, req) => this._onSocketConnection(socket, req));
@@ -32,11 +29,11 @@ class WebSocketServer extends EventEmitter {
       return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS);
     }
 
-    if (key !== config.get('key')) {
+    if (key !== this.config.key) {
       return this._sendErrorAndClose(socket, Errors.INVALID_KEY);
     }
 
-    const client = realm.getClientById(id);
+    const client = this.realm.getClientById(id);
 
     if (client) {
       if (token !== client.getToken()) {
@@ -56,21 +53,20 @@ class WebSocketServer extends EventEmitter {
   }
 
   _onSocketError (error) {
-    logger.debug(`[WSS] on error:${error}`);
     // handle error
     this.emit('error', error);
   }
 
   _registerClient ({ socket, id, token }) {
     // Check concurrent limit
-    const clientsCount = realm.getClientsIds().length;
+    const clientsCount = this.realm.getClientsIds().length;
 
-    if (clientsCount >= config.get('concurrent_limit')) {
+    if (clientsCount >= this.config.concurrent_limit) {
       return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED);
     }
 
     const newClient = new Client({ id, token });
-    realm.setClient(newClient, id);
+    this.realm.setClient(newClient, id);
     socket.send(JSON.stringify({ type: MessageType.OPEN }));
 
     this._configureWS(socket, newClient);
@@ -81,10 +77,8 @@ class WebSocketServer extends EventEmitter {
 
     // Cleanup after a socket closes.
     socket.on('close', () => {
-      logger.info('Socket closed:', client.getId());
-
       if (client.socket === socket) {
-        realm.removeClientById(client.getId());
+        this.realm.removeClientById(client.getId());
         this.emit('close', client);
       }
     });
@@ -98,7 +92,6 @@ class WebSocketServer extends EventEmitter {
 
         this.emit('message', client, message);
       } catch (e) {
-        logger.error('Invalid message', data);
         throw e;
       }
     });