// JembaConnManager.js

  1. const fs = require('fs-extra');
  2. const _ = require('lodash');
  3. const ayncExit = new (require('../core/AsyncExit'))();//singleton
  4. const { JembaDb, JembaDbThread } = require('jembadb');
  5. const log = new (require('../core/AppLogger'))().log;//singleton
  6. const jembaMigrations = require('./jembaMigrations');
  // Holds the one shared JembaConnManager; assigned by the first `new JembaConnManager()`.
  let instance = null;

  /**
   * Singleton that opens the jembadb databases listed in the application config,
   * keeps the open connections keyed by database name, runs their migrations
   * and unlocks them again on shutdown.
   */
  class JembaConnManager {
    /**
     * Returns the shared instance, creating and initialising it on first use.
     * Every later `new JembaConnManager()` yields that same object.
     */
    constructor() {
      if (!instance) {
        this.inited = false;
        this._db = {};
        instance = this;
      }
      return instance;
    }

    /**
     * Opens every configured database that applies to the configured server
     * modes, optionally backing it up first, then applies its migrations.
     *
     * @param {object} config - Application config; this method reads `servers`,
     *   `jembaDb` and `dataDir` from it.
     * @param {boolean} [forceAutoRepair=false] - Treat every database as if its
     *   `autoRepair` flag were set.
     * @param {object} [migs=jembaMigrations] - Migration sets keyed by database
     *   name; each used entry provides `data` (migration list) and `table`.
     * @param {boolean} [undoLastMigration=false] - Forwarded to {@link migrate}.
     * @returns {Promise<void>}
     * @throws {Error} If the manager is already initialised, or if opening or
     *   migrating a database fails.
     */
    async init(config, forceAutoRepair = false, migs = jembaMigrations, undoLastMigration = false) {
      if (this.inited)
        throw new Error('JembaConnManager initialized already');

      this.config = config;
      this._db = {};

      // Registered before anything is opened, so connections opened by a
      // partially failed init() are still handed to close() by the exit handler.
      ayncExit.add(this.close.bind(this));

      const serverModes = new Set();
      for (const serverCfg of this.config.servers) {
        serverModes.add(serverCfg.mode);
      }

      for (const dbConfig of this.config.jembaDb) {
        // Whether a database has to be opened depends on its serverMode.
        if (!this._isDbEnabled(dbConfig, serverModes))
          continue;

        const dbPath = `${this.config.dataDir}/db/${dbConfig.dbName}`;

        // Backup: replace the previous "<dbPath>.bak" with a copy of the current data.
        if (!dbConfig.noBak && await fs.pathExists(dbPath)) {
          const bakFile = `${dbPath}.bak`;
          await fs.remove(bakFile);
          await fs.copy(dbPath, bakFile);
        }

        const dbConn = dbConfig.thread ? new JembaDbThread() : new JembaDb();
        // Stored before lock() so close() can reach it even if opening fails.
        this._db[dbConfig.dbName] = dbConn;

        log(`Open "${dbConfig.dbName}" begin`);
        await dbConn.lock({
          dbPath,
          create: true,
          softLock: true,

          tableDefaults: {
            cacheSize: dbConfig.cacheSize,
            compressed: dbConfig.compressed,
            forceFileClosing: dbConfig.forceFileClosing,
            typeCompatMode: true,
          },
        });

        const autoRepair = forceAutoRepair || dbConfig.autoRepair;
        if (dbConfig.openAll || autoRepair) {
          try {
            await dbConn.openAll();
          } catch (e) {
            // Only a recognised corruption error, and only with auto repair
            // enabled, earns a second attempt; everything else propagates.
            if (!autoRepair || !this._isRepairableError(e))
              throw e;

            // NOTE(review): LM_ERR is not imported in this file; presumably a
            // global log-level constant provided by the logger module - confirm.
            log(LM_ERR, e);
            log(`Open "${dbConfig.dbName}" with auto repair`);
            await dbConn.openAll({autoRepair: true});
          }
        }
        log(`Open "${dbConfig.dbName}" end`);

        // Migrations
        const mig = migs[dbConfig.dbName];
        if (mig && mig.data) {
          const applied = await this.migrate(dbConn, mig.data, mig.table, undoLastMigration);
          if (applied.length)
            log(`${applied.length} migrations applied to "${dbConfig.dbName}"`);
        }
      }

      this.inited = true;
    }

    /**
     * Tells whether a database entry applies to the configured server modes.
     * An entry without `serverMode` always applies; otherwise at least one of
     * its modes (a single value or an array) must be among `serverModes`.
     *
     * @param {object} dbConfig - One entry of `config.jembaDb`.
     * @param {Set} serverModes - Modes collected from `config.servers`.
     * @returns {boolean}
     */
    _isDbEnabled(dbConfig, serverModes) {
      if (!dbConfig.serverMode)
        return true;

      const wanted = Array.isArray(dbConfig.serverMode) ? dbConfig.serverMode : [dbConfig.serverMode];
      return wanted.some((mode) => serverModes.has(mode));
    }

    /**
     * Tells whether an error thrown by `openAll()` carries one of the message
     * fragments after which init() retries with `autoRepair: true`.
     *
     * @param {*} e - The caught value; anything without a string `message` is
     *   treated as not repairable instead of raising a secondary TypeError.
     * @returns {boolean}
     */
    _isRepairableError(e) {
      const message = (e && typeof e.message === 'string') ? e.message : '';
      return ['corrupted', 'Unexpected token', 'invalid stored block lengths']
        .some((marker) => message.includes(marker));
    }

    /**
     * Unlocks every connection currently held and resets the manager so that
     * init() may be called again. Safe to call before init(): with no open
     * connections it only resets the state.
     *
     * @returns {Promise<void>}
     */
    async close() {
      // Walk the connections actually held rather than the config: this works
      // without a config and unlocks each connection exactly once.
      for (const dbConn of Object.values(this._db)) {
        if (dbConn)
          await dbConn.unlock();
      }

      this._db = {};
      this.inited = false;
    }

    /**
     * Brings the migrations table of `db` in line with `migs`: rolls back
     * recorded migrations that are no longer listed (and the newest one when
     * requested), then applies every listed migration newer than the last
     * recorded one.
     *
     * @param {object} db - Open connection; must provide `create`, `select`,
     *   `insert`, `delete`, `esc` and every action named in the instructions.
     * @param {Array<object>} migs - Migrations shaped `{id, name, data: {up, down}}`,
     *   where `up`/`down` are lists of `[action, argument]` pairs executed as
     *   `db[action](argument)`. The array is deep-cloned, never modified.
     * @param {string} table - Name of the table that records applied migrations.
     * @param {boolean} undoLastMigration - Also roll back the migration with the
     *   highest id in `migs` if it is recorded (it is then re-applied below).
     * @returns {Promise<Array>} Ids of the migrations applied by this call.
     * @throws {Error} If `migs` is empty or an entry lacks `up`/`down` instructions.
     */
    async migrate(db, migs, table, undoLastMigration) {
      const migrations = _.cloneDeep(migs).sort((a, b) => a.id - b.id);

      if (!migrations.length) {
        throw new Error('No migration data');
      }

      // Flatten {data: {up, down}} into {up, down}: that is the row shape stored in the table.
      for (const migration of migrations) {
        const data = migration.data;
        if (!data || !data.up || !data.down) {
          throw new Error(`The ${migration.id}:${migration.name} does not contain 'up' or 'down' instructions`);
        }
        migration.up = data.up;
        migration.down = data.down;
        delete migration.data;
      }

      // Create a database table for migrations meta data if it doesn't exist
      // id, name, up, down
      await db.create({
        table,
        quietIfExists: true,
      });

      // Get the list of already applied migrations.
      // The comparator is handed over as source text, exactly as the query expects it here.
      let dbMigrations = await db.select({
        table,
        sort: '(a, b) => a.id - b.id'
      });

      // Runs a list of [action, argument] instructions strictly in order.
      const execUpDown = async(items) => {
        for (const [action, arg] of items) {
          await db[action](arg);
        }
      };

      // Undo migrations that exist only in the database but not in migs,
      // also undo the last migration if undoLastMigration is set.
      // Walk from the newest recorded migration and stop at the first one to keep.
      const knownIds = new Set(migrations.map((migration) => migration.id));
      const lastMigration = migrations[migrations.length - 1];
      const newestFirst = dbMigrations.slice().sort((a, b) => b.id - a.id);
      for (const migration of newestFirst) {
        const obsolete = !knownIds.has(migration.id);
        const undoRequested = undoLastMigration && migration.id === lastMigration.id;
        if (!obsolete && !undoRequested)
          break;

        // The 'down' instructions come from the stored row, not from migs.
        await execUpDown(migration.down);
        await db.delete({
          table,
          where: `@@id(${db.esc(migration.id)})`
        });
        dbMigrations = dbMigrations.filter((x) => x.id !== migration.id);
      }

      // Apply pending migrations
      const applied = [];
      const lastMigrationId = dbMigrations.length ? dbMigrations[dbMigrations.length - 1].id : 0;
      for (const migration of migrations) {
        if (migration.id > lastMigrationId) {
          await execUpDown(migration.up);
          await db.insert({
            table,
            rows: [migration],
          });
          applied.push(migration.id);
        }
      }

      return applied;
    }

    /**
     * Map of open connections keyed by database name.
     *
     * @throws {Error} If init() has not completed (or close() has been called since).
     */
    get db() {
      if (!this.inited)
        throw new Error('JembaConnManager not inited');

      return this._db;
    }
  }
  176. module.exports = JembaConnManager;