BUCServer.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. const fs = require('fs-extra');
  2. const FileDownloader = require('../FileDownloader');
  3. const JembaConnManager = require('../../db/JembaConnManager');//singleton
  4. const ayncExit = new (require('../AsyncExit'))();
  5. const utils = require('../utils');
  6. const log = new (require('../AppLogger'))().log;//singleton
  7. const minuteMs = 60*1000;
  8. const hourMs = 60*minuteMs;
  9. const dayMs = 24*hourMs;
  10. let instance = null;
  11. //singleton
  12. class BUCServer {
  13. constructor(config) {
  14. if (!instance) {
  15. this.config = config;
  16. //константы
  17. if (this.config.branch !== 'development') {
  18. this.maxCheckQueueLength = 10000;//максимальная длина checkQueue
  19. this.fillCheckQueuePeriod = 1*minuteMs;//период пополнения очереди
  20. this.periodicCheckWait = 500;//пауза, если нечего делать
  21. this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
  22. this.oldQueryInterval = 14*dayMs;//интервал устаревания запроса на обновление
  23. this.checkingInterval = 1*dayMs;//интервал проверки обновления одного и того же файла
  24. this.sameHostCheckInterval = 10*1000;//интервал проверки файла на том же сайте, не менее
  25. } else {
  26. this.maxCheckQueueLength = 10;//максимальная длина checkQueue
  27. this.fillCheckQueuePeriod = 10*1000;//период пополнения очереди
  28. this.periodicCheckWait = 500;//пауза, если нечего делать
  29. this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
  30. this.oldQueryInterval = 30*dayMs;//интервал устаревания запроса на обновление
  31. this.checkingInterval = 30*1000;//интервал проверки обновления одного и того же файла
  32. this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее
  33. }
  34. this.config.tempDownloadDir = `${config.tempDir}/download`;
  35. fs.ensureDirSync(this.config.tempDownloadDir);
  36. this.down = new FileDownloader(config.maxUploadFileSize);
  37. this.connManager = new JembaConnManager();
  38. this.db = this.connManager.db['book-update-server'];
  39. this.checkQueue = [];
  40. this.hostChecking = {};
  41. this.shciForHost = this.config.shciForHost || {};//sameHostCheckInterval for host
  42. this.main(); //no await
  43. instance = this;
  44. }
  45. return instance;
  46. }
  47. async getBuc(fromCheckTime, callback) {
  48. const db = this.db;
  49. const iterName = utils.randomHexString(30);
  50. while (1) {//eslint-disable-line
  51. const rows = await db.select({
  52. table: 'buc',
  53. where: `
  54. let iter = @getItem(${db.esc(iterName)});
  55. if (!iter) {
  56. iter = @dirtyIndexLR('checkTime', ${db.esc(fromCheckTime)});
  57. iter = iter.values();
  58. @setItem(${db.esc(iterName)}, iter);
  59. }
  60. const ids = new Set();
  61. let id = iter.next();
  62. while (!id.done) {
  63. ids.add(id.value);
  64. if (ids.size >= 100)
  65. break;
  66. id = iter.next();
  67. }
  68. return ids;
  69. `
  70. });
  71. if (rows.length)
  72. callback(rows);
  73. else
  74. break;
  75. }
  76. await db.select({
  77. table: 'buc',
  78. where: `
  79. @delItem(${db.esc(iterName)});
  80. return new Set();
  81. `
  82. });
  83. }
  84. async updateBuc(bookUrls) {
  85. const db = this.db;
  86. const now = Date.now();
  87. const rows = await db.select({
  88. table: 'buc',
  89. map: `(r) => ({id: r.id})`,
  90. where: `@@id(${db.esc(bookUrls)})`
  91. });
  92. const exists = new Set();
  93. for (const row of rows) {
  94. exists.add(row.id);
  95. }
  96. const toUpdateIds = [];
  97. const toInsertRows = [];
  98. for (let id of bookUrls) {
  99. if (!id)
  100. continue;
  101. if (id.length > 1000) {
  102. id = id.substring(0, 1000);
  103. }
  104. if (exists.has(id)) {
  105. toUpdateIds.push(id);
  106. } else {
  107. toInsertRows.push({
  108. id,
  109. queryTime: now,
  110. checkTime: 0, // 0 - never checked
  111. etag: '',
  112. modTime: '',
  113. size: 0,
  114. checkSum: '', //sha256
  115. state: 0, // 0 - not processing, 1 - processing
  116. error: '',
  117. });
  118. }
  119. }
  120. if (toUpdateIds.length) {
  121. await db.update({
  122. table: 'buc',
  123. mod: `(r) => r.queryTime = ${db.esc(now)}`,
  124. where: `@@id(${db.esc(toUpdateIds)})`
  125. });
  126. }
  127. if (toInsertRows.length) {
  128. await db.insert({
  129. table: 'buc',
  130. ignore: true,
  131. rows: toInsertRows,
  132. });
  133. }
  134. }
  135. async fillCheckQueue() {
  136. const db = this.db;
  137. while (1) {//eslint-disable-line
  138. try {
  139. let now = Date.now();
  140. //чистка совсем устаревших
  141. let rows = await db.select({
  142. table: 'buc',
  143. where: `@@dirtyIndexLR('queryTime', undefined, ${db.esc(now - this.cleanQueryInterval)})`
  144. });
  145. if (rows.length) {
  146. const ids = rows.map((r) => r.id);
  147. const res = await db.delete({
  148. table: 'buc',
  149. where: `@@id(${db.esc(ids)})`,
  150. });
  151. log(LM_WARN, `clean 'buc' table: deleted ${res.deleted}`);
  152. }
  153. rows = await db.select({table: 'buc', count: true});
  154. log(LM_WARN, `'buc' table size: ${rows[0].count}`);
  155. now = Date.now();
  156. //выборка кандидатов
  157. rows = await db.select({
  158. table: 'buc',
  159. where: `
  160. @@and(
  161. @dirtyIndexLR('queryTime', ${db.esc(now - this.oldQueryInterval)}),
  162. @dirtyIndexLR('checkTime', undefined, ${db.esc(now - this.checkingInterval)}),
  163. @flag('notProcessing')
  164. );
  165. `
  166. });
  167. //формирование checkQueue
  168. if (rows.length) {
  169. const ids = [];
  170. const rowsToPush = [];
  171. //сначала выберем сколько надо
  172. for (const row of rows) {
  173. if (this.checkQueue.length + rowsToPush.length >= this.maxCheckQueueLength)
  174. break;
  175. rowsToPush.push(row);
  176. ids.push(row.id);
  177. }
  178. //установим у них флаг "в обработке"
  179. await db.update({
  180. table: 'buc',
  181. mod: `(r) => r.state = 1`,
  182. where: `@@id(${db.esc(ids)})`
  183. });
  184. //пушим в очередь, после этого их обработает periodicCheck
  185. for (const row of rowsToPush) {
  186. this.checkQueue.push(row);
  187. log(LM_INFO, ` add ${row.id}`);
  188. }
  189. log(LM_WARN, `checkQueue: added ${ids.length} recs, total ${this.checkQueue.length}`);
  190. }
  191. } catch(e) {
  192. log(LM_ERR, e.stack);
  193. }
  194. await utils.sleep(this.fillCheckQueuePeriod);
  195. }
  196. }
  197. async periodicCheck() {
  198. const db = this.db;
  199. while (1) {//eslint-disable-line
  200. try {
  201. if (!this.checkQueue.length)
  202. await utils.sleep(this.periodicCheckWait);
  203. if (!this.checkQueue.length)
  204. continue;
  205. const row = this.checkQueue.shift();
  206. const url = new URL(row.id);
  207. //только если обращались к тому же хосту не ранее sameHostCheckInterval миллисекунд назад
  208. if (!this.hostChecking[url.hostname]) {
  209. this.hostChecking[url.hostname] = true;
  210. try {
  211. let unchanged = true;
  212. let hash = '';
  213. const headers = await this.down.head(row.id, {timeout: 10*1000});
  214. const etag = headers['etag'] || '';
  215. const modTime = headers['last-modified'] || '';
  216. let size = parseInt(headers['content-length'], 10) || 0;
  217. //log(row.id);
  218. //log(`etag: ${etag}, modTime: ${modTime}, size: ${size}`)
  219. if ((!etag || !row.etag || (etag !== row.etag))
  220. && (!modTime || !row.modTime || (modTime !== row.modTime))
  221. && (!size || !row.size || (size !== row.size))
  222. ) {
  223. const downdata = await this.down.load(row.id, {timeout: 10*1000});
  224. size = downdata.length;
  225. hash = await utils.getBufHash(downdata, 'sha256', 'hex');
  226. unchanged = false;
  227. }
  228. await db.update({
  229. table: 'buc',
  230. mod: `(r) => {
  231. r.checkTime = ${db.esc(Date.now())};
  232. r.etag = ${(unchanged ? 'r.etag' : db.esc(etag))};
  233. r.modTime = ${(unchanged ? 'r.modTime' : db.esc(modTime))};
  234. r.size = ${(unchanged ? 'r.size' : db.esc(size))};
  235. r.checkSum = ${(unchanged ? 'r.checkSum' : db.esc(hash))};
  236. r.state = 0;
  237. r.error = '';
  238. }`,
  239. where: `@@id(${db.esc(row.id)})`
  240. });
  241. if (unchanged) {
  242. log(`checked ${row.id} > unchanged`);
  243. } else {
  244. log(`checked ${row.id} > size ${size}`);
  245. }
  246. } catch (e) {
  247. await db.update({
  248. table: 'buc',
  249. mod: `(r) => {
  250. r.checkTime = ${db.esc(Date.now())};
  251. r.state = 0;
  252. r.error = ${db.esc(e.message)};
  253. }`,
  254. where: `@@id(${db.esc(row.id)})`
  255. });
  256. log(LM_ERR, `error ${row.id} > ${e.stack ? e.stack : e.message}`);
  257. } finally {
  258. (async() => {
  259. let sameHostCheckInterval = this.shciForHost[url.hostname] || this.sameHostCheckInterval;
  260. sameHostCheckInterval = Math.round((Math.random() - 0.5)*(sameHostCheckInterval*0.2) + sameHostCheckInterval);
  261. log(`delay ${sameHostCheckInterval}ms for host '${url.hostname}'`);
  262. await utils.sleep(sameHostCheckInterval);
  263. this.hostChecking[url.hostname] = false;
  264. })();
  265. }
  266. } else {
  267. this.checkQueue.push(row);
  268. }
  269. } catch(e) {
  270. log(LM_ERR, e.stack);
  271. }
  272. await utils.sleep(100);
  273. }
  274. }
  275. async main() {
  276. try {
  277. //обнуляем все статусы
  278. await this.db.update({table: 'buc', mod: `(r) => r.state = 0`});
  279. this.fillCheckQueue();//no await
  280. //10 потоков
  281. for (let i = 0; i < 10; i++)
  282. this.periodicCheck();//no await
  283. log(`-------------------------`);
  284. log(`BUC Server Worker started`);
  285. log(`-------------------------`);
  286. } catch (e) {
  287. log(LM_FATAL, e.stack);
  288. ayncExit.exit(1);
  289. }
  290. }
  291. }
  292. module.exports = BUCServer;