// JembaReaderStorage.js (3.6 KB)
  1. const _ = require('lodash');
  2. const utils = require('../utils');
  3. const JembaConnManager = require('../../db/JembaConnManager');//singleton
  4. let instance = null;
  5. //singleton
  6. class JembaReaderStorage {
  7. constructor() {
  8. if (!instance) {
  9. this.connManager = new JembaConnManager();
  10. this.db = this.connManager.db['reader-storage'];
  11. this.periodicCleanCache(3*3600*1000);//1 раз в 3 часа
  12. instance = this;
  13. }
  14. return instance;
  15. }
  16. async doAction(act) {
  17. if (!_.isObject(act.items))
  18. throw new Error('items is not an object');
  19. let result = {};
  20. switch (act.action) {
  21. case 'check':
  22. result = await this.checkItems(act.items);
  23. break;
  24. case 'get':
  25. result = await this.getItems(act.items);
  26. break;
  27. case 'set':
  28. result = await this.setItems(act.items, act.force);
  29. break;
  30. default:
  31. throw new Error('Unknown action');
  32. }
  33. return result;
  34. }
  35. async checkItems(items) {
  36. let result = {state: 'success', items: {}};
  37. const db = this.db;
  38. for (const id of Object.keys(items)) {
  39. if (this.cache[id]) {
  40. result.items[id] = this.cache[id];
  41. } else {
  42. const rows = await db.select({//SQL`SELECT rev FROM storage WHERE id = ${id}`
  43. table: 'storage',
  44. map: '(r) => ({rev: r.rev})',
  45. where: `@@id(${db.esc(id)})`
  46. });
  47. const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
  48. result.items[id] = {rev};
  49. this.cache[id] = result.items[id];
  50. }
  51. }
  52. return result;
  53. }
  54. async getItems(items) {
  55. let result = {state: 'success', items: {}};
  56. const db = this.db;
  57. for (const id of Object.keys(items)) {
  58. const rows = await db.select({//SQL`SELECT rev, data FROM storage WHERE id = ${id}`);
  59. table: 'storage',
  60. where: `@@id(${db.esc(id)})`
  61. });
  62. const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
  63. const data = (rows.length && rows[0].data ? rows[0].data : '');
  64. result.items[id] = {rev, data};
  65. }
  66. return result;
  67. }
  68. async setItems(items, force) {
  69. let check = await this.checkItems(items);
  70. //сначала проверим совпадение ревизий
  71. for (const id of Object.keys(items)) {
  72. if (!_.isString(items[id].data))
  73. throw new Error('items.data is not a string');
  74. if (!force && check.items[id].rev + 1 !== items[id].rev)
  75. return {state: 'reject', items: check.items};
  76. }
  77. const db = this.db;
  78. const newRev = {};
  79. for (const id of Object.keys(items)) {
  80. await db.insert({//SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`);
  81. table: 'storage',
  82. replace: true,
  83. rows: [{id, rev: items[id].rev, time: utils.toUnixTime(Date.now()), data: items[id].data}],
  84. });
  85. newRev[id] = {rev: items[id].rev};
  86. }
  87. Object.assign(this.cache, newRev);
  88. return {state: 'success'};
  89. }
  90. periodicCleanCache(timeout) {
  91. this.cache = {};
  92. setTimeout(() => {
  93. this.periodicCleanCache(timeout);
  94. }, timeout);
  95. }
  96. }
  97. module.exports = JembaReaderStorage;