// JembaReaderStorage.js (5.7 KB)
  // External helpers used by the storage class below.
  const _ = require('lodash');
  const utils = require('../utils');

  // Both the connection manager and the application logger are singletons:
  // constructing them here hands back the shared objects.
  const JembaConnManager = require('../../db/JembaConnManager');
  const AppLogger = require('../AppLogger');
  const log = new AppLogger().log;

  // JembaReaderStorage is a singleton as well; `instance` holds the one shared
  // object once the constructor has run for the first time.
  let instance = null;
  /**
   * Singleton front-end to the 'reader-storage' database.
   *
   * Items are addressed by string id and carry a revision number (`rev`) and a
   * string payload (`data`). An in-memory cache keeps, per id, the last known
   * revision, the identity of the last client that tried to write it, and the
   * time of the last cache access; the cache is trimmed periodically.
   */
  class JembaReaderStorage {
      /**
       * Creates the shared instance on first use; every later `new` returns the
       * very same object.
       */
      constructor() {
          if (!instance) {
              this.connManager = new JembaConnManager();
              this.db = this.connManager.db['reader-storage'];
              this.cacheMap = new Map();

              this.periodicCleanCache(3*3600*1000);// once every 3 hours

              instance = this;
          }

          return instance;
      }

      /**
       * Looks up a cache entry.
       *
       * @param {string} id - item id
       * @returns {object|undefined} the cached entry, or undefined when absent
       */
      getCache(id) {
          const obj = this.cacheMap.get(id);

          // reads count as an access too, so refresh the timestamp
          if (obj)
              obj.time = Date.now();

          return obj;
      }

      /**
       * Merges `newObj` into the cache entry for `id` (creating the entry if
       * needed) and stamps it with the current time.
       *
       * @param {string} id - item id
       * @param {object} newObj - fields to merge into the entry
       */
      setCache(id, newObj) {
          let obj = this.cacheMap.get(id);
          if (!obj)
              obj = {};

          Object.assign(obj, newObj, {time: Date.now()});
          this.cacheMap.set(id, obj);
      }

      /**
       * Dispatches a storage request.
       *
       * @param {object} act - request: `action` is 'check', 'get' or 'set';
       *   `items` is an object keyed by item id; `identity` and `force` are
       *   only used by 'set'
       * @returns {Promise<object>} result of the selected operation
       * @throws {Error} when `items` is not an object, the action is unknown,
       *   or the operation itself fails; every error is logged, then rethrown
       */
      async doAction(act) {
          try {
              if (!_.isObject(act.items))
                  throw new Error('items is not an object');

              let result = {};
              switch (act.action) {
                  case 'check':
                      result = await this.checkItems(act.items);
                      break;
                  case 'get':
                      result = await this.getItems(act.items);
                      break;
                  case 'set':
                      result = await this.setItems(act.items, act.identity, act.force);
                      break;
                  default:
                      throw new Error('Unknown action');
              }

              return result;
          } catch (e) {
              log(LM_ERR, `JembaReaderStorage: ${e.message}`);
              throw e;
          }
      }

      /**
       * Returns the current revision of every requested item, served from the
       * cache when possible and from the database otherwise.
       *
       * @param {object} items - object whose keys are the item ids to check
       * @returns {Promise<{state: string, items: object}>} `items[id]` is
       *   `{rev}`; rev is 0 when the item is not stored
       */
      async checkItems(items) {
          const result = {state: 'success', items: {}};
          const db = this.db;

          for (const id of Object.keys(items)) {
              const cached = this.getCache(id);

              if (cached && cached.items) {
                  result.items[id] = cached.items;
              } else {
                  const rows = await db.select({//SQL`SELECT rev FROM storage WHERE id = ${id}`
                      table: 'storage',
                      map: '(r) => ({rev: r.rev})',
                      where: `@@id(${db.esc(id)})`
                  });

                  const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
                  result.items[id] = {rev};

                  this.setCache(id, {items: result.items[id]});
              }
          }

          return result;
      }

      /**
       * Reads revision and data of every requested item straight from the
       * database (the cache is not consulted).
       *
       * @param {object} items - object whose keys are the item ids to read
       * @returns {Promise<{state: string, items: object}>} `items[id]` is
       *   `{rev, data}`; missing items yield rev 0 and an empty string
       */
      async getItems(items) {
          const result = {state: 'success', items: {}};
          const db = this.db;

          for (const id of Object.keys(items)) {
              const rows = await db.select({//SQL`SELECT rev, data FROM storage WHERE id = ${id}`);
                  table: 'storage',
                  where: `@@id(${db.esc(id)})`
              });

              const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
              const data = (rows.length && rows[0].data ? rows[0].data : '');
              result.items[id] = {rev, data};
          }

          return result;
      }

      /**
       * Stores new revisions of the given items, provided every one of them
       * passes the revision check.
       *
       * @param {object} items - `items[id]` is `{rev, data}` with string `data`
       * @param {*} identity - client identity; remembered per item in the cache
       * @param {*} force - when truthy, the revision check is bypassed
       * @returns {Promise<object>} `{state: 'success'}` after writing, or
       *   `{state: 'reject', items}` with the current revisions when at least
       *   one item fails the check (nothing is written in that case)
       * @throws {Error} when an item is missing or its `data` is not a string
       */
      async setItems(items, identity, force) {
          const check = await this.checkItems(items);

          // first make sure the revisions line up
          for (const id of Object.keys(items)) {
              const item = items[id];
              // guard the item itself so a null entry yields the validation
              // error instead of an opaque TypeError
              if (!item || !_.isString(item.data))
                  throw new Error('items.data is not a string');

              // identity is needed to cope with unstable connections:
              // one and the same client may overwrite data when the revision
              // differs by 0 or 1
              const obj = this.getCache(id) || {};
              const oldIdentity = obj.identity;
              const sameClient = (identity && obj.identity === identity);
              if (identity && obj.identity !== identity) {
                  obj.identity = identity;
                  this.setCache(id, obj);
              }

              const revDiff = item.rev - check.items[id].rev;
              const allowUpdate = force || revDiff === 1 || (sameClient && revDiff === 0);

              if (!allowUpdate) {
                  log(LM_ERR, `JembaReaderStorage-Reject: revDiff: ${revDiff}, sameClient: ${sameClient}, oldIdentity: ${oldIdentity}, identity: ${identity}`);
                  return {state: 'reject', items: check.items};
              }
          }

          const db = this.db;
          for (const id of Object.keys(items)) {
              await db.insert({//SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`);
                  table: 'storage',
                  replace: true,
                  rows: [{id, rev: items[id].rev, time: utils.toUnixTime(Date.now()), data: items[id].data}],
              });

              this.setCache(id, {items: {rev: items[id].rev}});
          }

          return {state: 'success'};
      }

      /**
       * Trims the cache to the 1000 most recently accessed entries and
       * re-schedules itself to run again after `timeout` milliseconds.
       *
       * @param {number} timeout - delay between clean-ups, in milliseconds
       */
      periodicCleanCache(timeout) {
          try {
              const maxEntries = 1000;
              const excess = this.cacheMap.size - maxEntries;

              if (excess > 0) {
                  const byAge = [];
                  for (const [id, obj] of this.cacheMap)
                      byAge.push({id, time: obj.time});

                  // oldest access first, so the entries dropped are the stale
                  // ones and the most recent 1000 survive
                  byAge.sort((a, b) => a.time - b.time);

                  for (const {id} of byAge.slice(0, excess))
                      this.cacheMap.delete(id);
              }
          } finally {
              // always re-arm the timer, even if the clean-up failed
              setTimeout(() => {
                  this.periodicCleanCache(timeout);
              }, timeout);
          }
      }
  }
  // Export the class itself; because the constructor returns the shared
  // instance, every `new JembaReaderStorage()` yields the same object.
  module.exports = JembaReaderStorage;