浏览代码

Рефакторинг WebSocketConnection, небольшие улучшения

Book Pauk 4 年之前
父节点
当前提交
a09b70a991

+ 1 - 2
client/api/misc.js

@@ -13,8 +13,7 @@ class Misc {
         ]};
 
         try {
-            await wsc.open();
-            const config = await wsc.message(wsc.send(Object.assign({action: 'get-config'}, query)));
+            const config = await wsc.message(await wsc.send(Object.assign({action: 'get-config'}, query)));
             if (config.error)
                 throw new Error(config.error);
             return config;

+ 3 - 6
client/api/reader.js

@@ -19,8 +19,7 @@ class Reader {
 
         let response = {};
         try {
-            await wsc.open();
-            const requestId = wsc.send({action: 'worker-get-state-finish', workerId});
+            const requestId = await wsc.send({action: 'worker-get-state-finish', workerId});
 
             let prevResponse = false;
             while (1) {// eslint-disable-line no-constant-condition
@@ -124,8 +123,7 @@ class Reader {
             let response = null
             
             try {
-                await wsc.open();
-                response = await wsc.message(wsc.send({action: 'reader-restore-cached-file', path: url}));
+                response = await wsc.message(await wsc.send({action: 'reader-restore-cached-file', path: url}));
             } catch (e) {
                 console.error(e);
                 //если с WebSocket проблема, работаем по http
@@ -210,8 +208,7 @@ class Reader {
     async storage(request) {
         let response = null;
         try {
-            await wsc.open();
-            response = await wsc.message(wsc.send({action: 'reader-storage', body: request}));
+            response = await wsc.message(await wsc.send({action: 'reader-storage', body: request}));
         } catch (e) {
             console.error(e);
             //если с WebSocket проблема, работаем по http

+ 1 - 183
client/api/webSocketConnection.js

@@ -1,185 +1,3 @@
-import * as utils from '../share/utils';
-
-const cleanPeriod = 60*1000;//1 минута
-
-class WebSocketConnection {
-    //messageLifeTime в минутах (cleanPeriod)
-    constructor(messageLifeTime = 5) {
-        this.ws = null;
-        this.timer = null;
-        this.listeners = [];
-        this.messageQueue = [];
-        this.messageLifeTime = messageLifeTime;
-        this.requestId = 0;
-
-        this.connecting = false;
-    }
-
-    addListener(listener) {
-        if (this.listeners.indexOf(listener) < 0)
-            this.listeners.push(Object.assign({regTime: Date.now()}, listener));
-    }
-
-    //рассылаем сообщение и удаляем те обработчики, которые его получили
-    emit(mes, isError) {
-        const len = this.listeners.length;
-        if (len > 0) {
-            let newListeners = [];
-            for (const listener of this.listeners) {
-                let emitted = false;
-                if (isError) {
-                    if (listener.onError)
-                        listener.onError(mes);
-                    emitted = true;
-                } else {
-                    if (listener.onMessage) {
-                        if (listener.requestId) {
-                            if (listener.requestId === mes.requestId) {
-                                listener.onMessage(mes);
-                                emitted = true;
-                            }
-                        } else {
-                            listener.onMessage(mes);
-                            emitted = true;
-                        }
-                    } else {
-                        emitted = true;
-                    }
-                }
-
-                if (!emitted)
-                    newListeners.push(listener);
-            }
-            this.listeners = newListeners;
-        }
-        
-        return this.listeners.length != len;
-    }
-
-    open(url) {
-        return new Promise((resolve, reject) => { (async() => {
-            //Ожидаем окончания процесса подключения, если open уже был вызван
-            let i = 0;
-            while (this.connecting && i < 200) {//10 сек
-                await utils.sleep(50);
-                i++;
-            }
-            if (i >= 200)
-                this.connecting = false;
-
-            //проверим подключение, и если нет, то подключимся заново
-            if (this.ws && this.ws.readyState == WebSocket.OPEN) {
-                resolve(this.ws);
-            } else {
-                this.connecting = true;
-                const protocol = (window.location.protocol == 'https:' ? 'wss:' : 'ws:');
-
-                url = url || `${protocol}//${window.location.host}/ws`;
-                
-                this.ws = new WebSocket(url);
-
-                if (this.timer) {
-                    clearTimeout(this.timer);
-                }
-                this.timer = setTimeout(() => { this.periodicClean(); }, cleanPeriod);
-
-                this.ws.onopen = (e) => {
-                    this.connecting = false;
-                    resolve(e);
-                };
-
-                this.ws.onmessage = (e) => {
-                    try {
-                        const mes = JSON.parse(e.data);
-                        this.messageQueue.push({regTime: Date.now(), mes});
-
-                        let newMessageQueue = [];
-                        for (const message of this.messageQueue) {
-                            if (!this.emit(message.mes)) {
-                                newMessageQueue.push(message);
-                            }
-                        }
-
-                        this.messageQueue = newMessageQueue;
-                    } catch (e) {
-                        this.emit(e.message, true);
-                    }
-                };
-
-                this.ws.onerror = (e) => {
-                    this.emit(e.message, true);
-                    if (this.connecting) {
-                        this.connecting = false;
-                        reject(e);
-                    }
-                };
-            }
-        })() });
-    }
-
-    //timeout в минутах (cleanPeriod)
-    message(requestId, timeout = 2) {
-        return new Promise((resolve, reject) => {
-            this.addListener({
-                requestId,
-                timeout,
-                onMessage: (mes) => {
-                    resolve(mes);
-                },
-                onError: (e) => {
-                    reject(e);
-                }
-            });
-        });
-    }
-
-    send(req) {
-        if (this.ws && this.ws.readyState == WebSocket.OPEN) {
-            const requestId = ++this.requestId;
-            this.ws.send(JSON.stringify(Object.assign({requestId}, req)));
-            return requestId;
-        } else {
-            throw new Error('WebSocket connection is not ready');
-        }
-    }
-
-    close() {
-        if (this.ws && this.ws.readyState == WebSocket.OPEN) {
-            this.ws.close();
-        }
-    }
-
-    periodicClean() {
-        try {
-            this.timer = null;
-
-            const now = Date.now();
-            //чистка listeners
-            let newListeners = [];
-            for (const listener of this.listeners) {
-                if (now - listener.regTime < listener.timeout*cleanPeriod - 50) {
-                    newListeners.push(listener);
-                } else {
-                    if (listener.onError)
-                        listener.onError('Время ожидания ответа истекло');
-                }
-            }
-            this.listeners = newListeners;
-
-            //чистка messageQueue
-            let newMessageQueue = [];
-            for (const message of this.messageQueue) {
-                if (now - message.regTime < this.messageLifeTime*cleanPeriod - 50) {
-                    newMessageQueue.push(message);
-                }
-            }
-            this.messageQueue = newMessageQueue;
-        } finally {
-            if (this.ws.readyState == WebSocket.OPEN) {
-                this.timer = setTimeout(() => { this.periodicClean(); }, cleanPeriod);
-            }
-        }
-    }
-}
+import WebSocketConnection from '../../server/core/WebSocketConnection';
 
 export default new WebSocketConnection();

+ 7 - 1
server/controllers/WebSocketController.js

@@ -50,8 +50,14 @@ class WebSocketController {
                 log(`WebSocket-IN:  ${message.substr(0, 4000)}`);
             }
 
-            ws.lastActivity = Date.now();
             req = JSON.parse(message);
+
+            ws.lastActivity = Date.now();
+            
+            //pong for WebSocketConnection
+            if (req._rpo === 1)
+                this.send({_rok: 1}, req, ws);
+
             switch (req.action) {
                 case 'test':
                     await this.test(req, ws); break;

+ 237 - 0
server/core/WebSocketConnection.js

@@ -0,0 +1,237 @@
+const isBrowser = (typeof window !== 'undefined');
+
+const utils = {
+    sleep: (ms) => { return new Promise(resolve => setTimeout(resolve, ms)); }
+};
+
+const cleanPeriod = 5*1000;//5 секунд
+
+class WebSocketConnection {
+    //messageLifeTime в секундах (проверка каждый cleanPeriod интервал)
+    constructor(url, openTimeoutSecs = 10, messageLifeTimeSecs = 30) {
+        this.WebSocket = (isBrowser ? WebSocket : require('ws'));
+        this.url = url;
+        this.ws = null;
+        this.listeners = [];
+        this.messageQueue = [];
+        this.messageLifeTime = messageLifeTimeSecs*1000;
+        this.openTimeout = openTimeoutSecs*1000;
+        this.requestId = 0;
+
+        this.wsErrored = false;
+        this.closed = false;
+
+        this.connecting = false;
+        this.periodicClean();//no await
+    }
+
+    //рассылаем сообщение и удаляем те обработчики, которые его получили
+    emit(mes, isError) {
+        const len = this.listeners.length;
+        if (len > 0) {
+            let newListeners = [];
+            for (const listener of this.listeners) {
+                let emitted = false;
+                if (isError) {
+                    listener.onError(mes);
+                    emitted = true;
+                } else {
+                    if ( (listener.requestId && mes.requestId && listener.requestId === mes.requestId) ||
+                        (!listener.requestId && !mes.requestId) ) {
+                        listener.onMessage(mes);
+                        emitted = true;
+                    }
+                }
+
+                if (!emitted)
+                    newListeners.push(listener);
+            }
+            this.listeners = newListeners;
+        }
+        
+        return this.listeners.length != len;
+    }
+
+    get isOpen() {
+        return (this.ws && this.ws.readyState == this.WebSocket.OPEN);
+    }
+
+    processMessageQueue() {
+        let newMessageQueue = [];
+        for (const message of this.messageQueue) {
+            if (!this.emit(message.mes)) {
+                newMessageQueue.push(message);
+            }
+        }
+
+        this.messageQueue = newMessageQueue;
+    }
+
+    _open() {
+        return new Promise((resolve, reject) => { (async() => {
+            if (this.closed)
+                reject(new Error('Этот экземпляр класса уничтожен. Пожалуйста, создайте новый.'));
+
+            if (this.connecting) {
+                let i = this.openTimeout/100;
+                while (i-- > 0 && this.connecting) {
+                    await utils.sleep(100);
+                }
+            }
+
+            //проверим подключение, и если нет, то подключимся заново
+            if (this.isOpen) {
+                resolve(this.ws);
+            } else {
+                this.connecting = true;
+                this.terminate();
+
+                if (isBrowser) {
+                    const protocol = (window.location.protocol == 'https:' ? 'wss:' : 'ws:');
+                    const url = this.url || `${protocol}//${window.location.host}/ws`;
+                    this.ws = new this.WebSocket(url);
+                } else {
+                    this.ws = new this.WebSocket(this.url);
+                }
+
+                const onopen = (e) => {
+                    this.connecting = false;
+                    resolve(this.ws);
+                };
+
+                const onmessage = (data) => {
+                    try {
+                        if (isBrowser)
+                            data = data.data;
+                        const mes = JSON.parse(data);
+                        this.messageQueue.push({regTime: Date.now(), mes});
+
+                        this.processMessageQueue();
+                    } catch (e) {
+                        this.emit(e.message, true);
+                    }
+                };
+
+                const onerror = (e) => {
+                    this.emit(e.message, true);
+                    reject(new Error(e.message));
+                };
+
+                const onclose = (e) => {
+                    this.emit(e.message, true);
+                    reject(new Error(e.message));
+                };
+
+                if (isBrowser) {
+                    this.ws.onopen = onopen;
+                    this.ws.onmessage = onmessage;
+                    this.ws.onerror = onerror;
+                    this.ws.onclose = onclose;
+                } else {
+                    this.ws.on('open', onopen);
+                    this.ws.on('message', onmessage);
+                    this.ws.on('error', onerror);
+                    this.ws.on('close', onclose);
+                }
+
+                await utils.sleep(this.openTimeout);
+                reject(new Error('Соединение не удалось'));
+            }
+        })() });
+    }
+
+    //timeout в секундах (проверка каждый cleanPeriod интервал)
+    message(requestId, timeoutSecs = 4) {
+        return new Promise((resolve, reject) => {
+            this.listeners.push({
+                regTime: Date.now(),
+                requestId,
+                timeout: timeoutSecs*1000,
+                onMessage: (mes) => {
+                    resolve(mes);
+                },
+                onError: (mes) => {
+                    reject(new Error(mes));
+                }
+            });
+            
+            this.processMessageQueue();
+        });
+    }
+
+    async send(req, timeoutSecs = 4) {
+        await this._open();
+        if (this.isOpen) {
+            this.requestId = (this.requestId < 1000000 ? this.requestId + 1 : 1);
+            const requestId = this.requestId;//реентерабельность!!!
+
+            this.ws.send(JSON.stringify(Object.assign({requestId, _rpo: 1}, req)));//_rpo: 1 - ждем в ответ _rok: 1
+
+            let resp = {};
+            try {
+                resp = await this.message(requestId, timeoutSecs);
+            } catch(e) {
+                this.terminate();
+                throw new Error('WebSocket не отвечает');
+            }
+
+            if (resp._rok) {                
+                return requestId;
+            } else {
+                throw new Error('Запрос не принят сервером');
+            }
+        } else {
+            throw new Error('WebSocket коннект закрыт');
+        }
+    }
+
+    terminate() {
+        if (this.ws) {
+            if (isBrowser) {
+                this.ws.close();
+            } else {
+                this.ws.terminate();
+            }
+        }
+        this.ws = null;
+    }
+
+    close() {
+        this.terminate();
+        this.closed = true;
+    }
+
+    async periodicClean() {
+        while (!this.closed) {
+            try {
+                const now = Date.now();
+                //чистка listeners
+                let newListeners = [];
+                for (const listener of this.listeners) {
+                    if (now - listener.regTime < listener.timeout) {
+                        newListeners.push(listener);
+                    } else {
+                        if (listener.onError)
+                            listener.onError('Время ожидания ответа истекло');
+                    }
+                }
+                this.listeners = newListeners;
+
+                //чистка messageQueue
+                let newMessageQueue = [];
+                for (const message of this.messageQueue) {
+                    if (now - message.regTime < this.messageLifeTime) {
+                        newMessageQueue.push(message);
+                    }
+                }
+                this.messageQueue = newMessageQueue;
+            } catch(e) {
+                //
+            }
+
+            await utils.sleep(cleanPeriod);
+        }
+    }
+}
+
+module.exports = WebSocketConnection;