// readerStorage.js

  const SQL = require('sql-template-strings');
  const _ = require('lodash');
  const connManager = require('../db/connManager');
  /**
   * Revisioned item storage backed by the `storage` table (columns `id`,
   * `rev`, `data`) and reached through `connManager.pool.readerStorage`.
   *
   * Item revisions are memoized in `this.cache` (id -> {rev}); the cache is
   * dropped wholesale on a fixed period by `periodicCleanCache`.
   */
  class ReaderStorage {
      /**
       * Binds the connection pool and starts the periodic cache reset.
       */
      constructor() {
          this.storagePool = connManager.pool.readerStorage;
          this.periodicCleanCache(3*3600*1000); // once every 3 hours
      }

      /**
       * Dispatches a request object to the matching operation.
       *
       * @param {{action: string, items: Object, force: *}} act - `action` is
       *     one of 'check', 'get' or 'set'; `force` is only read for 'set'.
       * @returns {Promise<Object>} the result of the selected operation.
       * @throws {Error} 'Unknown action' for any other `act.action`.
       */
      async doAction(act) {
          switch (act.action) {
              case 'check':
                  return await this.checkItems(act.items);
              case 'get':
                  return await this.getItems(act.items);
              case 'set':
                  return await this.setItems(act.items, act.force);
              default:
                  throw new Error('Unknown action');
          }
      }

      /**
       * Looks up the current revision of every id that is a key of `items`.
       * Cached revisions are used when present; otherwise the table is queried
       * and the answer is cached. An id without a row (or with a falsy `rev`)
       * reports revision 0.
       *
       * @param {Object} items - only the keys (ids) are read.
       * @returns {Promise<{state: string, items: Object}>}
       *     `{state: 'success', items: {[id]: {rev}}}`.
       */
      async checkItems(items) {
          const result = {state: 'success', items: {}};

          const dbh = await this.storagePool.get();
          try {
              for (const id of Object.keys(items)) {
                  let cached = this.cache[id];
                  if (!cached) {
                      const rows = await dbh.all(SQL`SELECT rev FROM storage WHERE id = ${id}`);
                      const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
                      cached = {rev};
                      this.cache[id] = cached;
                  }
                  // Hand out a copy so that a caller mutating the result
                  // cannot silently change the cached revision.
                  result.items[id] = {rev: cached.rev};
              }
          } finally {
              // The handle goes back through ret() even when a query throws.
              dbh.ret();
          }

          return result;
      }

      /**
       * Reads revision and data of every id that is a key of `items`, always
       * from the table (the cache is not consulted). Missing or falsy values
       * are reported as `rev: 0` and `data: ''`.
       *
       * @param {Object} items - only the keys (ids) are read.
       * @returns {Promise<{state: string, items: Object}>}
       *     `{state: 'success', items: {[id]: {rev, data}}}`.
       */
      async getItems(items) {
          const result = {state: 'success', items: {}};

          const dbh = await this.storagePool.get();
          try {
              for (const id of Object.keys(items)) {
                  const rows = await dbh.all(SQL`SELECT rev, data FROM storage WHERE id = ${id}`);
                  const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
                  const data = (rows.length && rows[0].data ? rows[0].data : '');
                  result.items[id] = {rev, data};
              }
          } finally {
              dbh.ret();
          }

          return result;
      }

      /**
       * Stores the given items. Unless `force` is truthy, every item must carry
       * a revision exactly one greater than the currently known one; the first
       * mismatch aborts the whole call with a 'reject' result and nothing is
       * written.
       *
       * @param {Object} items - `{[id]: {rev, data}}`; `data` must be a string.
       * @param {*} force - truthy to skip the revision check.
       * @returns {Promise<Object>} `{state: 'success'}` after writing, or
       *     `{state: 'reject', items}` with the currently known revisions.
       * @throws {Error} 'items.data is not a string' when an item is missing
       *     or its `data` is not a string.
       */
      async setItems(items, force) {
          const check = await this.checkItems(items);

          // First verify that the revisions match.
          for (const id of Object.keys(items)) {
              const item = items[id];
              // A null/undefined item is reported with the same validation
              // error instead of an unrelated TypeError on property access.
              if (!item || !_.isString(item.data))
                  throw new Error('items.data is not a string');

              if (!force && check.items[id].rev + 1 !== item.rev)
                  return {state: 'reject', items: check.items};
          }

          const dbh = await this.storagePool.get();
          try {
              for (const id of Object.keys(items)) {
                  await dbh.run(SQL`INSERT OR REPLACE INTO storage (id, rev, data) VALUES (${id}, ${items[id].rev}, ${items[id].data})`);
                  this.cache[id] = {rev: items[id].rev};
              }
          } finally {
              dbh.ret();
          }

          return {state: 'success'};
      }

      /**
       * Replaces the revision cache with an empty one and re-arms itself to do
       * so again after `timeout` milliseconds.
       *
       * @param {number} timeout - period between cache resets, in milliseconds.
       */
      periodicCleanCache(timeout) {
          // Ids come from request data, so the cache must not inherit from
          // Object.prototype: with a plain {} an id such as 'constructor' or
          // 'toString' would look like a cache hit and yield a function.
          this.cache = Object.create(null);

          const timer = setTimeout(() => {
              this.periodicCleanCache(timeout);
          }, timeout);

          // The cleanup timer alone should not keep the process alive.
          if (timer && typeof timer.unref === 'function')
              timer.unref();
      }
  }
  // The module exports one ReaderStorage instance, created when the module is
  // first loaded.
  module.exports = new ReaderStorage();