Browse Source

Исправления LimitedQueue, исправления багов, добавлена проверка флага abort

Book Pauk 5 years ago
parent
commit
f8b7b8b698

+ 9 - 3
server/core/FileDownloader.js

@@ -5,7 +5,7 @@ class FileDownloader {
         this.limitDownloadSize = limitDownloadSize;
         this.limitDownloadSize = limitDownloadSize;
     }
     }
 
 
-    async load(url, callback) {
+    async load(url, callback, abort) {
         let errMes = '';
         let errMes = '';
         const options = {
         const options = {
             encoding: null,
             encoding: null,
@@ -22,7 +22,9 @@ class FileDownloader {
         }
         }
 
 
         let prevProg = 0;
         let prevProg = 0;
-        const request = got(url, options).on('downloadProgress', progress => {
+        const request = got(url, options);
+
+        request.on('downloadProgress', progress => {
             if (this.limitDownloadSize) {
             if (this.limitDownloadSize) {
                 if (progress.transferred > this.limitDownloadSize) {
                 if (progress.transferred > this.limitDownloadSize) {
                     errMes = 'Файл слишком большой';
                     errMes = 'Файл слишком большой';
@@ -39,8 +41,12 @@ class FileDownloader {
             if (prog != prevProg && callback)
             if (prog != prevProg && callback)
                 callback(prog);
                 callback(prog);
             prevProg = prog;
             prevProg = prog;
-        });
 
 
+            if (abort && abort()) {
+                errMes = 'abort';
+                request.cancel();
+            }
+        });
 
 
         try {
         try {
             return (await request).body;
             return (await request).body;

+ 36 - 37
server/core/LimitedQueue.js

@@ -1,19 +1,16 @@
-const cleanPeriod = 60*1000;//1 минута
-const cleanTimeout = 60;//timeout в минутах (cleanPeriod)
-
 class LimitedQueue {
 class LimitedQueue {
-    constructor(enqueueAfter = 10, size = 100, timeout = cleanTimeout) {//timeout в минутах (cleanPeriod)
+    constructor(enqueueAfter = 10, size = 100, timeout = 60*60*1000) {//timeout в ms
         this.size = size;
         this.size = size;
         this.timeout = timeout;
         this.timeout = timeout;
 
 
+        this.abortCount = 0;
+        this.enqueueAfter = enqueueAfter;
         this.freed = enqueueAfter;
         this.freed = enqueueAfter;
         this.listeners = [];
         this.listeners = [];
-
-        this.timer = setTimeout(() => { this.periodicClean(); }, cleanPeriod);
     }
     }
 
 
     _addListener(listener) {
     _addListener(listener) {
-        this.listeners.push(Object.assign({regTime: Date.now()}, listener));
+        this.listeners.push(listener);
     }
     }
 
 
     //отсылаем сообщение первому ожидающему и удаляем его из списка
     //отсылаем сообщение первому ожидающему и удаляем его из списка
@@ -22,13 +19,11 @@ class LimitedQueue {
             let listener = this.listeners.shift();
             let listener = this.listeners.shift();
             listener.onFree();
             listener.onFree();
 
 
-            const now = Date.now();
             for (let i = 0; i < this.listeners.length; i++) {
             for (let i = 0; i < this.listeners.length; i++) {
-                listener = this.listeners[i];
-                listener.regTime = now;
-                listener.onPlaceChange(i + 1);
+                this.listeners[i].onPlaceChange(i + 1);
             }
             }
 
 
+            this.resetTimeout();
         }
         }
     }
     }
 
 
@@ -42,16 +37,21 @@ class LimitedQueue {
                     throw new Error('Ошибка получения ресурсов в очереди ожидания');
                     throw new Error('Ошибка получения ресурсов в очереди ожидания');
 
 
                 this.freed--;
                 this.freed--;
+                this.resetTimeout();
 
 
-                let returned = false;
+                let aCount = this.abortCount;
                 return {
                 return {
                     ret: () => {
                     ret: () => {
-                        if (!returned) {
+                        if (aCount == this.abortCount) {
                             this.freed++;
                             this.freed++;
                             this._emitFree();
                             this._emitFree();
-                            returned = true;
+                            aCount = -1;
                         }
                         }
-                    }
+                    },
+                    abort: () => {
+                        return (aCount != this.abortCount);
+                    },
+                    resetTimeout: this.resetTimeout.bind(this)
                 };
                 };
             };
             };
 
 
@@ -80,6 +80,27 @@ class LimitedQueue {
         });
         });
     }
     }
 
 
+    resetTimeout() {
+        if (this.timer)
+            clearTimeout(this.timer);
+        this.timer = setTimeout(() => { this.clean(); }, this.timeout);
+    }
+
+    clean() {
+        this.timer = null;
+
+        if (this.freed < this.enqueueAfter) {
+            this.abortCount++;
+            //чистка listeners
+            for (const listener of this.listeners) {
+                listener.onError('Время ожидания в очереди истекло');
+            }
+            this.listeners = [];
+
+            this.freed = this.enqueueAfter;
+        }
+    }
+
     destroy() {
     destroy() {
         if (this.timer) {
         if (this.timer) {
             clearTimeout(this.timer);
             clearTimeout(this.timer);
@@ -93,28 +114,6 @@ class LimitedQueue {
 
 
         this.destroyed = true;
         this.destroyed = true;
     }
     }
-
-    periodicClean() {
-        try {
-            this.timer = null;
-
-            const now = Date.now();
-            //чистка listeners, убираем зависшие в очереди на одном месте
-            let newListeners = [];
-            for (const listener of this.listeners) {
-                if (now - listener.regTime < this.timeout*cleanPeriod - 50) {
-                    newListeners.push(listener);
-                } else {
-                    listener.onError('Время ожидания в очереди истекло');
-                }
-            }
-            this.listeners = newListeners;
-        } finally {
-            if (!this.destroyed) {
-                this.timer = setTimeout(() => { this.periodicClean(); }, cleanPeriod);
-            }
-        }
-    }
 }
 }
 
 
 module.exports = LimitedQueue;
 module.exports = LimitedQueue;

+ 1 - 1
server/core/Reader/BookConverter/ConvertBase.js

@@ -14,7 +14,7 @@ class ConvertBase {
         this.calibrePath = `${config.dataDir}/calibre/ebook-convert`;
         this.calibrePath = `${config.dataDir}/calibre/ebook-convert`;
         this.sofficePath = '/usr/bin/soffice';
         this.sofficePath = '/usr/bin/soffice';
         this.pdfToHtmlPath = '/usr/bin/pdftohtml';
         this.pdfToHtmlPath = '/usr/bin/pdftohtml';
-        this.queue = new LimitedQueue(2, 20, 3);
+        this.queue = new LimitedQueue(2, 20, 3*60*1000);
     }
     }
 
 
     async run(data, opts) {// eslint-disable-line no-unused-vars
     async run(data, opts) {// eslint-disable-line no-unused-vars

+ 18 - 4
server/core/Reader/ReaderWorker.js

@@ -27,7 +27,7 @@ class ReaderWorker {
             this.config.tempPublicDir = `${config.publicDir}/tmp`;
             this.config.tempPublicDir = `${config.publicDir}/tmp`;
             fs.ensureDirSync(this.config.tempPublicDir);
             fs.ensureDirSync(this.config.tempPublicDir);
 
 
-            this.queue = new LimitedQueue(5, 100, 3);
+            this.queue = new LimitedQueue(5, 100, 3*60*1000);
             this.workerState = new WorkerState();
             this.workerState = new WorkerState();
             this.down = new FileDownloader(config.maxUploadFileSize);
             this.down = new FileDownloader(config.maxUploadFileSize);
             this.decomp = new FileDecompressor(2*config.maxUploadFileSize);
             this.decomp = new FileDecompressor(2*config.maxUploadFileSize);
@@ -56,6 +56,9 @@ class ReaderWorker {
         let isUploaded = false;
         let isUploaded = false;
         let convertFilename = '';
         let convertFilename = '';
 
 
+        const overLoadMes = 'Слишком большая очередь загрузки. Пожалуйста, попробуйте позже.';
+        const overLoadErr = new Error(overLoadMes);
+
         let q = null;
         let q = null;
         try {
         try {
             wState.set({state: 'queue', step: 1, totalSteps: 1});
             wState.set({state: 'queue', step: 1, totalSteps: 1});
@@ -67,7 +70,7 @@ class ReaderWorker {
                         qSize = place;
                         qSize = place;
                 });
                 });
             } catch (e) {
             } catch (e) {
-                throw new Error('Слишком большая очередь загрузки. Пожалуйста, попробуйте позже.');
+                throw overLoadErr;
             }
             }
 
 
             wState.set({state: 'download', step: 1, totalSteps: 3, url});
             wState.set({state: 'download', step: 1, totalSteps: 3, url});
@@ -76,10 +79,11 @@ class ReaderWorker {
             const tempFilename2 = utils.randomHexString(30);
             const tempFilename2 = utils.randomHexString(30);
             const decompDirname = utils.randomHexString(30);
             const decompDirname = utils.randomHexString(30);
 
 
+            //download or use uploaded
             if (url.indexOf('file://') != 0) {//download
             if (url.indexOf('file://') != 0) {//download
                 const downdata = await this.down.load(url, (progress) => {
                 const downdata = await this.down.load(url, (progress) => {
                     wState.set({progress});
                     wState.set({progress});
-                });
+                }, q.abort);
 
 
                 downloadedFilename = `${this.config.tempDownloadDir}/${tempFilename}`;
                 downloadedFilename = `${this.config.tempDownloadDir}/${tempFilename}`;
                 await fs.writeFile(downloadedFilename, downdata);
                 await fs.writeFile(downloadedFilename, downdata);
@@ -92,6 +96,10 @@ class ReaderWorker {
             }
             }
             wState.set({progress: 100});
             wState.set({progress: 100});
 
 
+            if (q.abort())
+                throw overLoadErr;
+            q.resetTimeout();
+
             //decompress
             //decompress
             wState.set({state: 'decompress', step: 2, progress: 0});
             wState.set({state: 'decompress', step: 2, progress: 0});
             decompDir = `${this.config.tempDownloadDir}/${decompDirname}`;
             decompDir = `${this.config.tempDownloadDir}/${decompDirname}`;
@@ -104,12 +112,16 @@ class ReaderWorker {
             }
             }
             wState.set({progress: 100});
             wState.set({progress: 100});
             
             
+            if (q.abort())
+                throw overLoadErr;
+            q.resetTimeout();
+
             //конвертирование в fb2
             //конвертирование в fb2
             wState.set({state: 'convert', step: 3, progress: 0});
             wState.set({state: 'convert', step: 3, progress: 0});
             convertFilename = `${this.config.tempDownloadDir}/${tempFilename2}`;
             convertFilename = `${this.config.tempDownloadDir}/${tempFilename2}`;
             await this.bookConverter.convertToFb2(decompFiles, convertFilename, opts, progress => {
             await this.bookConverter.convertToFb2(decompFiles, convertFilename, opts, progress => {
                 wState.set({progress});
                 wState.set({progress});
-            });
+            }, q.abort);
 
 
             //сжимаем файл в tmp, если там уже нет с тем же именем-sha256
             //сжимаем файл в tmp, если там уже нет с тем же именем-sha256
             const compFilename = await this.decomp.gzipFileIfNotExists(convertFilename, this.config.tempPublicDir);
             const compFilename = await this.decomp.gzipFileIfNotExists(convertFilename, this.config.tempPublicDir);
@@ -136,6 +148,8 @@ class ReaderWorker {
 
 
         } catch (e) {
         } catch (e) {
             log(LM_ERR, e.stack);
             log(LM_ERR, e.stack);
+            if (e.message == 'abort')
+                e.message = overLoadMes;
             wState.set({state: 'error', error: e.message});
             wState.set({state: 'error', error: e.message});
         } finally {
         } finally {
             //clean
             //clean