123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- const WebSocketConnection = require('../WebSocketConnection');
- const JembaConnManager = require('../../db/JembaConnManager');//singleton
- const ayncExit = new (require('../AsyncExit'))();
- const utils = require('../utils');
- const log = new (require('../AppLogger'))().log;//singleton
// Time units expressed in milliseconds; each one is derived from the previous.
const minuteMs = 60*1000;
const hourMs = 60*minuteMs;
const dayMs = 24*hourMs;

// Holds the single BUCClient instance once the constructor has run.
let instance = null;
- //singleton
/**
 * Client for the remote BUC ("BookUpdateChecker") server.
 *
 * Singleton: the first `new BUCClient(config)` builds the instance and starts
 * the background loops; every later `new BUCClient(...)` returns that same
 * instance and ignores its argument.
 *
 * Responsibilities visible in this class:
 *  - keep the local 'buc' table in sync with the server (`periodicSync`);
 *  - queue book urls passed to `checkBuc` and push them to the server
 *    (`periodicSendBookUrls`).
 */
class BUCClient {
    /**
     * @param {object} config - application config; reads `bucServer.url`,
     *   `bucServer.accessToken`, `branch` and `bucEnabled`.
     */
    constructor(config) {
        if (!instance) {
            this.config = config;

            this.connManager = new JembaConnManager();
            this.appDb = this.connManager.db['app'];

            // NOTE(review): `rejectUnauthorized: false` presumably reaches the TLS layer and
            // disables certificate verification for the BUC connection - confirm this is intended.
            this.wsc = new WebSocketConnection(config.bucServer.url, 10, 30, {rejectUnauthorized: false});
            this.accessToken = config.bucServer.accessToken;

            // timing constants; the development branch uses much shorter periods
            const isDevelopment = (this.config.branch === 'development');
            this.cleanQueryInterval = 300*dayMs;// rows whose queryTime is older than this are deleted
            this.syncPeriod = (isDevelopment ? 1*minuteMs : 1*hourMs);// pause between syncs with the BUC server
            this.sendBookUrlsPeriod = (isDevelopment ? 1*1000 : 1*minuteMs);// pause between pushes of queued book urls

            this.fromCheckTime = 1;// highest checkTime seen so far (sync watermark)
            this.bookUrls = new Set();// urls queued for the next 'update-buc' push

            this.main();//no await - main() handles its own errors

            instance = this;
        }

        return instance;
    }

    /**
     * Sends one request (with the access token merged in) and waits for its single response.
     *
     * @param {object} query - request payload, e.g. `{action: 'update-buc', ...}`
     * @returns {Promise<object>} the server response
     * @throws {Error} when the response carries a truthy `error` field
     */
    async wsRequest(query) {
        // the numeric arguments are presumably timeouts in seconds - TODO confirm against WebSocketConnection
        const response = await this.wsc.message(
            await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 60),
            60
        );

        if (response.error)
            throw new Error(response.error);

