ReaderStorage.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. const SQL = require('sql-template-strings');
  2. const _ = require('lodash');
  3. const ConnManager = require('../db/ConnManager');//singleton
  4. let instance = null;
  5. //singleton
  /**
   * Revision-tracked storage for reader items, kept in the `storage` SQL table
   * (columns: id, rev, time, data) with an in-memory cache of known revisions.
   *
   * The class is a singleton: `new ReaderStorage()` always returns the first
   * instance that was constructed.
   */
  class ReaderStorage {
      /**
       * Creates the singleton on first use (connection pool + revision cache)
       * and returns the already existing instance on every later call.
       */
      constructor() {
          if (!instance) {
              this.connManager = new ConnManager();
              this.storagePool = this.connManager.pool.readerStorage;
              this.periodicCleanCache(3 * 3600 * 1000); // once every 3 hours

              instance = this;
          }

          return instance;
      }

      /**
       * Dispatches a storage request to the matching handler.
       *
       * @param {{action: string, items: Object, force: *}} act - request;
       *   `action` is one of 'check', 'get' or 'set', `items` is keyed by item id,
       *   `force` is only used by 'set'.
       * @returns {Promise<Object>} whatever the selected handler returns.
       * @throws {Error} when `act.items` is not an object or the action is unknown.
       */
      async doAction(act) {
          if (!_.isObject(act.items)) {
              throw new Error('items is not an object');
          }

          switch (act.action) {
              case 'check':
                  return await this.checkItems(act.items);
              case 'get':
                  return await this.getItems(act.items);
              case 'set':
                  return await this.setItems(act.items, act.force);
              default:
                  throw new Error('Unknown action');
          }
      }

      /**
       * Looks up the current revision of every id in `items`.
       *
       * Revisions come from the in-memory cache when present, otherwise from the
       * database (a missing row or a falsy rev counts as revision 0) and are
       * then cached.
       *
       * @param {Object} items - object whose keys are the ids to look up.
       * @returns {Promise<{state: string, items: Object}>} `items[id]` is `{rev}`.
       */
      async checkItems(items) {
          const result = {state: 'success', items: {}};
          const dbh = await this.storagePool.get();
          try {
              for (const id of Object.keys(items)) {
                  // Only own entries are cache hits. A plain `cache[id]` lookup would
                  // also match inherited members such as `constructor` or `toString`
                  // and hand back a function instead of a `{rev}` record.
                  if (Object.prototype.hasOwnProperty.call(this.cache, id)) {
                      result.items[id] = this.cache[id];
                      continue;
                  }

                  const rows = await dbh.all(SQL`SELECT rev FROM storage WHERE id = ${id}`);
                  const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
                  result.items[id] = {rev};
                  this.cache[id] = result.items[id];
              }
          } finally {
              // Hand the handle back (presumably to the pool) even if a query failed.
              dbh.ret();
          }

          return result;
      }

      /**
       * Reads revision and data for every id in `items` straight from the
       * database; the revision cache is neither consulted nor updated.
       *
       * @param {Object} items - object whose keys are the ids to read.
       * @returns {Promise<{state: string, items: Object}>} `items[id]` is
       *   `{rev, data}`; a missing row yields `{rev: 0, data: ''}`.
       */
      async getItems(items) {
          const result = {state: 'success', items: {}};
          const dbh = await this.storagePool.get();
          try {
              for (const id of Object.keys(items)) {
                  const rows = await dbh.all(SQL`SELECT rev, data FROM storage WHERE id = ${id}`);
                  const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
                  const data = (rows.length && rows[0].data ? rows[0].data : '');
                  result.items[id] = {rev, data};
              }
          } finally {
              dbh.ret();
          }

          return result;
      }

      /**
       * Stores `items` in a single transaction.
       *
       * Unless `force` is truthy, every item must carry exactly the stored
       * revision plus one; otherwise nothing is written and the current
       * revisions are returned with state 'reject'.
       *
       * @param {Object} items - `items[id]` is `{rev, data}`, `data` being a string.
       * @param {*} force - truthy to skip the revision check.
       * @returns {Promise<{state: string, items: (Object|undefined)}>}
       *   `{state: 'success'}` or `{state: 'reject', items}`.
       * @throws {Error} when some `data` is not a string, or when the database
       *   fails (the error of the failing statement is rethrown).
       */
      async setItems(items, force) {
          const ids = Object.keys(items);
          const check = await this.checkItems(items);

          // First make sure the payloads are valid and the revisions match.
          // NOTE(review): this check runs before the transaction is opened, so two
          // concurrent writers are not serialised against each other here.
          for (const id of ids) {
              if (!_.isString(items[id].data)) {
                  throw new Error('items.data is not a string');
              }

              if (!force && check.items[id].rev + 1 !== items[id].rev) {
                  return {state: 'reject', items: check.items};
              }
          }

          const dbh = await this.storagePool.get();
          try {
              // BEGIN sits inside the outer try so that a failure to open the
              // transaction still releases the handle in `finally`.
              await dbh.run('BEGIN');
              try {
                  const newRev = Object.create(null);
                  for (const id of ids) {
                      await dbh.run(SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`);
                      newRev[id] = {rev: items[id].rev};
                  }
                  await dbh.run('COMMIT');

                  // The cache is only touched once the commit went through.
                  Object.assign(this.cache, newRev);
              } catch (e) {
                  try {
                      await dbh.run('ROLLBACK');
                  } catch (rollbackError) {
                      // Deliberately ignored: the error that triggered the rollback
                      // is the one the caller needs to see.
                  }
                  throw e;
              }
          } finally {
              dbh.ret();
          }

          return {state: 'success'};
      }

      /**
       * Empties the revision cache now and re-arms itself to do so again every
       * `timeout` milliseconds.
       *
       * @param {number} timeout - delay between cache resets, in milliseconds.
       */
      periodicCleanCache(timeout) {
          // No prototype: ids are arbitrary strings and must never collide with
          // members inherited from Object.prototype.
          this.cache = Object.create(null);

          setTimeout(() => {
              this.periodicCleanCache(timeout);
          }, timeout);
      }
  }
  103. module.exports = ReaderStorage;