JembaReaderStorage.js 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. const _ = require('lodash');
  2. const utils = require('../utils');
  3. const JembaConnManager = require('../../db/JembaConnManager');//singleton
  4. const log = new (require('../AppLogger'))().log;//singleton
  5. let instance = null;
  6. //singleton
  /**
   * Process-wide singleton that stores reader data blobs in the `storage`
   * table of the 'reader-storage' database and keeps an in-memory cache of
   * per-id metadata.
   *
   * Cache entries (values of `cacheMap`) are plain objects that may carry:
   *   - `items`:    `{rev}` — last known revision of the stored item;
   *   - `identity`: identity of the client that last attempted a write;
   *   - `time`:     `Date.now()` of the last cache read or write.
   */
  class JembaReaderStorage {
    /**
     * Returns the shared instance, creating it on the first call.
     * The first call also starts the periodic cache clean-up timer.
     */
    constructor() {
      if (!instance) {
        this.connManager = new JembaConnManager();
        this.db = this.connManager.db['reader-storage'];
        this.cacheMap = new Map();
        this.periodicCleanCache(3 * 3600 * 1000); // once every 3 hours
        instance = this;
      }
      return instance;
    }

    /**
     * Looks up a cache entry and marks it as just used.
     *
     * @param {*} id - cache key.
     * @returns {object|undefined} the live cache entry, or undefined.
     */
    getCache(id) {
      const obj = this.cacheMap.get(id);
      if (obj) {
        // Reading counts as usage, so refresh the timestamp used for eviction.
        obj.time = Date.now();
      }
      return obj;
    }

    /**
     * Merges `newObj` into the cache entry for `id` (creating the entry if
     * needed) and refreshes its timestamp.
     *
     * @param {*} id - cache key.
     * @param {object} newObj - fields to merge into the entry.
     */
    setCache(id, newObj) {
      let obj = this.cacheMap.get(id);
      if (!obj) {
        obj = {};
      }
      Object.assign(obj, newObj, {time: Date.now()});
      this.cacheMap.set(id, obj);
    }

    /**
     * Dispatches a storage request to checkItems / getItems / setItems.
     *
     * @param {object} act - request: `{action, items, identity?, force?}`.
     * @returns {Promise<object>} result of the selected operation.
     * @throws {Error} if `act.items` is not an object or `act.action` is
     *   unknown; errors from the selected operation are logged and rethrown.
     */
    async doAction(act) {
      try {
        if (!_.isObject(act.items))
          throw new Error('items is not an object');

        let result = {};
        switch (act.action) {
          case 'check':
            result = await this.checkItems(act.items);
            break;
          case 'get':
            result = await this.getItems(act.items);
            break;
          case 'set':
            result = await this.setItems(act.items, act.identity, act.force);
            break;
          default:
            throw new Error('Unknown action');
        }
        return result;
      } catch (e) {
        // NOTE(review): LM_ERR is not defined in this file; presumably a
        // global log-level constant provided by the logger — confirm.
        log(LM_ERR, `JembaReaderStorage: ${e.message}`);
        throw e;
      }
    }

    /**
     * Returns the current revision of every requested item, using the cache
     * when possible and filling it from the database otherwise.
     *
     * @param {object} items - object whose keys are the item ids to check.
     * @returns {Promise<{state: string, items: Object<string, {rev: *}>}>}
     *   `rev` is 0 when the item has no row (or a falsy rev) in the table.
     */
    async checkItems(items) {
      let result = {state: 'success', items: {}};
      const db = this.db;

      for (const id of Object.keys(items)) {
        const obj = this.getCache(id);
        if (obj && obj.items) {
          result.items[id] = obj.items;
        } else {
          // Equivalent of: SELECT rev FROM storage WHERE id = ${id}
          const rows = await db.select({
            table: 'storage',
            map: '(r) => ({rev: r.rev})',
            where: `@@id(${db.esc(id)})`
          });
          const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
          result.items[id] = {rev};
          this.setCache(id, {items: result.items[id]});
        }
      }

      return result;
    }

    /**
     * Reads revision and data of every requested item straight from the
     * database (the cache is not consulted).
     *
     * @param {object} items - object whose keys are the item ids to read.
     * @returns {Promise<{state: string, items: Object<string, {rev: *, data: *}>}>}
     *   missing or falsy values are reported as `rev: 0` and `data: ''`.
     */
    async getItems(items) {
      let result = {state: 'success', items: {}};
      const db = this.db;

      for (const id of Object.keys(items)) {
        // Equivalent of: SELECT rev, data FROM storage WHERE id = ${id}
        const rows = await db.select({
          table: 'storage',
          where: `@@id(${db.esc(id)})`
        });
        const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
        const data = (rows.length && rows[0].data ? rows[0].data : '');
        result.items[id] = {rev, data};
      }

      return result;
    }

    /**
     * Writes items after validating their revisions against the stored ones.
     * Nothing is written unless every item passes the revision check.
     *
     * @param {object} items - `{[id]: {rev, data}}`; `data` must be a string.
     * @param {*} identity - identifier of the writing client (optional).
     * @param {boolean} force - when truthy, skip the revision check.
     * @returns {Promise<object>} `{state: 'success'}`, or
     *   `{state: 'reject', items}` with the current revisions on mismatch.
     * @throws {Error} if some item's `data` is not a string.
     */
    async setItems(items, identity, force) {
      let check = await this.checkItems(items);

      // First verify that the revisions match.
      for (const id of Object.keys(items)) {
        if (!_.isString(items[id].data))
          throw new Error('items.data is not a string');

        // identity is needed to cope with an unstable connection: the same
        // client is allowed to overwrite data when the revision differs
        // by 0 or 1.
        const obj = this.getCache(id) || {};
        const sameClient = (identity && obj.identity === identity);
        if (identity && obj.identity !== identity) {
          obj.identity = identity;
          this.setCache(id, obj);
        }

        const revDiff = items[id].rev - check.items[id].rev;
        const allowUpdate = force || revDiff === 1 || (sameClient && (revDiff === 0 || revDiff === 1));

        if (!allowUpdate)
          return {state: 'reject', items: check.items};
      }

      const db = this.db;
      for (const id of Object.keys(items)) {
        // Equivalent of: INSERT OR REPLACE INTO storage (id, rev, time, data)
        // VALUES (${id}, ${rev}, <now>, ${data})
        await db.insert({
          table: 'storage',
          replace: true,
          rows: [{id, rev: items[id].rev, time: utils.toUnixTime(Date.now()), data: items[id].data}],
        });
        this.setCache(id, {items: {rev: items[id].rev}});
      }

      return {state: 'success'};
    }

    /**
     * Trims the cache down to the `maxSize` most recently used entries and
     * re-schedules itself to run again after `timeout` milliseconds.
     *
     * @param {number} timeout - delay between clean-up runs, in milliseconds.
     * @param {number} [maxSize=1000] - number of entries to keep.
     */
    periodicCleanCache(timeout, maxSize = 1000) {
      try {
        if (this.cacheMap.size > maxSize) {
          // Oldest first, so the least recently used entries are evicted and
          // the most recent ones survive.
          const oldestFirst = [...this.cacheMap]
            .map(([id, obj]) => ({id, time: obj.time}))
            .sort((a, b) => a.time - b.time);

          for (const {id} of oldestFirst) {
            if (this.cacheMap.size <= maxSize)
              break;
            this.cacheMap.delete(id);
          }
        }
      } finally {
        // Always re-arm the timer, even if the clean-up above threw.
        setTimeout(() => {
          this.periodicCleanCache(timeout, maxSize);
        }, timeout);
      }
    }
  }
  138. module.exports = JembaReaderStorage;