        return response;
    }

    /**
     * Requests 'get-buc' from the server and feeds every received batch to `callback`.
     * Reading stops at the first message whose state is not 'get'.
     *
     * @param {number} fromCheckTime - value sent to the server as `fromCheckTime`
     * @param {function(Array): Promise} callback - awaited with `res.rows` of each 'get' message
     * @throws {Error} when a message carries a truthy `error` field (same contract as `wsRequest`)
     */
    async wsGetBuc(fromCheckTime, callback) {
        const requestId = await this.wsc.send({accessToken: this.accessToken, action: 'get-buc', fromCheckTime}, 60);

        while (true) {//eslint-disable-line no-constant-condition
            const res = await this.wsc.message(requestId, 60);

            // an error response must not be mistaken for a normal end of stream
            if (res.error)
                throw new Error(res.error);

            if (res.state !== 'get')
                break;

            await callback(res.rows);
        }
    }

    /**
     * Sends a batch of book urls to the server with the 'update-buc' action.
     *
     * @param {string[]} bookUrls
     * @returns {Promise<object>} the server response
     */
    async wsUpdateBuc(bookUrls) {
        return await this.wsRequest({action: 'update-buc', bookUrls});
    }

    /**
     * Queues the given urls for the next push to the server and returns what the
     * local 'buc' table currently holds for them.
     *
     * @param {Iterable<string>} bookUrls - urls, used as ids in the 'buc' table
     * @returns {Promise<Array<{id: *, size: *}>>} rows found locally
     */
    async checkBuc(bookUrls) {
        const db = this.appDb;

        for (const url of bookUrls)
            this.bookUrls.add(url);

        const rows = await db.select({
            table: 'buc',
            map: `(r) => ({id: r.id, size: r.size})`,
            where: `@@id(${db.esc(bookUrls)})`,
        });

        return rows;
    }

    /**
     * Finds the largest `checkTime` stored in the local 'buc' table.
     *
     * @returns {Promise<number>} that checkTime, or 1 when no row was selected
     */
    async findMaxCheckTime() {
        const db = this.appDb;

        // single pass over the whole table; may be memory hungry
        const rows = await db.select({
            table: 'buc',
            where: `
                const result = new Set();
                let max = 0;
                let maxId = null;

                @iter(@all(), (row) => {
                    if (row.checkTime > max) {
                        max = row.checkTime;
                        maxId = row.id;
                    }
                });

                if (maxId)
                    result.add(maxId);

                return result;
            `
        });

        return (rows.length ? rows[0].checkTime : 1);
    }

    /**
     * One push of the queued book urls to the server, in chunks of 100.
     * Urls that were not confirmed (wrong response state, or a request that threw)
     * are put back into `this.bookUrls` so a later pass retries them.
     *
     * @throws whatever `wsUpdateBuc` throws, after re-queueing the unsent urls
     */
    async sendQueuedBookUrls() {
        if (!this.bookUrls.size)
            return;

        log(`client: remote update buc begin`);

        // take ownership of the current queue; urls added meanwhile go to the fresh Set
        const arr = Array.from(this.bookUrls);
        this.bookUrls = new Set();

        const chunkSize = 100;
        let updated = 0;
        let next = 0;// index of the first url that has not been handled yet

        try {
            while (next < arr.length) {
                const chunk = arr.slice(next, next + chunkSize);

                const res = await this.wsUpdateBuc(chunk);
                next += chunk.length;

                if (!res.error && res.state === 'success') {
                    updated += chunk.length;
                } else {
                    for (const url of chunk)
                        this.bookUrls.add(url);

                    log(LM_ERR, `update-buc error: ${(res.error ? res.error : `wrong state "${res.state}"`)}`);
                }
            }
        } catch (e) {
            // the request failed: keep the in-flight chunk and everything after it for a retry
            for (const url of arr.slice(next))
                this.bookUrls.add(url);

            throw e;
        }

        log(`client: remote update buc end, updated ${updated} urls`);
    }

    /**
     * Endless loop: push queued book urls, log any failure, sleep `sendBookUrlsPeriod`, repeat.
     */
    async periodicSendBookUrls() {
        while (true) {//eslint-disable-line no-constant-condition
            try {
                await this.sendQueuedBookUrls();
            } catch (e) {
                log(LM_ERR, e.stack);
            }

            await utils.sleep(this.sendBookUrlsPeriod);
        }
    }

    /**
     * Deletes rows of the local 'buc' table whose `queryTime` is older than
     * `cleanQueryInterval`, walking the table in chunks of up to 1000 rows.
     */
    async cleanBucTable() {
        const db = this.appDb;

        log(`client: clean 'buc' table begin`);

        const cleanTime = Date.now() - this.cleanQueryInterval;

        try {
            while (true) {//eslint-disable-line no-constant-condition
                // the query keeps its iterator under the 'clean' item, so each call returns the next chunk
                const rows = await db.select({
                    table: 'buc',
                    where: `
                        let iter = @getItem('clean');
                        if (!iter) {
                            iter = @all();
                            @setItem('clean', iter);
                        }

                        const ids = new Set();
                        let id = iter.next();
                        while (!id.done && ids.size < 1000) {
                            ids.add(id.value);
                            id = iter.next();
                        }

                        return ids;
                    `
                });

                if (!rows.length)
                    break;

                const toDelIds = rows
                    .filter((row) => row.queryTime <= cleanTime)
                    .map((row) => row.id);

                const res = await db.delete({
                    table: 'buc',
                    where: `@@id(${db.esc(toDelIds)})`,
                });

                log(`client: clean 'buc' deleted ${res.deleted}`);
            }
        } finally {
            // always drop the stored iterator, otherwise a failed pass would leave it
            // behind and the next pass would resume from the stale position
            await db.select({
                table: 'buc',
                where: `
                    @delItem('clean');
                    return new Set();
                `
            });
        }

        log(`client: clean 'buc' table end`);
    }

    /**
     * Pulls rows from the BUC server into the local 'buc' table (insert with replace)
     * and advances `this.fromCheckTime` to the highest `checkTime` stored.
     */
    async syncBucTable() {
        const db = this.appDb;

        log(`client: sync 'buc' table begin`);

        // ask for half an hour of overlap, just in case; the watermark itself is left
        // untouched so the margin does not pile up over cycles that fail or return nothing
        const from = this.fromCheckTime - 30*minuteMs;

        await this.wsGetBuc(from, async(rows) => {
            const res = await db.insert({
                table: 'buc',
                replace: true,
                rows
            });

            // move the watermark only after the batch has actually been stored
            for (const row of rows) {
                if (row.checkTime > this.fromCheckTime)
                    this.fromCheckTime = row.checkTime;
            }

            log(`client: sync 'buc' table, inserted ${res.inserted} rows, replaced ${res.replaced}`);
        });

        log(`client: sync 'buc' table end`);
    }

    /**
     * Endless loop: clean the local table, sync with the server, log any failure,
     * sleep `syncPeriod`, repeat. A failed clean skips the sync of that cycle.
     */
    async periodicSync() {
        while (true) {//eslint-disable-line no-constant-condition
            try {
                await this.cleanBucTable();
                await this.syncBucTable();
            } catch (e) {
                log(LM_ERR, e.stack);
            }

            await utils.sleep(this.syncPeriod);
        }
    }

    /**
     * Startup: requires `config.bucEnabled`, restores the sync watermark from the
     * local table and launches both background loops. Any failure here is logged
     * as fatal and the process is asked to exit with code 1.
     */
    async main() {
        try {
            if (!this.config.bucEnabled)
                throw new Error('BookUpdateChecker disabled');

            this.fromCheckTime = await this.findMaxCheckTime();

            this.periodicSendBookUrls();//no await
            this.periodicSync();//no await

            log(`BUC Client started`);
        } catch (e) {
            log(LM_FATAL, e.stack);
            ayncExit.exit(1);
        }
    }
}
- module.exports = BUCClient;
|