Sfoglia il codice sorgente

Merge branch 'release/0.8.2-3'

Book Pauk 5 anni fa
parent
commit
bfa315c68b

+ 167 - 0
client/api/WebSocketConnection.js

@@ -0,0 +1,167 @@
+const cleanPeriod = 60*1000;//1 минута
+
+class WebSocketConnection {
+    //messageLifeTime в минутах (cleanPeriod)
+    constructor(messageLifeTime = 5) {
+        this.ws = null;
+        this.timer = null;
+        this.listeners = [];
+        this.messageQueue = [];
+        this.messageLifeTime = messageLifeTime;
+        this.requestId = 0;
+    }
+
+    addListener(listener) {
+        if (this.listeners.indexOf(listener) < 0)
+            this.listeners.push(Object.assign({regTime: Date.now()}, listener));
+    }
+
+    //рассылаем сообщение и удаляем те обработчики, которые его получили
+    emit(mes, isError) {
+        const len = this.listeners.length;
+        if (len > 0) {
+            let newListeners = [];
+            for (const listener of this.listeners) {
+                let emitted = false;
+                if (isError) {
+                    if (listener.onError)
+                        listener.onError(mes);
+                    emitted = true;
+                } else {
+                    if (listener.onMessage) {
+                        if (listener.requestId) {
+                            if (listener.requestId === mes.requestId) {
+                                listener.onMessage(mes);
+                                emitted = true;
+                            }
+                        } else {
+                            listener.onMessage(mes);
+                            emitted = true;
+                        }
+                    } else {
+                        emitted = true;
+                    }
+                }
+
+                if (!emitted)
+                    newListeners.push(listener);
+            }
+            this.listeners = newListeners;
+        }
+        
+        return this.listeners.length != len;
+    }
+
+    open(url) {
+        return new Promise((resolve, reject) => {
+            if (this.ws && this.ws.readyState == WebSocket.OPEN) {
+                resolve(this.ws);
+            } else {
+                url = url || `ws://${window.location.host}/ws`;
+                
+                this.ws = new WebSocket(url);
+
+                if (this.timer) {
+                    clearTimeout(this.timer);
+                }
+                this.timer = setTimeout(() => { this.periodicClean(); }, cleanPeriod);
+
+                let resolved = false;
+                this.ws.onopen = (e) => {
+                    resolved = true;
+                    resolve(e);
+                };
+
+                this.ws.onmessage = (e) => {
+                    try {
+                        const mes = JSON.parse(e.data);
+                        this.messageQueue.push({regTime: Date.now(), mes});
+
+                        let newMessageQueue = [];
+                        for (const message of this.messageQueue) {
+                            if (!this.emit(message.mes)) {
+                                newMessageQueue.push(message);
+                            }
+                        }
+
+                        this.messageQueue = newMessageQueue;
+                    } catch (e) {
+                        this.emit(e.message, true);
+                    }
+                };
+
+                this.ws.onerror = (e) => {
+                    this.emit(e.message, true);
+                    if (!resolved)
+                        reject(e);
+                };
+            }
+        });
+    }
+
+    //timeout в минутах (cleanPeriod)
+    message(requestId, timeout = 2) {
+        return new Promise((resolve, reject) => {
+            this.addListener({
+                requestId,
+                timeout,
+                onMessage: (mes) => {
+                    resolve(mes);
+                },
+                onError: (e) => {
+                    reject(e);
+                }
+            });
+        });
+    }
+
+    send(req) {
+        if (this.ws && this.ws.readyState == WebSocket.OPEN) {
+            const requestId = ++this.requestId;
+            this.ws.send(JSON.stringify(Object.assign({requestId}, req)));
+            return requestId;
+        } else {
+            throw new Error('WebSocket connection is not ready');
+        }
+    }
+
+    close() {
+        if (this.ws && this.ws.readyState == WebSocket.OPEN) {
+            this.ws.close();
+        }
+    }
+
+    periodicClean() {
+        try {
+            this.timer = null;
+
+            const now = Date.now();
+            //чистка listeners
+            let newListeners = [];
+            for (const listener of this.listeners) {
+                if (now - listener.regTime < listener.timeout*cleanPeriod - 50) {
+                    newListeners.push(listener);
+                } else {
+                    if (listener.onError)
+                        listener.onError('Время ожидания ответа истекло');
+                }
+            }
+            this.listeners = newListeners;
+
+            //чистка messageQueue
+            let newMessageQueue = [];
+            for (const message of this.messageQueue) {
+                if (now - message.regTime < this.messageLifeTime*cleanPeriod - 50) {
+                    newMessageQueue.push(message);
+                }
+            }
+            this.messageQueue = newMessageQueue;
+        } finally {
+            if (this.ws.readyState == WebSocket.OPEN) {
+                this.timer = setTimeout(() => { this.periodicClean(); }, cleanPeriod);
+            }
+        }
+    }
+}
+
+export default WebSocketConnection;

