123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- const fs = require('fs-extra');
- const _ = require('lodash');
- const ayncExit = new (require('../core/AsyncExit'))();//singleton
- const { JembaDb, JembaDbThread } = require('jembadb');
- const log = new (require('../core/AppLogger'))().log;//singleton
- const jembaMigrations = require('./jembaMigrations');
- let instance = null;
- //singleton
- class JembaConnManager {
- constructor() {
- if (!instance) {
- this.inited = false;
- this._db = {};
- instance = this;
- }
- return instance;
- }
- async init(config, forceAutoRepair = false, migs = jembaMigrations, undoLastMigration = false) {
- if (this.inited)
- throw new Error('JembaConnManager initialized already');
- this.config = config;
- this._db = {};
- ayncExit.add(this.close.bind(this));
- const serverModes = new Set();
- for (const serverCfg of this.config.servers) {
- serverModes.add(serverCfg.mode);
- }
- for (const dbConfig of this.config.jembaDb) {
- //проверка, надо ли открывать базу, зависит от serverMode
- if (dbConfig.serverMode) {
- let serverMode = dbConfig.serverMode;
- if (!Array.isArray(dbConfig.serverMode))
- serverMode = [dbConfig.serverMode];
- let modePresent = false;
- for (const mode of serverMode) {
- modePresent = serverModes.has(mode);
- if (modePresent)
- break;
- }
- if (!modePresent)
- continue;
- }
- const dbPath = `${this.config.dataDir}/db/${dbConfig.dbName}`;
- //бэкап
- if (!dbConfig.noBak && await fs.pathExists(dbPath)) {
- const bakFile = `${dbPath}.bak`;
- await fs.remove(bakFile);
- await fs.copy(dbPath, bakFile);
- }
- let dbConn = null;
- if (dbConfig.thread) {
- dbConn = new JembaDbThread();
- } else {
- dbConn = new JembaDb();
- }
- this._db[dbConfig.dbName] = dbConn;
- log(`Open "${dbConfig.dbName}" begin`);
- await dbConn.lock({
- dbPath,
- create: true,
- softLock: true,
- tableDefaults: {
- cacheSize: dbConfig.cacheSize,
- compressed: dbConfig.compressed,
- forceFileClosing: dbConfig.forceFileClosing,
- typeCompatMode: true,
- },
- });
- if (dbConfig.openAll || forceAutoRepair || dbConfig.autoRepair) {
- try {
- await dbConn.openAll();
- } catch(e) {
- if ((forceAutoRepair || dbConfig.autoRepair) &&
- (
- e.message.indexOf('corrupted') >= 0
- || e.message.indexOf('Unexpected token') >= 0
- || e.message.indexOf('invalid stored block lengths') >= 0
- )
- ) {
- log(LM_ERR, e);
- log(`Open "${dbConfig.dbName}" with auto repair`);
- await dbConn.openAll({autoRepair: true});
- } else {
- throw e;
- }
- }
- }
- log(`Open "${dbConfig.dbName}" end`);
- //миграции
- const mig = migs[dbConfig.dbName];
- if (mig && mig.data) {
- const applied = await this.migrate(dbConn, mig.data, mig.table, undoLastMigration);
- if (applied.length)
- log(`${applied.length} migrations applied to "${dbConfig.dbName}"`);
- }
- }
- this.inited = true;
- }
- async close() {
- for (const dbConfig of this.config.jembaDb) {
- if (this._db[dbConfig.dbName])
- await this._db[dbConfig.dbName].unlock();
- }
- this._db = {};
- this.inited = false;
- }
- async migrate(db, migs, table, undoLastMigration) {
- const migrations = _.cloneDeep(migs).sort((a, b) => a.id - b.id);
- if (!migrations.length) {
- throw new Error('No migration data');
- }
- migrations.map(migration => {
- const data = migration.data;
- if (!data.up || !data.down) {
- throw new Error(`The ${migration.id}:${migration.name} does not contain 'up' or 'down' instructions`);
- } else {
- migration.up = data.up;
- migration.down = data.down;
- }
- delete migration.data;
- });
- // Create a database table for migrations meta data if it doesn't exist
- // id, name, up, down
- await db.create({
- table,
- quietIfExists: true,
- });
- // Get the list of already applied migrations
- let dbMigrations = await db.select({
- table,
- sort: '(a, b) => a.id - b.id'
- });
- const execUpDown = async(items) => {
- for (const item of items) {
- const action = item[0];
- await db[action](item[1]);
- }
- };
- // Undo migrations that exist only in the database but not in migs,
- // also undo the last migration if the undoLastMigration
- const lastMigration = migrations[migrations.length - 1];
- for (const migration of dbMigrations.slice().sort((a, b) => b.id - a.id)) {
- if (!migrations.some(x => x.id === migration.id) ||
- (undoLastMigration && migration.id === lastMigration.id)) {
- await execUpDown(migration.down);
- await db.delete({
- table,
- where: `@@id(${db.esc(migration.id)})`
- });
- dbMigrations = dbMigrations.filter(x => x.id !== migration.id);
- } else {
- break;
- }
- }
- // Apply pending migrations
- let applied = [];
- const lastMigrationId = dbMigrations.length ? dbMigrations[dbMigrations.length - 1].id : 0;
- for (const migration of migrations) {
- if (migration.id > lastMigrationId) {
- await execUpDown(migration.up);
- await db.insert({
- table,
- rows: [migration],
- });
- applied.push(migration.id);
- }
- }
- return applied;
- }
- get db() {
- if (!this.inited)
- throw new Error('JembaConnManager not inited');
- return this._db;
- }
- }
- module.exports = JembaConnManager;
|