BUCClient.js 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. const WebSocketConnection = require('../WebSocketConnection');
  2. const JembaConnManager = require('../../db/JembaConnManager');//singleton
  3. const ayncExit = new (require('../AsyncExit'))();
  4. const utils = require('../utils');
  5. const log = new (require('../AppLogger'))().log;//singleton
  6. const minuteMs = 60*1000;
  7. const hourMs = 60*minuteMs;
  8. const dayMs = 24*hourMs;
  9. let instance = null;
  10. //singleton
  11. class BUCClient {
  12. constructor(config) {
  13. if (!instance) {
  14. this.config = config;
  15. this.connManager = new JembaConnManager();
  16. this.appDb = this.connManager.db['app'];
  17. this.wsc = new WebSocketConnection(config.bucServer.url, 10, 30, {rejectUnauthorized: false});
  18. this.accessToken = config.bucServer.accessToken;
  19. //константы
  20. if (this.config.branch !== 'development') {
  21. this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
  22. this.syncPeriod = 1*hourMs;//период синхронизации с сервером BUC
  23. this.sendBookUrlsPeriod = 1*minuteMs;//период отправки BookUrls на сервер BUC
  24. } else {
  25. this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
  26. this.syncPeriod = 1*minuteMs;//период синхронизации с сервером BUC
  27. this.sendBookUrlsPeriod = 1*1000;//период отправки BookUrls на сервер BUC
  28. }
  29. this.fromCheckTime = 1;
  30. this.bookUrls = new Set();
  31. this.main();//no await
  32. instance = this;
  33. }
  34. return instance;
  35. }
  36. async wsRequest(query) {
  37. const response = await this.wsc.message(
  38. await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 60),
  39. 60
  40. );
  41. if (response.error)
  42. throw new Error(response.error);
  43. return response;
  44. }
  45. async wsGetBuc(fromCheckTime, callback) {
  46. const requestId = await this.wsc.send({accessToken: this.accessToken, action: 'get-buc', fromCheckTime}, 60);
  47. while (1) {//eslint-disable-line
  48. const res = await this.wsc.message(requestId, 60);
  49. if (res.state == 'get') {
  50. await callback(res.rows);
  51. } else {
  52. break;
  53. }
  54. }
  55. }
  56. async wsUpdateBuc(bookUrls) {
  57. return await this.wsRequest({action: 'update-buc', bookUrls});
  58. }
  59. async checkBuc(bookUrls) {
  60. const db = this.appDb;
  61. for (const url of bookUrls)
  62. this.bookUrls.add(url);
  63. const rows = await db.select({
  64. table: 'buc',
  65. map: `(r) => ({id: r.id, size: r.size})`,
  66. where: `@@id(${db.esc(bookUrls)})`,
  67. });
  68. return rows;
  69. }
  70. async findMaxCheckTime() {
  71. const db = this.appDb;
  72. let result = 1;
  73. //одним куском, возможно будет жрать память
  74. const rows = await db.select({
  75. table: 'buc',
  76. where: `
  77. const result = new Set();
  78. let max = 0;
  79. let maxId = null;
  80. @iter(@all(), (row) => {
  81. if (row.checkTime > max) {
  82. max = row.checkTime;
  83. maxId = row.id;
  84. }
  85. });
  86. if (maxId)
  87. result.add(maxId);
  88. return result;
  89. `
  90. });
  91. if (rows.length)
  92. result = rows[0].checkTime;
  93. return result;
  94. }
  95. async periodicSendBookUrls() {
  96. while (1) {//eslint-disable-line
  97. try {
  98. //отправим this.bookUrls
  99. if (this.bookUrls.size) {
  100. log(`client: remote update buc begin`);
  101. const arr = Array.from(this.bookUrls);
  102. this.bookUrls = new Set();
  103. const chunkSize = 100;
  104. let updated = 0;
  105. for (let i = 0; i < arr.length; i += chunkSize) {
  106. const chunk = arr.slice(i, i + chunkSize);
  107. const res = await this.wsUpdateBuc(chunk);
  108. if (!res.error && res.state == 'success') {
  109. //update success
  110. updated += chunk.length;
  111. } else {
  112. for (const url of chunk) {
  113. this.bookUrls.add(url);
  114. }
  115. log(LM_ERR, `update-buc error: ${(res.error ? res.error : `wrong state "${res.state}"`)}`);
  116. }
  117. }
  118. log(`client: remote update buc end, updated ${updated} urls`);
  119. }
  120. } catch (e) {
  121. log(LM_ERR, e.stack);
  122. }
  123. await utils.sleep(this.sendBookUrlsPeriod);
  124. }
  125. }
  126. async periodicSync() {
  127. const db = this.appDb;
  128. while (1) {//eslint-disable-line
  129. try {
  130. //почистим нашу таблицу 'buc'
  131. log(`client: clean 'buc' table begin`);
  132. const cleanTime = Date.now() - this.cleanQueryInterval;
  133. while (1) {//eslint-disable-line
  134. //выборка всех по кусочкам
  135. const rows = await db.select({
  136. table: 'buc',
  137. where: `
  138. let iter = @getItem('clean');
  139. if (!iter) {
  140. iter = @all();
  141. @setItem('clean', iter);
  142. }
  143. const ids = new Set();
  144. let id = iter.next();
  145. while (!id.done && ids.size < 1000) {
  146. ids.add(id.value);
  147. id = iter.next();
  148. }
  149. return ids;
  150. `
  151. });
  152. if (rows.length) {
  153. const toDelIds = [];
  154. for (const row of rows)
  155. if (row.queryTime <= cleanTime)
  156. toDelIds.push(row.id);
  157. //удаление
  158. const res = await db.delete({
  159. table: 'buc',
  160. where: `@@id(${db.esc(toDelIds)})`,
  161. });
  162. log(`client: clean 'buc' deleted ${res.deleted}`);
  163. } else {
  164. break;
  165. }
  166. }
  167. await db.select({
  168. table: 'buc',
  169. where: `
  170. @delItem('clean');
  171. return new Set();
  172. `
  173. });
  174. log(`client: clean 'buc' table end`);
  175. //синхронизация с сервером BUC
  176. log(`client: sync 'buc' table begin`);
  177. this.fromCheckTime -= 30*minuteMs;//минус полчаса на всякий случай
  178. await this.wsGetBuc(this.fromCheckTime, async(rows) => {
  179. for (const row of rows) {
  180. if (row.checkTime > this.fromCheckTime)
  181. this.fromCheckTime = row.checkTime;
  182. }
  183. const res = await db.insert({
  184. table: 'buc',
  185. replace: true,
  186. rows
  187. });
  188. log(`client: sync 'buc' table, inserted ${res.inserted} rows, replaced ${res.replaced}`);
  189. });
  190. log(`client: sync 'buc' table end`);
  191. } catch (e) {
  192. log(LM_ERR, e.stack);
  193. }
  194. await utils.sleep(this.syncPeriod);
  195. }
  196. }
  197. async main() {
  198. try {
  199. if (!this.config.bucEnabled)
  200. throw new Error('BookUpdateChecker disabled');
  201. this.fromCheckTime = await this.findMaxCheckTime();
  202. this.periodicSendBookUrls();//no await
  203. this.periodicSync();//no await
  204. log(`BUC Client started`);
  205. } catch (e) {
  206. log(LM_FATAL, e.stack);
  207. ayncExit.exit(1);
  208. }
  209. }
  210. }
  211. module.exports = BUCClient;