|
- const fs = require('fs-extra');
- const FileDownloader = require('../FileDownloader');
- const JembaConnManager = require('../../db/JembaConnManager');//singleton
- const ayncExit = new (require('../AsyncExit'))();
- const utils = require('../utils');
- const log = new (require('../AppLogger'))().log;//singleton
- const minuteMs = 60*1000;
- const hourMs = 60*minuteMs;
- const dayMs = 24*hourMs;
- let instance = null;
- //singleton
- class BUCServer {
- constructor(config) {
- if (!instance) {
- this.config = config;
- //константы
- if (this.config.branch !== 'development') {
- this.maxCheckQueueLength = 10000;//максимальная длина checkQueue
- this.fillCheckQueuePeriod = 1*minuteMs;//период пополнения очереди
- this.periodicCheckWait = 500;//пауза, если нечего делать
- this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
- this.oldQueryInterval = 14*dayMs;//интервал устаревания запроса на обновление
- this.checkingInterval = 5*hourMs;//интервал проверки обновления одного и того же файла
- this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
- } else {
- this.maxCheckQueueLength = 10;//максимальная длина checkQueue
- this.fillCheckQueuePeriod = 10*1000;//период пополнения очереди
- this.periodicCheckWait = 500;//пауза, если нечего делать
- this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
- this.oldQueryInterval = 30*dayMs;//интервал устаревания запроса на обновление
- this.checkingInterval = 30*1000;//интервал проверки обновления одного и того же файла
- this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
- }
-
- this.config.tempDownloadDir = `${config.tempDir}/download`;
- fs.ensureDirSync(this.config.tempDownloadDir);
- this.down = new FileDownloader(config.maxUploadFileSize);
- this.connManager = new JembaConnManager();
- this.db = this.connManager.db['book-update-server'];
-
- this.checkQueue = [];
- this.hostChecking = {};
- this.main(); //no await
- instance = this;
- }
- return instance;
- }
- async getBuc(fromCheckTime, callback) {
- const db = this.db;
- const iterName = utils.randomHexString(30);
- while (1) {//eslint-disable-line
- const rows = await db.select({
- table: 'buc',
- where: `
- let iter = @getItem(${db.esc(iterName)});
- if (!iter) {
- iter = @dirtyIndexLR('checkTime', ${db.esc(fromCheckTime)});
- iter = iter.values();
- @setItem(${db.esc(iterName)}, iter);
- }
- const ids = new Set();
- let id = iter.next();
- while (!id.done && ids.size < 100) {
- ids.add(id.value);
- id = iter.next();
- }
- return ids;
- `
- });
- if (rows.length)
- callback(rows);
- else
- break;
- }
- await db.select({
- table: 'buc',
- where: `
- @delItem(${db.esc(iterName)});
- return new Set();
- `
- });
- }
- async updateBuc(bookUrls) {
- const db = this.db;
- const now = Date.now();
- const rows = await db.select({
- table: 'buc',
- map: `(r) => ({id: r.id})`,
- where: `@@id(${db.esc(bookUrls)})`
- });
- const exists = new Set();
- for (const row of rows) {
- exists.add(row.id);
- }
- const toUpdateIds = [];
- const toInsertRows = [];
- for (let id of bookUrls) {
- if (!id)
- continue;
- if (id.length > 1000) {
- id = id.substring(0, 1000);
- }
- if (exists.has(id)) {
- toUpdateIds.push(id);
- } else {
- toInsertRows.push({
- id,
- queryTime: now,
- checkTime: 0, // 0 - never checked
- etag: '',
- modTime: '',
- size: 0,
- checkSum: '', //sha256
- state: 0, // 0 - not processing, 1 - processing
- error: '',
- });
- }
- }
- if (toUpdateIds.length) {
- await db.update({
- table: 'buc',
- mod: `(r) => r.queryTime = ${db.esc(now)}`,
- where: `@@id(${db.esc(toUpdateIds)})`
- });
- }
- if (toInsertRows.length) {
- await db.insert({
- table: 'buc',
- ignore: true,
- rows: toInsertRows,
- });
- }
- }
- async fillCheckQueue() {
- const db = this.db;
- while (1) {//eslint-disable-line
- try {
- let now = Date.now();
- //чистка совсем устаревших
- let rows = await db.select({
- table: 'buc',
- where: `@@dirtyIndexLR('queryTime', undefined, ${db.esc(now - this.cleanQueryInterval)})`
- });
- if (rows.length) {
- const ids = rows.map((r) => r.id);
- const res = await db.delete({
- table: 'buc',
- where: `@@id(${db.esc(ids)})`,
- });
- log(LM_WARN, `clean 'buc' table: deleted ${res.deleted}`);
- }
- rows = await db.select({table: 'buc', count: true});
- log(LM_WARN, `'buc' table size: ${rows[0].count}`);
- now = Date.now();
- //выборка кандидатов
- rows = await db.select({
- table: 'buc',
- where: `
- @@and(
- @dirtyIndexLR('queryTime', ${db.esc(now - this.oldQueryInterval)}),
- @dirtyIndexLR('checkTime', undefined, ${db.esc(now - this.checkingInterval)}),
- @flag('notProcessing')
- );
- `
- });
- //формирование checkQueue
- if (rows.length) {
- const ids = [];
- const rowsToPush = [];
- //сначала выберем сколько надо
- for (const row of rows) {
- if (this.checkQueue.length + rowsToPush.length >= this.maxCheckQueueLength)
- break;
- rowsToPush.push(row);
- ids.push(row.id);
- }
- //установим у них флаг "в обработке"
- await db.update({
- table: 'buc',
- mod: `(r) => r.state = 1`,
- where: `@@id(${db.esc(ids)})`
- });
- //пушим в очередь, после этого их обработает periodicCheck
- for (const row of rowsToPush)
- this.checkQueue.push(row);
-
- log(LM_WARN, `checkQueue: added ${ids.length} recs, total ${this.checkQueue.length}`);
- }
- } catch(e) {
- log(LM_ERR, e.stack);
- }
- await utils.sleep(this.fillCheckQueuePeriod);
- }
- }
- async periodicCheck() {
- const db = this.db;
- while (1) {//eslint-disable-line
- try {
- if (!this.checkQueue.length)
- await utils.sleep(this.periodicCheckWait);
- if (!this.checkQueue.length)
- continue;
- const row = this.checkQueue.shift();
- const url = new URL(row.id);
- //только если обращались к тому же хосту не ранее sameHostCheckInterval миллисекунд назад
- if (!this.hostChecking[url.hostname]) {
- this.hostChecking[url.hostname] = true;
- try {
- let unchanged = true;
- let hash = '';
- const headers = await this.down.head(row.id);
- const etag = headers['etag'] || '';
- const modTime = headers['last-modified'] || '';
- let size = parseInt(headers['content-length'], 10) || 0;
- //log(row.id);
- //log(`etag: ${etag}, modTime: ${modTime}, size: ${size}`)
- if ((!etag || !row.etag || (etag !== row.etag))
- && (!modTime || !row.modTime || (modTime !== row.modTime))
- && (!size || !row.size || (size !== row.size))
- ) {
- const downdata = await this.down.load(row.id);
- size = downdata.length;
- hash = await utils.getBufHash(downdata, 'sha256', 'hex');
- unchanged = false;
- }
- await db.update({
- table: 'buc',
- mod: `(r) => {
- r.checkTime = ${db.esc(Date.now())};
- r.etag = ${(unchanged ? 'r.etag' : db.esc(etag))};
- r.modTime = ${(unchanged ? 'r.modTime' : db.esc(modTime))};
- r.size = ${(unchanged ? 'r.size' : db.esc(size))};
- r.checkSum = ${(unchanged ? 'r.checkSum' : db.esc(hash))};
- r.state = 0;
- r.error = '';
- }`,
- where: `@@id(${db.esc(row.id)})`
- });
- if (unchanged) {
- log(`checked ${row.id} > unchanged`);
- } else {
- log(`checked ${row.id} > size ${size}`);
- }
- } catch (e) {
- await db.update({
- table: 'buc',
- mod: `(r) => {
- r.checkTime = ${db.esc(Date.now())};
- r.state = 0;
- r.error = ${db.esc(e.message)};
- }`,
- where: `@@id(${db.esc(row.id)})`
- });
- log(LM_ERR, `error ${row.id} > ${e.stack}`);
- } finally {
- (async() => {
- await utils.sleep(this.sameHostCheckInterval);
- this.hostChecking[url.hostname] = false;
- })();
- }
- } else {
- this.checkQueue.push(row);
- }
- } catch(e) {
- log(LM_ERR, e.stack);
- }
- await utils.sleep(10);
- }
- }
- async main() {
- try {
- //обнуляем все статусы
- await this.db.update({table: 'buc', mod: `(r) => r.state = 0`});
- this.fillCheckQueue();//no await
- //10 потоков
- for (let i = 0; i < 10; i++)
- this.periodicCheck();//no await
- log(`-------------------------`);
- log(`BUC Server Worker started`);
- log(`-------------------------`);
- } catch (e) {
- log(LM_FATAL, e.stack);
- ayncExit.exit(1);
- }
- }
- }
- module.exports = BUCServer;
|