浏览代码

Добавлен WebSocketServer и контроллер для него

Book Pauk 5 年之前
父节点
当前提交
3199af570d
共有 5 个文件被更改,包括 125 次插入6 次删除
  1. 111 0
      server/controllers/WebSocketController.js
  2. 1 0
      server/controllers/WorkerController.js
  3. 1 0
      server/controllers/index.js
  4. 10 5
      server/index.js
  5. 2 1
      server/routes.js

+ 111 - 0
server/controllers/WebSocketController.js

@@ -0,0 +1,111 @@
+const WebSocket = require ('ws');
+const WorkerState = require('../core/WorkerState');//singleton
+const utils = require('../core/utils');
+
+const cleanPeriod = 1*60*1000;//1 минута
+const closeSocketOnIdle = 5*60*1000;//5 минут
+
+class WebSocketController {
+    constructor(wss, config) {
+        this.config = config;
+        this.workerState = new WorkerState();
+
+        this.wss = wss;
+
+        wss.on('connection', (ws) => {
+            ws.on('message', (message) => {
+                this.onMessage(ws, message);
+            });
+        });
+
+        setTimeout(() => { this.periodicClean(); }, cleanPeriod);
+    }
+
+    periodicClean() {
+        try {
+            const now = Date.now();
+            this.wss.clients.forEach((ws) => {
+                if (!ws.lastActivity || now - ws.lastActivity > closeSocketOnIdle - 50) {
+                    ws.terminate();
+                }
+            });
+        } finally {
+            setTimeout(() => { this.periodicClean(); }, cleanPeriod);
+        }
+    }
+
+    async onMessage(ws, message) {
+        let req = {};
+        try {
+            ws.lastActivity = Date.now();
+            req = JSON.parse(message);
+            switch (req.action) {
+                case 'test':
+                    this.test(req, ws); break;
+                case 'worker-get-state':
+                    this.workerGetState(req, ws); break;
+                case 'worker-get-state-finish':
+                    this.workerGetStateFinish(req, ws); break;
+
+                default:
+                    throw new Error(`Action not found: ${req.action}`);
+            }
+        } catch (e) {
+            this.send({error: e.message}, req, ws);
+        }
+    }
+
+    send(res, req, ws) {
+        if (ws.readyState == WebSocket.OPEN) {
+            ws.lastActivity = Date.now();
+            let r = Object.assign({}, res);
+            if (req.requestId)
+                r.requestId = req.requestId;
+            ws.send(JSON.stringify(r));
+        }
+    }
+
+    //Actions
+    async test(req, ws) {
+        this.send({message: 'Liberama project is awesome'}, req, ws);
+    }
+
+    async workerGetState(req, ws) {
+        if (!req.workerId)
+            throw new Error(`key 'workerId' is wrong`);
+
+        const state = this.workerState.getState(req.workerId);
+        this.send((state ? state : {}), req, ws);
+    }
+
+    async workerGetStateFinish(req, ws) {
+        if (!req.workerId)
+            throw new Error(`key 'workerId' is wrong`);
+
+        const refreshPause = 200;
+        let i = 0;
+        let state = {};
+        while (1) {// eslint-disable-line no-constant-condition
+            const prevProgress = state.progress || -1;
+            const prevState = state.state || '';
+            state = this.workerState.getState(req.workerId);
+
+            this.send((state ? state : {}), req, ws);
+            if (!state) break;
+
+            if (state.state != 'finish' && state.state != 'error')
+                await utils.sleep(refreshPause);
+            else
+                break;
+
+            i++;
+            if (i > 2*60*1000/refreshPause) {//2 мин ждем телодвижений воркера
+                this.send({state: 'error', error: 'Время ожидания процесса истекло'}, req, ws);
+            }
+            i = (prevProgress != state.progress || prevState != state.state ? 1 : i);
+        }        
+    }
+
+}
+
+module.exports = WebSocketController;

+ 1 - 0
server/controllers/WorkerController.js

@@ -26,6 +26,7 @@ class WorkerController extends BaseController {
         return false;
     }
 
+    //TODO: удалить бесполезную getStateFinish
     async getStateFinish(req, res) {
         const request = req.body;
         let error = '';

+ 1 - 0
server/controllers/index.js

@@ -2,4 +2,5 @@ module.exports = {
     MiscController: require('./MiscController'),
     ReaderController: require('./ReaderController'),
     WorkerController: require('./WorkerController'),
+    WebSocketController: require('./WebSocketController'),
 }

+ 10 - 5
server/index.js

@@ -4,6 +4,8 @@ const path = require('path');
 const argv = require('minimist')(process.argv.slice(2));
 const express = require('express');
 const compression = require('compression');
+const http = require('http');
+const WebSocket = require ('ws');
 
 async function init() {
     //config
@@ -46,10 +48,13 @@ async function main() {
     const config = new (require('./config'))().config;//singleton
 
     //servers
-    for (let server of config.servers) {
-        if (server.mode !== 'none') {
+    for (let serverCfg of config.servers) {
+        if (serverCfg.mode !== 'none') {
             const app = express();
-            const serverConfig = Object.assign({}, config, server);
+            const server = http.createServer(app);
+            const wss = new WebSocket.Server({ server, maxPayload: 10*1024*1024 });
+
+            const serverConfig = Object.assign({}, config, serverCfg);
 
             let devModule = undefined;
             if (serverConfig.branch == 'development') {
@@ -73,7 +78,7 @@ async function main() {
                 }               
             }));
 
-            require('./routes').initRoutes(app, serverConfig);
+            require('./routes').initRoutes(app, wss, serverConfig);
 
             if (devModule) {
                 devModule.logErrors(app);
@@ -84,7 +89,7 @@ async function main() {
                 });
             }
 
-            app.listen(serverConfig.port, serverConfig.ip, function() {
+            server.listen(serverConfig.port, serverConfig.ip, function() {
                 log(`Server-${serverConfig.serverName} is ready on ${serverConfig.ip}:${serverConfig.port}, mode: ${serverConfig.mode}`);
             });
         }

+ 2 - 1
server/routes.js

@@ -2,10 +2,11 @@ const c = require('./controllers');
 const utils = require('./core/utils');
 const multer = require('multer');
 
-function initRoutes(app, config) {
+function initRoutes(app, wss, config) {
     const misc = new c.MiscController(config);
     const reader = new c.ReaderController(config);
     const worker = new c.WorkerController(config);
+    new c.WebSocketController(wss, config);
 
     //access
     const [aAll, aNormal, aSite, aReader, aOmnireader] = // eslint-disable-line no-unused-vars