+ 45 - 29
client/api/reader.js

@@ -1,4 +1,6 @@
 import axios from 'axios';
+import * as utils from '../share/utils';
+import WebSocketConnection from './WebSocketConnection';
 
 const api = axios.create({
     baseURL: '/api/reader'
@@ -9,44 +11,58 @@ const workerApi = axios.create({
 });
 
 class Reader {
+    constructor() {
+        this.wsc = new WebSocketConnection();
+    }
 
     async getStateFinish(workerId, callback) {
         if (!callback) callback = () => {};
 
-        //присылается текст, состоящий из json-объектов state каждые 300ms, с разделителем splitter между ними
-        const splitter = '-- aod2t5hDXU32bUFyqlFE next status --';
-        let lastIndex = 0;
-        let response = await workerApi.post('/get-state-finish', {workerId}, {
-            onDownloadProgress: progress => {
-                //небольая оптимизация, вместо простого responseText.split
-                const xhr = progress.target;
-                let currIndex = xhr.responseText.length;
-                if (lastIndex == currIndex)
-                    return; 
-                const last = xhr.responseText.substring(lastIndex, currIndex);
-                lastIndex = currIndex;
-
-                //быстрее будет last.split
-                const res = last.split(splitter).pop();
-                if (res) {
-                    try {
-                        callback(JSON.parse(res));
-                    } catch (e) {
-                        //
-                    }
+        let response = {};
+
+        try {
+            const wsc = this.wsc;
+            await wsc.open();
+            const requestId = wsc.send({action: 'worker-get-state-finish', workerId});
+
+            while (1) {// eslint-disable-line no-constant-condition
+                response = await wsc.message(requestId);
+                callback(response);
+
+                if (response.state == 'finish' || response.state == 'error') {
+                    break;
                 }
             }
-        });
+            return response;
+        } catch (e) {
+            //
+            console.error(e);
+        }
+
+        //с WebSocket проблема, проверяем по http
+        const refreshPause = 500;
+        let i = 0;
+        response = {};
+        while (1) {// eslint-disable-line no-constant-condition
+            const prevProgress = response.progress || 0;
+            const prevState = response.state || 0;
+            response = await workerApi.post('/get-state', {workerId});
+            response = response.data;
+            callback(response);
+
+            if (response.state == 'finish' || response.state == 'error') {
+                break;
+            }
 
-        //берем последний state
-        response = response.data.split(splitter).pop();
+            if (i > 0)
+                await utils.sleep(refreshPause);
 
-        if (response) {
-            try {
-                response = JSON.parse(response);
-            } catch (e) {
-                response = false;
+            i++;
+            if (i > 120*1000/refreshPause) {//2 мин ждем телодвижений воркера
+                throw new Error('Слишком долгое время ожидания');
             }
+            //проверка воркера
+            i = (prevProgress != response.progress || prevState != response.state ? 1 : i);
         }
 
         return response;

+ 1 - 1
client/components/Reader/share/bookManager.js

@@ -464,7 +464,7 @@ class BookManager {
 
     addEventListener(listener) {
         if (this.eventListeners.indexOf(listener) < 0)
-            this.eventListeners.push(listener);        
+            this.eventListeners.push(listener);
     }
 
     removeEventListener(listener) {

+ 7 - 0
docs/omnireader/omnireader

@@ -18,6 +18,13 @@ server {
     proxy_pass http://127.0.0.1:44081;
   }
 
+  location /ws {
+    proxy_pass http://127.0.0.1:44081;
+    proxy_http_version 1.1;
+    proxy_set_header Upgrade $http_upgrade;
+    proxy_set_header Connection "upgrade";
+  }
+
   location / {
     root /home/liberama/public;
 

+ 58 - 0
docs/omnireader/omnireader_http

@@ -0,0 +1,58 @@
+server {
+  listen 80;
+  server_name omnireader.ru;
+
+  client_max_body_size 50m;
+
+  gzip on;
+  gzip_min_length 1024;
+  gzip_proxied expired no-cache no-store private auth;
+  gzip_types *;
+
+  location /api {
+    proxy_pass http://127.0.0.1:44081;
+  }
+
+  location /ws {
+    proxy_pass http://127.0.0.1:44081;
+    proxy_http_version 1.1;
+    proxy_set_header Upgrade $http_upgrade;
+    proxy_set_header Connection "upgrade";
+  }
+
+  location / {
+    root /home/liberama/public;
+
+    location /tmp {
+      add_header Content-Type text/xml;
+      add_header Content-Encoding gzip;
+    }
+
+    location ~* \.(?:manifest|appcache|html)$ {
+      expires -1;
+    }
+  }
+}
+
+server {
+  listen 80;
+  server_name old.omnireader.ru;
+
+  client_max_body_size 50m;
+
+  gzip on;
+  gzip_min_length 1024;
+  gzip_proxied expired no-cache no-store private auth;
+  gzip_types *;
+
+  root /home/oldreader;
+
+  index index.html;
+
+  # Обработка php файлов с помощью fpm
+  location ~ \.php$ { 
+    try_files $uri =404; 
+    include /etc/nginx/fastcgi.conf;
+    fastcgi_pass unix:/run/php/php7.2-fpm.sock;
+  }
+}

+ 5 - 0
package-lock.json

@@ -13075,6 +13075,11 @@
         "mkdirp": "^0.5.1"
       }
     },
+    "ws": {
+      "version": "7.2.1",
+      "resolved": "https://registry.npmjs.org/ws/-/ws-7.2.1.tgz",
+      "integrity": "sha512-sucePNSafamSKoOqoNfBd8V0StlkzJKL2ZAhGQinCfNQ+oacw+Pk7lcdAElecBF2VkLNZRiIb5Oi1Q5lVUVt2A=="
+    },
     "xml2js": {
       "version": "0.4.23",
       "resolved": "https://registry.npmjs.org/xml2js/-/xml2js-0.4.23.tgz",

+ 1 - 0
package.json

@@ -85,6 +85,7 @@
     "vuex": "^3.1.1",
     "vuex-persistedstate": "^2.5.4",
     "webdav": "^2.10.1",
+    "ws": "^7.2.1",
     "zip-stream": "^2.1.2"
   }
 }

+ 111 - 0
server/controllers/WebSocketController.js

@@ -0,0 +1,111 @@
+const WebSocket = require ('ws');
+const WorkerState = require('../core/WorkerState');//singleton
+const utils = require('../core/utils');
+
+const cleanPeriod = 1*60*1000;//1 минута
+const closeSocketOnIdle = 5*60*1000;//5 минут
+
+class WebSocketController {
+    constructor(wss, config) {
+        this.config = config;
+        this.workerState = new WorkerState();
+
+        this.wss = wss;
+
+        wss.on('connection', (ws) => {
+            ws.on('message', (message) => {
+                this.onMessage(ws, message);
+            });
+        });
+
+        setTimeout(() => { this.periodicClean(); }, cleanPeriod);
+    }
+
+    periodicClean() {
+        try {
+            const now = Date.now();
+            this.wss.clients.forEach((ws) => {
+                if (!ws.lastActivity || now - ws.lastActivity > closeSocketOnIdle - 50) {
+                    ws.terminate();
+                }
+            });
+        } finally {
+            setTimeout(() => { this.periodicClean(); }, cleanPeriod);
+        }
+    }
+
+    async onMessage(ws, message) {
+        let req = {};
+        try {
+            ws.lastActivity = Date.now();
+            req = JSON.parse(message);
+            switch (req.action) {
+                case 'test':
+                    this.test(req, ws); break;
+                case 'worker-get-state':
+                    this.workerGetState(req, ws); break;
+                case 'worker-get-state-finish':
+                    this.workerGetStateFinish(req, ws); break;
+
+                default:
+                    throw new Error(`Action not found: ${req.action}`);
+            }
+        } catch (e) {
+            this.send({error: e.message}, req, ws);
+        }
+    }
+
+    send(res, req, ws) {
+        if (ws.readyState == WebSocket.OPEN) {
+            ws.lastActivity = Date.now();
+            let r = Object.assign({}, res);
+            if (req.requestId)
+                r.requestId = req.requestId;
+            ws.send(JSON.stringify(r));
+        }
+    }
+
+    //Actions ------------------------------------------------------------------
+    async test(req, ws) {
+        this.send({message: 'Liberama project is awesome'}, req, ws);
+    }
+
+    async workerGetState(req, ws) {
+        if (!req.workerId)
+            throw new Error(`key 'workerId' is wrong`);
+
+        const state = this.workerState.getState(req.workerId);
+        this.send((state ? state : {}), req, ws);
+    }
+
+    async workerGetStateFinish(req, ws) {
+        if (!req.workerId)
+            throw new Error(`key 'workerId' is wrong`);
+
+        const refreshPause = 200;
+        let i = 0;
+        let state = {};
+        while (1) {// eslint-disable-line no-constant-condition
+            const prevProgress = state.progress || -1;
+            const prevState = state.state || '';
+            state = this.workerState.getState(req.workerId);
+
+            this.send((state ? state : {}), req, ws);
+            if (!state) break;
+
+            if (state.state != 'finish' && state.state != 'error')
+                await utils.sleep(refreshPause);
+            else
+                break;
+
+            i++;
+            if (i > 2*60*1000/refreshPause) {//2 мин ждем телодвижений воркера
+                this.send({state: 'error', error: 'Время ожидания процесса истекло'}, req, ws);
+            }
+            i = (prevProgress != state.progress || prevState != state.state ? 1 : i);
+        }        
+    }
+
+}
+
+module.exports = WebSocketController;

+ 1 - 0
server/controllers/WorkerController.js

@@ -26,6 +26,7 @@ class WorkerController extends BaseController {
         return false;
     }
 
+    //TODO: удалить бесполезную getStateFinish
     async getStateFinish(req, res) {
         const request = req.body;
         let error = '';

+ 1 - 0
server/controllers/index.js

@@ -2,4 +2,5 @@ module.exports = {
     MiscController: require('./MiscController'),
     ReaderController: require('./ReaderController'),
     WorkerController: require('./WorkerController'),
+    WebSocketController: require('./WebSocketController'),
 }

+ 10 - 5
server/index.js

@@ -4,6 +4,8 @@ const path = require('path');
 const argv = require('minimist')(process.argv.slice(2));
 const express = require('express');
 const compression = require('compression');
+const http = require('http');
+const WebSocket = require ('ws');
 
 async function init() {
     //config
@@ -46,10 +48,13 @@ async function main() {
     const config = new (require('./config'))().config;//singleton
 
     //servers
-    for (let server of config.servers) {
-        if (server.mode !== 'none') {
+    for (let serverCfg of config.servers) {
+        if (serverCfg.mode !== 'none') {
             const app = express();
-            const serverConfig = Object.assign({}, config, server);
+            const server = http.createServer(app);
+            const wss = new WebSocket.Server({ server, maxPayload: 10*1024*1024 });
+
+            const serverConfig = Object.assign({}, config, serverCfg);
 
             let devModule = undefined;
             if (serverConfig.branch == 'development') {
@@ -73,7 +78,7 @@ async function main() {
                 }               
             }));
 
-            require('./routes').initRoutes(app, serverConfig);
+            require('./routes').initRoutes(app, wss, serverConfig);
 
             if (devModule) {
                 devModule.logErrors(app);
@@ -84,7 +89,7 @@ async function main() {
                 });
             }
 
-            app.listen(serverConfig.port, serverConfig.ip, function() {
+            server.listen(serverConfig.port, serverConfig.ip, function() {
                 log(`Server-${serverConfig.serverName} is ready on ${serverConfig.ip}:${serverConfig.port}, mode: ${serverConfig.mode}`);
             });
         }

+ 2 - 1
server/routes.js

@@ -2,10 +2,11 @@ const c = require('./controllers');
 const utils = require('./core/utils');
 const multer = require('multer');
 
-function initRoutes(app, config) {
+function initRoutes(app, wss, config) {
     const misc = new c.MiscController(config);
     const reader = new c.ReaderController(config);
     const worker = new c.WorkerController(config);
+    new c.WebSocketController(wss, config);
 
     //access
     const [aAll, aNormal, aSite, aReader, aOmnireader] = // eslint-disable-line no-unused-vars