BUCServer.js 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. const fs = require('fs-extra');
  2. const FileDownloader = require('../FileDownloader');
  3. const JembaConnManager = require('../../db/JembaConnManager');//singleton
  4. const ayncExit = new (require('../AsyncExit'))();
  5. const utils = require('../utils');
  6. const log = new (require('../AppLogger'))().log;//singleton
  7. const minuteMs = 60*1000;
  8. const hourMs = 60*minuteMs;
  9. const dayMs = 24*hourMs;
  10. let instance = null;
  11. //singleton
  12. class BUCServer {
  13. constructor(config) {
  14. if (!instance) {
  15. this.config = config;
  16. //константы
  17. if (this.config.branch !== 'development') {
  18. this.maxCheckQueueLength = 10000;//максимальная длина checkQueue
  19. this.fillCheckQueuePeriod = 1*minuteMs;//период пополнения очереди
  20. this.periodicCheckWait = 500;//пауза, если нечего делать
  21. this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
  22. this.oldQueryInterval = 30*dayMs;//интервал устаревания запроса на обновление
  23. this.checkingInterval = 3*hourMs;//интервал проверки обновления одного и того же файла
  24. this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
  25. } else {
  26. this.maxCheckQueueLength = 10;//максимальная длина checkQueue
  27. this.fillCheckQueuePeriod = 10*1000;//период пополнения очереди
  28. this.periodicCheckWait = 500;//пауза, если нечего делать
  29. this.cleanQueryInterval = 100*1000;//10*minuteMs;//интервал очистки устаревших
  30. this.oldQueryInterval = 5*minuteMs;//интервал устаревания запроса на обновление
  31. this.checkingInterval = 30*1000;//интервал проверки обновления одного и того же файла
  32. this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
  33. }
  34. this.config.tempDownloadDir = `${config.tempDir}/download`;
  35. fs.ensureDirSync(this.config.tempDownloadDir);
  36. this.down = new FileDownloader(config.maxUploadFileSize);
  37. this.connManager = new JembaConnManager();
  38. this.db = this.connManager.db['book-update-server'];
  39. this.checkQueue = [];
  40. this.hostChecking = {};
  41. this.main(); //no await
  42. instance = this;
  43. }
  44. return instance;
  45. }
  46. async getBuc(fromCheckTime, callback) {
  47. const db = this.db;
  48. while (1) {//eslint-disable-line
  49. const rows = await db.select({
  50. table: 'buc',
  51. where: `
  52. let iter = @getItem('getBuc');
  53. if (!iter) {
  54. iter = @dirtyIndexLR('checkTime', ${db.esc(fromCheckTime)});
  55. @setItem('getBuc', iter);
  56. }
  57. const ids = new Set();
  58. let id = iter.next();
  59. while (!id.done && ids.size < 100) {
  60. ids.add(id.value);
  61. id = iter.next();
  62. }
  63. return ids;
  64. `
  65. });
  66. if (rows.length)
  67. callback(rows);
  68. else
  69. break;
  70. }
  71. }
  72. async updateBuc(bookUrls) {
  73. const db = this.db;
  74. const now = Date.now();
  75. await db.update({
  76. table: 'buc',
  77. mod: `(r) => r.queryTime = ${db.esc(now)}`,
  78. where: `@@id(${db.esc(bookUrls)})`
  79. });
  80. }
  81. async fillCheckQueue() {
  82. const db = this.db;
  83. while (1) {//eslint-disable-line
  84. try {
  85. let now = Date.now();
  86. //чистка совсем устаревших
  87. let rows = await db.select({
  88. table: 'buc',
  89. where: `@@dirtyIndexLR('queryTime', undefined, ${db.esc(now - this.cleanQueryInterval)})`
  90. });
  91. if (rows.length) {
  92. const ids = rows.map((r) => r.id);
  93. const res = await db.delete({
  94. table: 'buc',
  95. where: `@@id(${db.esc(ids)})`,
  96. });
  97. log(LM_WARN, `clean 'buc' table: deleted ${res.deleted}`);
  98. }
  99. rows = await db.select({table: 'buc', count: true});
  100. log(LM_WARN, `'buc' table length: ${rows[0].count}`);
  101. rows = await db.select({table: 'buc'});
  102. console.log(rows);
  103. now = Date.now();
  104. //выборка кандидатов
  105. rows = await db.select({
  106. table: 'buc',
  107. where: `
  108. @@and(
  109. @dirtyIndexLR('queryTime', ${db.esc(now - this.oldQueryInterval)}),
  110. @dirtyIndexLR('checkTime', undefined, ${db.esc(now - this.checkingInterval)}),
  111. @flag('notProcessing')
  112. );
  113. `
  114. });
  115. if (rows.length) {
  116. const ids = [];
  117. for (let i = 0; i < rows.length; i++) {
  118. if (this.checkQueue.length >= this.maxCheckQueueLength)
  119. break;
  120. const row = rows[i];
  121. ids.push(row.id);
  122. this.checkQueue.push(row);
  123. }
  124. await db.update({
  125. table: 'buc',
  126. mod: `(r) => r.state = 1`,
  127. where: `@@id(${db.esc(ids)})`
  128. });
  129. log(LM_WARN, `checkQueue: added ${ids.length} recs, total ${this.checkQueue.length}`);
  130. }
  131. } catch(e) {
  132. log(LM_ERR, e.stack);
  133. }
  134. await utils.sleep(this.fillCheckQueuePeriod);
  135. }
  136. }
  137. async periodicCheck() {
  138. const db = this.db;
  139. while (1) {//eslint-disable-line
  140. try {
  141. if (!this.checkQueue.length)
  142. await utils.sleep(this.periodicCheckWait);
  143. if (!this.checkQueue.length)
  144. continue;
  145. const row = this.checkQueue.shift();
  146. const url = new URL(row.id);
  147. //только если обращались к тому же хосту не ранее sameHostCheckInterval миллисекунд назад
  148. if (!this.hostChecking[url.hostname]) {
  149. this.hostChecking[url.hostname] = true;
  150. try {
  151. const downdata = await this.down.load(row.id);
  152. const hash = await utils.getBufHash(downdata, 'sha256', 'hex');
  153. await db.update({
  154. table: 'buc',
  155. mod: `(r) => {
  156. r.checkTime = ${db.esc(Date.now())};
  157. r.size = ${db.esc(downdata.length)};
  158. r.checkSum = ${db.esc(hash)};
  159. r.state = 0;
  160. r.error = '';
  161. }`,
  162. where: `@@id(${db.esc(row.id)})`
  163. });
  164. log(`checked ${row.id} > size ${downdata.length}`);
  165. } catch (e) {
  166. await db.update({
  167. table: 'buc',
  168. mod: `(r) => {
  169. r.checkTime = ${db.esc(Date.now())};
  170. r.state = 0;
  171. r.error = ${db.esc(e.message)};
  172. }`,
  173. where: `@@id(${db.esc(row.id)})`
  174. });
  175. } finally {
  176. (async() => {
  177. await utils.sleep(this.sameHostCheckInterval);
  178. this.hostChecking[url.hostname] = false;
  179. })();
  180. }
  181. } else {
  182. this.checkQueue.push(row);
  183. }
  184. } catch(e) {
  185. log(LM_ERR, e.stack);
  186. }
  187. await utils.sleep(10);
  188. }
  189. }
  190. async main() {
  191. try {
  192. //обнуляем все статусы
  193. await this.db.update({table: 'buc', mod: `(r) => r.state = 0`});
  194. /*
  195. await this.db.insert({
  196. table: 'buc',
  197. replace: true,
  198. rows: [
  199. {
  200. id: 'http://old.omnireader.ru/test.txt', // book URL
  201. queryTime: Date.now(),
  202. checkTime: 0, // 0 - never checked
  203. size: 0,
  204. checkSum: '', //sha256
  205. state: 0, // 0 - not processing, 1 - processing
  206. error: '',
  207. }
  208. ],
  209. });
  210. */
  211. this.fillCheckQueue();//no await
  212. //10 потоков
  213. for (let i = 0; i < 10; i++)
  214. this.periodicCheck();//no await
  215. log(`------------------`);
  216. log(`BUC Server started`);
  217. log(`------------------`);
  218. } catch (e) {
  219. log(LM_FATAL, e.stack);
  220. ayncExit.exit(1);
  221. }
  222. }
  223. }
  224. module.exports = BUCServer;