JembaReaderStorage.js 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. const _ = require('lodash');
  2. const utils = require('../utils');
  3. const JembaConnManager = require('../../db/JembaConnManager');//singleton
  4. const log = new (require('../AppLogger'))().log;//singleton
  5. let instance = null;
  6. //singleton
  7. class JembaReaderStorage {
  8. constructor() {
  9. if (!instance) {
  10. this.connManager = new JembaConnManager();
  11. this.db = this.connManager.db['reader-storage'];
  12. this.periodicCleanCache(3*3600*1000);//1 раз в 3 часа
  13. instance = this;
  14. }
  15. return instance;
  16. }
  17. async doAction(act) {
  18. try {
  19. if (!_.isObject(act.items))
  20. throw new Error('items is not an object');
  21. let result = {};
  22. switch (act.action) {
  23. case 'check':
  24. result = await this.checkItems(act.items);
  25. break;
  26. case 'get':
  27. result = await this.getItems(act.items);
  28. break;
  29. case 'set':
  30. result = await this.setItems(act.items, act.force);
  31. break;
  32. default:
  33. throw new Error('Unknown action');
  34. }
  35. return result;
  36. } catch (e) {
  37. log(LM_ERR, `JembaReaderStorage: ${e.message}`);
  38. throw e;
  39. }
  40. }
  41. async checkItems(items) {
  42. let result = {state: 'success', items: {}};
  43. const db = this.db;
  44. for (const id of Object.keys(items)) {
  45. if (this.cache[id]) {
  46. result.items[id] = this.cache[id];
  47. } else {
  48. const rows = await db.select({//SQL`SELECT rev FROM storage WHERE id = ${id}`
  49. table: 'storage',
  50. map: '(r) => ({rev: r.rev})',
  51. where: `@@id(${db.esc(id)})`
  52. });
  53. const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
  54. result.items[id] = {rev};
  55. this.cache[id] = result.items[id];
  56. }
  57. }
  58. return result;
  59. }
  60. async getItems(items) {
  61. let result = {state: 'success', items: {}};
  62. const db = this.db;
  63. for (const id of Object.keys(items)) {
  64. const rows = await db.select({//SQL`SELECT rev, data FROM storage WHERE id = ${id}`);
  65. table: 'storage',
  66. where: `@@id(${db.esc(id)})`
  67. });
  68. const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
  69. const data = (rows.length && rows[0].data ? rows[0].data : '');
  70. result.items[id] = {rev, data};
  71. }
  72. return result;
  73. }
  74. async setItems(items, force) {
  75. let check = await this.checkItems(items);
  76. //сначала проверим совпадение ревизий
  77. for (const id of Object.keys(items)) {
  78. if (!_.isString(items[id].data))
  79. throw new Error('items.data is not a string');
  80. if (!force && check.items[id].rev + 1 !== items[id].rev)
  81. return {state: 'reject', items: check.items};
  82. }
  83. const db = this.db;
  84. const newRev = {};
  85. for (const id of Object.keys(items)) {
  86. await db.insert({//SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`);
  87. table: 'storage',
  88. replace: true,
  89. rows: [{id, rev: items[id].rev, time: utils.toUnixTime(Date.now()), data: items[id].data}],
  90. });
  91. newRev[id] = {rev: items[id].rev};
  92. }
  93. Object.assign(this.cache, newRev);
  94. return {state: 'success'};
  95. }
  96. periodicCleanCache(timeout) {
  97. this.cache = {};
  98. setTimeout(() => {
  99. this.periodicCleanCache(timeout);
  100. }, timeout);
  101. }
  102. }
  103. module.exports = JembaReaderStorage;