Ver Fonte

Работа над BookUpdateChecker

Book Pauk há 2 anos atrás
pai
commit
08d0d3e7f3

+ 2 - 2
server/controllers/BookUpdateCheckerController.js

@@ -48,7 +48,7 @@ class BookUpdateCheckerController {
         let req = {};
         let req = {};
         try {
         try {
             if (this.isDevelopment) {
             if (this.isDevelopment) {
-                log(`WebSocket-IN:  ${message.substr(0, 4000)}`);
+                log(`BUC-WebSocket-IN:  ${message.substr(0, 4000)}`);
             }
             }
 
 
             req = JSON.parse(message);
             req = JSON.parse(message);
@@ -88,7 +88,7 @@ class BookUpdateCheckerController {
             ws.send(message);
             ws.send(message);
 
 
             if (this.isDevelopment) {
             if (this.isDevelopment) {
-                log(`WebSocket-OUT: ${message.substr(0, 4000)}`);
+                log(`BUC-WebSocket-OUT: ${message.substr(0, 4000)}`);
             }
             }
 
 
         }
         }

+ 95 - 9
server/core/BookUpdateChecker/BUCClient.js

@@ -1,4 +1,4 @@
-const WebSocketConnection = require('./WebSocketConnection');
+const WebSocketConnection = require('../WebSocketConnection');
 const JembaConnManager = require('../../db/JembaConnManager');//singleton
 const JembaConnManager = require('../../db/JembaConnManager');//singleton
 
 
 const ayncExit = new (require('../AsyncExit'))();
 const ayncExit = new (require('../AsyncExit'))();
@@ -18,7 +18,7 @@ class BUCClient {
             this.config = config;
             this.config = config;
 
 
             this.connManager = new JembaConnManager();
             this.connManager = new JembaConnManager();
-            this.db = this.connManager.db['book-update-server'];
+            this.appDb = this.connManager.db['app'];
 
 
             this.wsc = new WebSocketConnection(config.bucServer.url, 10, 30, {rejectUnauthorized: false});
             this.wsc = new WebSocketConnection(config.bucServer.url, 10, 30, {rejectUnauthorized: false});
             this.accessToken = config.bucServer.accessToken;
             this.accessToken = config.bucServer.accessToken;
@@ -28,7 +28,7 @@ class BUCClient {
                 this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
                 this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
                 this.syncPeriod = 1*hourMs;//период синхронизации с сервером BUC
                 this.syncPeriod = 1*hourMs;//период синхронизации с сервером BUC
             } else {
             } else {
-                this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
+                this.cleanQueryInterval = 300*minuteMs;//300*dayMs;//интервал очистки устаревших
                 this.syncPeriod = 1*minuteMs;//период синхронизации с сервером BUC
                 this.syncPeriod = 1*minuteMs;//период синхронизации с сервером BUC
             }
             }
 
 
@@ -45,20 +45,33 @@ class BUCClient {
 
 
     async wsRequest(query) {
     async wsRequest(query) {
         const response = await this.wsc.message(
         const response = await this.wsc.message(
-            await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 600),
-            600
+            await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 60),
+            60
         );
         );
         if (response.error)
         if (response.error)
             throw new Error(response.error);
             throw new Error(response.error);
         return response;
         return response;
     }
     }
 
 
+    async wsGetBuc(fromCheckTime, callback) {
+        const requestId = await this.wsc.send({accessToken: this.accessToken, action: 'get-buc', fromCheckTime}, 60);
+        while (1) {//eslint-disable-line
+            const res = await this.wsc.message(requestId, 60);
+
+            if (res.state == 'get') {
+                await callback(res.rows);
+            } else {
+                break;
+            }
+        }
+    }
+
     async wsUpdateBuc(bookUrls) {
     async wsUpdateBuc(bookUrls) {
         return await this.wsRequest({action: 'update-buc', bookUrls});
         return await this.wsRequest({action: 'update-buc', bookUrls});
     }
     }
 
 
     async checkBuc(bookUrls) {
     async checkBuc(bookUrls) {
-        const db = this.db;
+        const db = this.appDb;
 
 
         for (const url of bookUrls)
         for (const url of bookUrls)
             this.bookUrls.add(url);
             this.bookUrls.add(url);
@@ -72,7 +85,7 @@ class BUCClient {
     }
     }
 
 
     async findMaxCheckTime() {
     async findMaxCheckTime() {
-        const db = this.db;
+        const db = this.appDb;
 
 
         let result = 1;
         let result = 1;
 
 
@@ -89,7 +102,7 @@ class BUCClient {
                         max = row.checkTime;
                         max = row.checkTime;
                         maxId = row.id;
                         maxId = row.id;
                     }
                     }
-                };
+                });
 
 
                 if (maxId)
                 if (maxId)
                     result.add(maxId);
                     result.add(maxId);
