BUCServer.js 12 KB


  1. const fs = require('fs-extra');
  2. const FileDownloader = require('../FileDownloader');
  3. const JembaConnManager = require('../../db/JembaConnManager');//singleton
  4. const ayncExit = new (require('../AsyncExit'))();
  5. const utils = require('../utils');
  6. const log = new (require('../AppLogger'))().log;//singleton
  7. const minuteMs = 60*1000;
  8. const hourMs = 60*minuteMs;
  9. const dayMs = 24*hourMs;
  10. let instance = null;
  11. //singleton
  12. class BUCServer {
  13. constructor(config) {
  14. if (!instance) {
  15. this.config = config;
  16. //константы
  17. if (this.config.branch !== 'development') {
  18. this.maxCheckQueueLength = 10000;//максимальная длина checkQueue
  19. this.fillCheckQueuePeriod = 1*minuteMs;//период пополнения очереди
  20. this.periodicCheckWait = 500;//пауза, если нечего делать
  21. this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
  22. this.oldQueryInterval = 14*dayMs;//интервал устаревания запроса на обновление
  23. this.checkingInterval = 5*hourMs;//интервал проверки обновления одного и того же файла
  24. this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
  25. } else {
  26. this.maxCheckQueueLength = 10;//максимальная длина checkQueue
  27. this.fillCheckQueuePeriod = 10*1000;//период пополнения очереди
  28. this.periodicCheckWait = 500;//пауза, если нечего делать
  29. this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
  30. this.oldQueryInterval = 30*dayMs;//интервал устаревания запроса на обновление
  31. this.checkingInterval = 30*1000;//интервал проверки обновления одного и того же файла
  32. this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
  33. }
  34. this.config.tempDownloadDir = `${config.tempDir}/download`;
  35. fs.ensureDirSync(this.config.tempDownloadDir);
  36. this.down = new FileDownloader(config.maxUploadFileSize);
  37. this.connManager = new JembaConnManager();
  38. this.db = this.connManager.db['book-update-server'];
  39. this.checkQueue = [];
  40. this.hostChecking = {};
  41. this.main(); //no await
  42. instance = this;
  43. }
  44. return instance;
  45. }
  46. async getBuc(fromCheckTime, callback) {
  47. const db = this.db;
  48. const iterName = utils.randomHexString(30);
  49. while (1) {//eslint-disable-line
  50. const rows = await db.select({
  51. table: 'buc',
  52. where: `
  53. let iter = @getItem(${db.esc(iterName)});
  54. if (!iter) {
  55. iter = @dirtyIndexLR('checkTime', ${db.esc(fromCheckTime)});
  56. iter = iter.values();
  57. @setItem(${db.esc(iterName)}, iter);
  58. }
  59. const ids = new Set();
  60. let id = iter.next();
  61. while (!id.done && ids.size < 100) {
  62. ids.add(id.value);
  63. id = iter.next();
  64. }
  65. return ids;
  66. `
  67. });
  68. if (rows.length)
  69. callback(rows);
  70. else
  71. break;
  72. }
  73. await db.select({
  74. table: 'buc',
  75. where: `
  76. @delItem(${db.esc(iterName)});
  77. return new Set();
  78. `
  79. });
  80. }
  81. async updateBuc(bookUrls) {
  82. const db = this.db;
  83. const now = Date.now();
  84. const rows = await db.select({
  85. table: 'buc',
  86. map: `(r) => ({id: r.id})`,
  87. where: `@@id(${db.esc(bookUrls)})`
  88. });
  89. const exists = new Set();
  90. for (const row of rows) {
  91. exists.add(row.id);
  92. }
  93. const toUpdateIds = [];
  94. const toInsertRows = [];
  95. for (let id of bookUrls) {
  96. if (!id)
  97. continue;
  98. if (id.length > 1000) {
  99. id = id.substring(0, 1000);
  100. }
  101. if (exists.has(id)) {
  102. toUpdateIds.push(id);
  103. } else {
  104. toInsertRows.push({
  105. id,
  106. queryTime: now,
  107. checkTime: 0, // 0 - never checked
  108. etag: '',
  109. modTime: '',
  110. size: 0,
  111. checkSum: '', //sha256
  112. state: 0, // 0 - not processing, 1 - processing
  113. error: '',
  114. });
  115. }
  116. }
  117. if (toUpdateIds.length) {
  118. await db.update({
  119. table: 'buc',
  120. mod: `(r) => r.queryTime = ${db.esc(now)}`,
  121. where: `@@id(${db.esc(toUpdateIds)})`
  122. });
  123. }
  124. if (toInsertRows.length) {
  125. await db.insert({
  126. table: 'buc',
  127. ignore: true,
  128. rows: toInsertRows,
  129. });
  130. }
  131. }
  132. async fillCheckQueue() {
  133. const db = this.db;
  134. while (1) {//eslint-disable-line
  135. try {
  136. let now = Date.now();
  137. //чистка совсем устаревших
  138. let rows = await db.select({
  139. table: 'buc',
  140. where: `@@dirtyIndexLR('queryTime', undefined, ${db.esc(now - this.cleanQueryInterval)})`
  141. });
  142. if (rows.length) {
  143. const ids = rows.map((r) => r.id);
  144. const res = await db.delete({
  145. table: 'buc',
  146. where: `@@id(${db.esc(ids)})`,
  147. });
  148. log(LM_WARN, `clean 'buc' table: deleted ${res.deleted}`);
  149. }
  150. //rows = await db.select({table: 'buc', count: true});
  151. //log(LM_WARN, `'buc' table length: ${rows[0].count}`);
  152. now = Date.now();
  153. //выборка кандидатов
  154. rows = await db.select({
  155. table: 'buc',
  156. where: `
  157. @@and(
  158. @dirtyIndexLR('queryTime', ${db.esc(now - this.oldQueryInterval)}),
  159. @dirtyIndexLR('checkTime', undefined, ${db.esc(now - this.checkingInterval)}),
  160. @flag('notProcessing')
  161. );
  162. `
  163. });
  164. //console.log(rows);
  165. if (rows.length) {
  166. const ids = [];
  167. for (const row of rows) {
  168. if (this.checkQueue.length >= this.maxCheckQueueLength)
  169. break;
  170. ids.push(row.id);
  171. this.checkQueue.push(row);
  172. }
  173. await db.update({
  174. table: 'buc',
  175. mod: `(r) => r.state = 1`,
  176. where: `@@id(${db.esc(ids)})`
  177. });
  178. log(LM_WARN, `checkQueue: added ${ids.length} recs, total ${this.checkQueue.length}`);
  179. }
  180. } catch(e) {
  181. log(LM_ERR, e.stack);
  182. }
  183. await utils.sleep(this.fillCheckQueuePeriod);
  184. }
  185. }
  186. async periodicCheck() {
  187. const db = this.db;
  188. while (1) {//eslint-disable-line
  189. try {
  190. if (!this.checkQueue.length)
  191. await utils.sleep(this.periodicCheckWait);
  192. if (!this.checkQueue.length)
  193. continue;
  194. const row = this.checkQueue.shift();
  195. const url = new URL(row.id);
  196. //только если обращались к тому же хосту не ранее sameHostCheckInterval миллисекунд назад
  197. if (!this.hostChecking[url.hostname]) {
  198. this.hostChecking[url.hostname] = true;
  199. try {
  200. let unchanged = true;
  201. let hash = '';
  202. const headers = await this.down.head(row.id);
  203. const etag = headers['etag'] || '';
  204. const modTime = headers['last-modified'] || '';
  205. let size = parseInt(headers['content-length'], 10) || 0;
  206. //log(row.id);
  207. //log(`etag: ${etag}, modTime: ${modTime}, size: ${size}`)
  208. if ((!etag || !row.etag || (etag !== row.etag))
  209. && (!modTime || !row.modTime || (modTime !== row.modTime))
  210. && (!size || !row.size || (size !== row.size))
  211. ) {
  212. const downdata = await this.down.load(row.id);
  213. size = downdata.length;
  214. hash = await utils.getBufHash(downdata, 'sha256', 'hex');
  215. unchanged = false;
  216. }
  217. await db.update({
  218. table: 'buc',
  219. mod: `(r) => {
  220. r.checkTime = ${db.esc(Date.now())};
  221. r.etag = ${(unchanged ? 'r.etag' : db.esc(etag))};
  222. r.modTime = ${(unchanged ? 'r.modTime' : db.esc(modTime))};
  223. r.size = ${(unchanged ? 'r.size' : db.esc(size))};
  224. r.checkSum = ${(unchanged ? 'r.checkSum' : db.esc(hash))};
  225. r.state = 0;
  226. r.error = '';
  227. }`,
  228. where: `@@id(${db.esc(row.id)})`
  229. });
  230. if (unchanged) {
  231. log(`checked ${row.id} > unchanged`);
  232. } else {
  233. log(`checked ${row.id} > size ${size}`);
  234. }
  235. } catch (e) {
  236. await db.update({
  237. table: 'buc',
  238. mod: `(r) => {
  239. r.checkTime = ${db.esc(Date.now())};
  240. r.state = 0;
  241. r.error = ${db.esc(e.message)};
  242. }`,
  243. where: `@@id(${db.esc(row.id)})`
  244. });
  245. log(LM_ERR, `error ${row.id} > ${e.stack}`);
  246. } finally {
  247. (async() => {
  248. await utils.sleep(this.sameHostCheckInterval);
  249. this.hostChecking[url.hostname] = false;
  250. })();
  251. }
  252. } else {
  253. this.checkQueue.push(row);
  254. }
  255. } catch(e) {
  256. log(LM_ERR, e.stack);
  257. }
  258. await utils.sleep(10);
  259. }
  260. }
  261. async main() {
  262. try {
  263. //обнуляем все статусы
  264. await this.db.update({table: 'buc', mod: `(r) => r.state = 0`});
  265. this.fillCheckQueue();//no await
  266. //10 потоков
  267. for (let i = 0; i < 10; i++)
  268. this.periodicCheck();//no await
  269. log(`------------------`);
  270. log(`BUC Server started`);
  271. log(`------------------`);
  272. } catch (e) {
  273. log(LM_FATAL, e.stack);
  274. ayncExit.exit(1);
  275. }
  276. }
  277. }
  278. module.exports = BUCServer;