Explorar o código

Работа над WebWorker и DbSearcher

Book Pauk %!s(int64=2) %!d(string=hai) anos
pai
achega
676e6f6909

+ 3 - 6
server/controllers/WebSocketController.js

@@ -98,13 +98,10 @@ class WebSocketController {
     }
 
     async getConfig(req, ws) {
-        if (Array.isArray(req.params)) {
-            const paramsSet = new Set(req.params);
+        const config = _.pick(this.config, this.config.webConfigParams);
+        config.dbConfig = await this.webWorker.dbConfig();
 
-            this.send(_.pick(this.config, this.config.webConfigParams.filter(x => paramsSet.has(x))), req, ws);
-        } else {
-            throw new Error('params is not an array');
-        }
+        this.send(config, req, ws);
     }
 
     async getWorkerState(req, ws) {

+ 28 - 0
server/core/DbSearcher.js

@@ -0,0 +1,28 @@
+class DbSearcher {
+    constructor(db) {
+        this.db = db;
+    }
+
+    async search(query) {
+        const db = this.db;
+
+        let result = [];
+
+        if (query.author) {
+            //
+        } else {
+            result = await db.select({
+                table: 'author',
+                map: `(r) => ({id: r.id, author: r.author})`
+            });
+        }
+
+        if (query.limit) {
+            result = result.slice(0, query.limit);
+        }
+
+        return result;
+    }
+}
+
+module.exports = DbSearcher;

+ 240 - 0
server/core/WebSocketConnection.js

@@ -0,0 +1,240 @@
+const isBrowser = (typeof window !== 'undefined');
+
+const utils = {
+    sleep: (ms) => { return new Promise(resolve => setTimeout(resolve, ms)); }
+};
+
+const cleanPeriod = 5*1000;//5 секунд
+
+class WebSocketConnection {
+    //messageLifeTime в секундах (проверка каждый cleanPeriod интервал)
+    constructor(url, openTimeoutSecs = 10, messageLifeTimeSecs = 30, webSocketOptions = {}) {
+        this.WebSocket = (isBrowser ? WebSocket : require('ws'));
+        this.url = url;
+        this.webSocketOptions = webSocketOptions;
+
+        this.ws = null;
+
+        this.listeners = [];
+        this.messageQueue = [];
+        this.messageLifeTime = messageLifeTimeSecs*1000;
+        this.openTimeout = openTimeoutSecs*1000;
+        this.requestId = 0;
+
+        this.wsErrored = false;
+        this.closed = false;
+
+        this.connecting = false;
+        this.periodicClean();//no await
+    }
+
+    //рассылаем сообщение и удаляем те обработчики, которые его получили
+    emit(mes, isError) {
+        const len = this.listeners.length;
+        if (len > 0) {
+            let newListeners = [];
+            for (const listener of this.listeners) {
+                let emitted = false;
+                if (isError) {
+                    listener.onError(mes);
+                    emitted = true;
+                } else {
+                    if ( (listener.requestId && mes.requestId && listener.requestId === mes.requestId) ||
+                        (!listener.requestId && !mes.requestId) ) {
+                        listener.onMessage(mes);
+                        emitted = true;
+                    }
+                }
+
+                if (!emitted)
+                    newListeners.push(listener);
+            }
+            this.listeners = newListeners;
+        }
+        
+        return this.listeners.length != len;
+    }
+
+    get isOpen() {
+        return (this.ws && this.ws.readyState == this.WebSocket.OPEN);
+    }
+
+    processMessageQueue() {
+        let newMessageQueue = [];
+        for (const message of this.messageQueue) {
+            if (!this.emit(message.mes)) {
+                newMessageQueue.push(message);
+            }
+        }
+
+        this.messageQueue = newMessageQueue;
+    }
+
+    _open() {
+        return new Promise((resolve, reject) => { (async() => {
+            if (this.closed)
+                reject(new Error('Этот экземпляр класса уничтожен. Пожалуйста, создайте новый.'));
+
+            if (this.connecting) {
+                let i = this.openTimeout/100;
+                while (i-- > 0 && this.connecting) {
+                    await utils.sleep(100);
+                }
+            }
+
+            //проверим подключение, и если нет, то подключимся заново
+            if (this.isOpen) {
+                resolve(this.ws);
+            } else {
+                this.connecting = true;
+                this.terminate();
+
+                if (isBrowser) {
+                    const protocol = (window.location.protocol == 'https:' ? 'wss:' : 'ws:');
+                    const url = this.url || `${protocol}//${window.location.host}/ws`;
+                    this.ws = new this.WebSocket(url);
+                } else {
+                    this.ws = new this.WebSocket(this.url, this.webSocketOptions);
+                }
+
+                const onopen = () => {
+                    this.connecting = false;
+                    resolve(this.ws);
+                };
+
+                const onmessage = (data) => {
+                    try {
+                        if (isBrowser)
+                            data = data.data;
+                        const mes = JSON.parse(data);
+                        this.messageQueue.push({regTime: Date.now(), mes});
+
+                        this.processMessageQueue();
+                    } catch (e) {
+                        this.emit(e.message, true);
+                    }
+                };
+
+                const onerror = (e) => {
+                    this.emit(e.message, true);
+                    reject(new Error(e.message));
+                };
+
+                const onclose = (e) => {
+                    this.emit(e.message, true);
+                    reject(new Error(e.message));
+                };
+
+                if (isBrowser) {
+                    this.ws.onopen = onopen;
+                    this.ws.onmessage = onmessage;
+                    this.ws.onerror = onerror;
+                    this.ws.onclose = onclose;
+                } else {
+                    this.ws.on('open', onopen);
+                    this.ws.on('message', onmessage);
+                    this.ws.on('error', onerror);
+                    this.ws.on('close', onclose);
+                }
+
+                await utils.sleep(this.openTimeout);
+                reject(new Error('Соединение не удалось'));
+            }
+        })() });
+    }
+
+    //timeout в секундах (проверка каждый cleanPeriod интервал)
+    message(requestId, timeoutSecs = 4) {
+        return new Promise((resolve, reject) => {
+            this.listeners.push({
+                regTime: Date.now(),
+                requestId,
+                timeout: timeoutSecs*1000,
+                onMessage: (mes) => {
+                    resolve(mes);
+                },
+                onError: (mes) => {
+                    reject(new Error(mes));
+                }
+            });
+            
+            this.processMessageQueue();
+        });
+    }
+
+    async send(req, timeoutSecs = 4) {
+        await this._open();
+        if (this.isOpen) {
+            this.requestId = (this.requestId < 1000000 ? this.requestId + 1 : 1);
+            const requestId = this.requestId;//реентерабельность!!!
+
+            this.ws.send(JSON.stringify(Object.assign({requestId}, req)));
+
+            let resp = {};
+            try {
+                resp = await this.message(requestId, timeoutSecs);
+            } catch(e) {
+                this.terminate();
+                throw new Error('WebSocket не отвечает');
+            }
+
+            if (resp._rok) {                
+                return requestId;
+            } else {
+                throw new Error('Запрос не принят сервером');
+            }
+        } else {
+            throw new Error('WebSocket коннект закрыт');
+        }
+    }
+
+    terminate() {
+        if (this.ws) {
+            if (isBrowser) {
+                this.ws.close();
+            } else {
+                this.ws.terminate();
+            }
+        }
+        this.ws = null;
+    }
+
+    close() {
+        this.terminate();
+        this.closed = true;
+    }
+
+    async periodicClean() {
+        while (!this.closed) {
+            try {
+                const now = Date.now();
+                //чистка listeners
+                let newListeners = [];
+                for (const listener of this.listeners) {
+                    if (now - listener.regTime < listener.timeout) {
+                        newListeners.push(listener);
+                    } else {
+                        if (listener.onError)
+                            listener.onError('Время ожидания ответа истекло');
+                    }
+                }
+                this.listeners = newListeners;
+
+                //чистка messageQueue
+                let newMessageQueue = [];
+                for (const message of this.messageQueue) {
+                    if (now - message.regTime < this.messageLifeTime) {
+                        newMessageQueue.push(message);
+                    }
+                }
+                this.messageQueue = newMessageQueue;
+            } catch(e) {
+                //
+            }
+
+            await utils.sleep(cleanPeriod);
+        }
+    }
+}
+
+module.exports = WebSocketConnection;

+ 32 - 4
server/core/WebWorker.js

@@ -4,6 +4,7 @@ const fs = require('fs-extra');
 const WorkerState = require('./WorkerState');
 const { JembaDbThread } = require('jembadb');
 const DbCreator = require('./DbCreator');
+const DbSearcher = require('./DbSearcher');
 
 const ayncExit = new (require('./AsyncExit'))();
 const log = new (require('./AppLogger'))().log;//singleton
@@ -32,6 +33,7 @@ class WebWorker {
             this.wState = this.workerState.getControl('server_state');
             this.myState = '';
             this.db = null;
+            this.dbSearcher = null;
 
             ayncExit.add(this.closeDb.bind(this));
 
@@ -125,8 +127,8 @@ class WebWorker {
             this.setMyState(ssDbLoading);
             log('Searcher DB open');
 
-            this.db = new JembaDbThread();
-            await this.db.lock({
+            const db = new JembaDbThread();
+            await db.lock({
                 dbPath,
                 softLock: true,
 
@@ -136,8 +138,16 @@ class WebWorker {
             });
 
             //открываем все таблицы
-            await this.db.openAll();
-            await this.db.close({table: 'title'});
+            await db.openAll();
+
+            //закроем title для экономии памяти, откроем при необходимости
+            await db.close({table: 'title'});
+            this.titleOpen = false;
+
+            this.dbSearcher = new DbSearcher(db);
+
+            db.wwCache = {};            
+            this.db = db;
 
             log('Searcher DB is ready');
         } catch (e) {
@@ -148,6 +158,24 @@ class WebWorker {
         }
     }
 
+    async dbConfig() {
+        this.checkMyState();
+
+        const db = this.db;
+        if (!db.wwCache.config) {
+            const rows = await db.select({table: 'config'});
+            const config = {};
+
+            for (const row of rows) {
+                config[row.id] = row.value;
+            }
+
+            db.wwCache.config = config;
+        }
+
+        return db.wwCache.config;
+    }
+
     async logServerStats() {
         while (1) {// eslint-disable-line
             try {