@@ -105,19 +118,25 @@ class BUCClient {
     }
     }
 
 
     async periodicSync() {
     async periodicSync() {
+        const db = this.appDb;
+
         while (1) {//eslint-disable-line
         while (1) {//eslint-disable-line
             try {
             try {
                 //сначала отправим this.bookUrls
                 //сначала отправим this.bookUrls
+                log(`client: remote update buc begin`);
+
                 const arr = Array.from(this.bookUrls);
                 const arr = Array.from(this.bookUrls);
                 this.bookUrls = new Set();
                 this.bookUrls = new Set();
 
 
                 const chunkSize = 100;
                 const chunkSize = 100;
+                let updated = 0;
                 for (let i = 0; i < arr.length; i += chunkSize) {
                 for (let i = 0; i < arr.length; i += chunkSize) {
                     const chunk = arr.slice(i, i + chunkSize);
                     const chunk = arr.slice(i, i + chunkSize);
                     
                     
                     const res = await this.wsUpdateBuc(chunk);
                     const res = await this.wsUpdateBuc(chunk);
                     if (!res.error && res.state == 'success') {
                     if (!res.error && res.state == 'success') {
                         //update success
                         //update success
+                        updated += chunk.length;
                     } else {
                     } else {
                         for (const url of chunk) {
                         for (const url of chunk) {
                             this.bookUrls.add(url);
                             this.bookUrls.add(url);
@@ -125,11 +144,78 @@ class BUCClient {
                         log(LM_ERR, `update-buc error: ${(res.error ? res.error : `wrong state "${res.state}"`)}`);
                         log(LM_ERR, `update-buc error: ${(res.error ? res.error : `wrong state "${res.state}"`)}`);
                     }
                     }
                 }
                 }
+                log(`client: remote update buc end, updated ${updated} urls`);
 
 
                 //почистим нашу таблицу 'buc'
                 //почистим нашу таблицу 'buc'
-                this.cleanQueryInterval
+                log(`client: clean 'buc' table begin`);
+                const cleanTime = Date.now() - this.cleanQueryInterval;
+                while (1) {//eslint-disable-line
+                    //выборка всех по кусочкам
+                    const rows = await db.select({
+                        table: 'buc',
+                        where: `
+                            let iter = @getItem('clean');
+                            if (!iter) {
+                                iter = @all();
+                                @setItem('clean', iter);
+                            }
+
+                            const ids = new Set();
+                            let id = iter.next();
+                            while (!id.done && ids.size < 1000) {
+                                ids.add(id.value);
+                                id = iter.next();
+                            }
+
+                            return ids;
+                        `
+                    });
+
+                    if (rows.length) {
+                        const toDelIds = [];
+                        for (const row of rows)
+                            if (row.queryTime <= cleanTime)
+                                toDelIds.push(row.id);
+
+                        //удаление
+                        const res = await db.delete({
+                            table: 'buc',
+                            where: `@@id(${db.esc(toDelIds)})`,
+                        });
+
+                        log(`client: clean 'buc' deleted ${res.deleted}`);
+                    } else {
+                        break;
+                    }
+                }
+                await db.select({
+                    table: 'buc',
+                    where: `
+                        @delItem('clean');
+                        return new Set();
+                    `
+                });
+
+                log(`client: clean 'buc' table end`);
 
 
                 //синхронизация с сервером BUC
                 //синхронизация с сервером BUC
+                log(`client: sync 'buc' table begin`);
+                this.fromCheckTime -= 30*minuteMs;//минус полчаса на всякий случай
+                await this.wsGetBuc(this.fromCheckTime, async(rows) => {
+                    for (const row of rows) {
+                        if (row.checkTime > this.fromCheckTime)
+                            this.fromCheckTime = row.checkTime;
+                    }
+
+                    const res = await db.insert({
+                        table: 'buc',
+                        replace: true,
+                        rows
+                    });
+    
+                    log(`client: sync 'buc' table, inserted ${res.inserted} rows, replaced ${res.replaced}`);
+                });
+                log(`client: sync 'buc' table end`);
             } catch (e) {
             } catch (e) {
                 log(LM_ERR, e.stack);
                 log(LM_ERR, e.stack);
             }
             }

+ 18 - 8
server/core/BookUpdateChecker/BUCServer.js

@@ -35,7 +35,7 @@ class BUCServer {
                 this.periodicCheckWait = 500;//пауза, если нечего делать
                 this.periodicCheckWait = 500;//пауза, если нечего делать
 
 
                 this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
                 this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
-                this.oldQueryInterval = 5*minuteMs;//интервал устаревания запроса на обновление
+                this.oldQueryInterval = 30*dayMs;//интервал устаревания запроса на обновление
                 this.checkingInterval = 30*1000;//интервал проверки обновления одного и того же файла
                 this.checkingInterval = 30*1000;//интервал проверки обновления одного и того же файла
                 this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
                 this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
             }
             }
@@ -63,14 +63,17 @@ class BUCServer {
     async getBuc(fromCheckTime, callback) {
     async getBuc(fromCheckTime, callback) {
         const db = this.db;
         const db = this.db;
 
 
+        const iterName = utils.randomHexString(30);
+
         while (1) {//eslint-disable-line
         while (1) {//eslint-disable-line
             const rows = await db.select({
             const rows = await db.select({
                 table: 'buc',
                 table: 'buc',
                 where: `
                 where: `
-                    let iter = @getItem('getBuc');
+                    let iter = @getItem(${db.esc(iterName)});
                     if (!iter) {
                     if (!iter) {
                         iter = @dirtyIndexLR('checkTime', ${db.esc(fromCheckTime)});
                         iter = @dirtyIndexLR('checkTime', ${db.esc(fromCheckTime)});
-                        @setItem('getBuc', iter);
+                        iter = iter.values();
+                        @setItem(${db.esc(iterName)}, iter);
                     }
                     }
 
 
                     const ids = new Set();
                     const ids = new Set();
@@ -89,6 +92,14 @@ class BUCServer {
             else
             else
                 break;
                 break;
         }
         }
+
+        await db.select({
+            table: 'buc',
+            where: `
+                @delItem(${db.esc(iterName)});
+                return new Set();
+            `
+        });
     }
     }
 
 
     async updateBuc(bookUrls) {
     async updateBuc(bookUrls) {
@@ -111,7 +122,7 @@ class BUCServer {
         for (let id of bookUrls) {
         for (let id of bookUrls) {
             if (!id)
             if (!id)
                 continue;
                 continue;
-            
+
             if (id.length > 1000) {
             if (id.length > 1000) {
                 id = id.substring(0, 1000);
                 id = id.substring(0, 1000);
             }
             }
@@ -174,10 +185,7 @@ class BUCServer {
 
 
                 rows = await db.select({table: 'buc', count: true});
                 rows = await db.select({table: 'buc', count: true});
                 log(LM_WARN, `'buc' table length: ${rows[0].count}`);
                 log(LM_WARN, `'buc' table length: ${rows[0].count}`);
-/*
-rows = await db.select({table: 'buc'});
-console.log(rows);
-*/
+
                 now = Date.now();
                 now = Date.now();
                 //выборка кандидатов
                 //выборка кандидатов
                 rows = await db.select({
                 rows = await db.select({
@@ -191,6 +199,8 @@ console.log(rows);
                     `
                     `
                 });
                 });
 
 
+//console.log(rows);
+
                 if (rows.length) {
                 if (rows.length) {
                     const ids = [];
                     const ids = [];