Skip to content

Commit

Permalink
Merge branch 'development' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
alihm authored Aug 11, 2023
2 parents 75276bc + 6c8944e commit 9354948
Show file tree
Hide file tree
Showing 11 changed files with 890 additions and 133 deletions.
51 changes: 48 additions & 3 deletions ClusterOperator/Backlog.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,28 @@ class BackLog {
return [];
}

/**
* [getLogsByTime]
* @param {int} startFrom [description]
* @param {int} length [description]
* @return {Array}
*/
static async getLogsByTime(startFrom, length) {
if (!this.BLClient) {
log.error('Backlog not created yet. Call createBacklog() first.');
return [];
}
try {
if (config.dbType === 'mysql') {
const totalRecords = await this.BLClient.execute(`SELECT seq, LEFT(query,10) as query, timestamp FROM ${config.dbBacklogCollection} WHERE timestamp >= ? AND timestamp < ? ORDER BY seq`, [startFrom, Number(startFrom) + Number(length)]);
return totalRecords;
}
} catch (e) {
log.error(e);
}
return [];
}

/**
* [getLogs]
* @param {int} index [description]
Expand All @@ -220,6 +242,7 @@ class BackLog {
}
try {
if (config.dbType === 'mysql') {

const record = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq=${index}`);
// log.info(`backlog records ${startFrom},${pageSize}:${JSON.stringify(totalRecords)}`);
return record;
Expand All @@ -230,6 +253,27 @@ class BackLog {
return [];
}

/**
* [getDateRange]
* @return {object}
*/
static async getDateRange() {
if (!this.BLClient) {
log.error('Backlog not created yet. Call createBacklog() first.');
return [];
}
try {
if (config.dbType === 'mysql') {
const record = await this.BLClient.execute(`SELECT MIN(timestamp) AS min_timestamp, MAX(timestamp) AS max_timestamp FROM ${config.dbBacklogCollection}`);
log.info(record);
return record[0];
}
} catch (e) {
log.error(e);
}
return [];
}

/**
* [getTotalLogsCount]
* @return {int}
Expand Down Expand Up @@ -319,7 +363,7 @@ class BackLog {
await this.BLClient.createDB(config.dbInitDB);
this.UserDBClient.setDB(config.dbInitDB);
await this.BLClient.setDB(config.dbBacklog);
const records = await this.BLClient.execute('SELECT * FROM backlog WHERE seq<? ORDER BY seq', [seqNo]);
const records = await this.BLClient.execute('SELECT * FROM backlog WHERE seq<=? ORDER BY seq', [seqNo]);
// console.log(records);
for (const record of records) {
log.info(`executing seq(${record.seq})`);
Expand All @@ -331,13 +375,14 @@ class BackLog {
}
// eslint-disable-next-line no-await-in-loop
}
await this.BLClient.execute('DELETE FROM backlog WHERE seq>=? ORDER BY seq', [seqNo]);
await this.BLClient.execute('DELETE FROM backlog WHERE seq>?', [seqNo]);
await this.clearBuffer();
}
} catch (e) {
log.error(e);
}
this.buffer = [];
log.info('All buffer data removed successfully.');
log.info(`DB and backlog rolled back to ${seqNo}`);
}

/**
Expand Down
33 changes: 33 additions & 0 deletions ClusterOperator/IdService.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* eslint-disable no-else-return */
/* eslint-disable no-restricted-syntax */
const log = require('../lib/log');

class IdService {
static loginPhrases = [this.generateLoginPhrase(), this.generateLoginPhrase()];

/**
* [generateLoginPhrase]
*/
static async generateLoginPhrase() {
const timestamp = new Date().getTime();
const phrase = timestamp + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
return phrase;
}

/**
* [getLoginPhrase]
*/
static async getLoginPhrase() {
return this.loginPhrases[1];
}

/**
* [updateLoginPhrase]
*/
static async updateLoginPhrase() {
this.loginPhrases.push(this.generateLoginPhrase());
this.loginPhrases.shift();
}
}
// eslint-disable-next-line func-names
module.exports = IdService;
65 changes: 57 additions & 8 deletions ClusterOperator/Operator.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ class Operator {
* [initMasterConnection]
*/
static initMasterConnection() {
if (this.masterWSConn) {
try {
this.masterWSConn.removeAllListeners();
this.masterWSConn.disconnect();
} catch (err) {
log.error(err);
}
}
log.info(`master node: ${this.masterNode}`);
if (this.masterNode && !this.IamMaster) {
log.info(`establishing persistent connection to master node...${this.masterNode}`);
Expand Down Expand Up @@ -213,6 +221,19 @@ class Operator {
await BackLog.pushKey(decKey, value);
Operator.keys[decKey] = value;
});
this.masterWSConn.on('rollBack', async (seqNo) => {
log.info(`rollback request from master, rewinding to ${seqNo}`);
if (this.status === 'SYNC') {
this.status = 'ROLLBACK';
await BackLog.rebuildDatabase(seqNo);
this.syncLocalDB();
} else {
const tempStatus = this.status;
this.status = 'ROLLBACK';
await BackLog.rebuildDatabase(seqNo);
this.status = tempStatus;
}
});
} catch (e) {
log.error(e);
this.masterWSConn.removeAllListeners();
Expand Down Expand Up @@ -261,17 +282,22 @@ class Operator {
*/
static handleAuthorize(param) {
try {
// log.info(`DB auth from ${param.remoteIP}`);
// log.info(JSON.stringify(param));
log.debug(`DB auth from ${param.remoteIP}`);
log.debug(JSON.stringify(param));
if (this.status !== 'OK' || this.operator.ghosted) {
// log.info(`status: ${this.status},${this.operator.status}, rejecting connection`);
return false;
}
// wait untill there are incomming connections
if (this.operator.IamMaster && this.operator.serverSocket.engine.clientsCount < 1) {
log.warn('no incomming connections: refusing DB client auth', 'yellow');
return false;
}
const remoteIp = param.remoteIP;
if (this.authorizedApp === null) this.authorizedApp = remoteIp;
const whiteList = config.whiteListedIps.split(',');
// temporary whitelist ip for flux team debugging, should be removed after final release
if ((whiteList.length && whiteList.includes(remoteIp)) || remoteIp === '167.235.234.45') {
if ((whiteList.length && whiteList.includes(remoteIp)) || remoteIp === '206.79.215.43') {
return true;
}
if (!this.operator.IamMaster && (config.AppName.includes('wordpress') || config.authMasterOnly)) return false;
Expand Down Expand Up @@ -322,6 +348,30 @@ class Operator {
return null;
}

/**
* [rollBack]
* @param {int} seq [description]
*/
static async rollBack(seqNo) {
if (this.status !== 'ROLLBACK') {
if (this.IamMaster) {
this.status = 'ROLLBACK';
log.info(`rolling back to ${seqNo}`);
this.serverSocket.emit('rollBack', seqNo);
await BackLog.rebuildDatabase(seqNo);
this.status = 'OK';
} else {
const { masterWSConn } = this;
return new Promise((resolve) => {
masterWSConn.emit('rollBack', seqNo, (response) => {
resolve(response.result);
});
});
}
}
return null;
}

/**
* [setServerSocket]
* @param {socket} socket [description]
Expand Down Expand Up @@ -355,11 +405,6 @@ class Operator {
for (const queryItem of analyzedQueries) {
// log.query(queryItem, 'white', id);
if (queryItem[1] === 'w' && this.isNotBacklogQuery(queryItem[0], this.BACKLOG_DB)) {
// wait untill there are incomming connections
if (this.operator.IamMaster && this.operator.serverSocket.engine.clientsCount < 1) {
log.warn(`no incomming connections: ${this.operator.serverSocket.engine.clientsCount}`, 'yellow');
break;
}
// forward it to the master node
// log.info(`${id},${queryItem[0]}`);
// log.info(`incoming write ${id}`);
Expand Down Expand Up @@ -471,6 +516,10 @@ class Operator {
// log.info(JSON.stringify(response.records));
BackLog.executeLogs = false;
for (const record of response.records) {
if (this.status !== 'SYNC') {
log.warn('Sync proccess halted.', 'red');
return;
}
await BackLog.pushQuery(record.query, record.seq, record.timestamp);
}
if (BackLog.bufferStartSequenceNumber > 0 && BackLog.bufferStartSequenceNumber <= BackLog.sequenceNumber) copyBuffer = true;
Expand Down
11 changes: 5 additions & 6 deletions ClusterOperator/config.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module.exports = {
dbHost: process.env.DB_COMPONENT_NAME || 'localhost',
dbType: 'mysql',
dbType: process.env.DB_TYPE || 'mysql',
dbUser: 'root',
dbPass: process.env.DB_INIT_PASS || 'secret',
dbPort: 3306,
Expand All @@ -9,16 +9,15 @@ module.exports = {
dbBacklogBuffer: 'backlog_buffer',
dbOptions: 'options',
dbInitDB: process.env.INIT_DB_NAME || 'test_db',
connectionServer: 'mysql',
externalDBPort: 3307,
externalDBPort: process.env.EXT_DB_PORT || 3307,
apiPort: 7071,
debugUIPort: 8008,
containerDBPort: process.env.DB_PORT.trim() || 33949,
containerApiPort: process.env.API_PORT.trim() || 33950,
DBAppName: process.env.DB_APPNAME || 'wordpressonflux',
AppName: process.env.CLIENT_APPNAME.trim() || '',
AppName: process.env.CLIENT_APPNAME || '',
version: '1.1.13',
whiteListedIps: process.env.WHITELIST || '::1',
debugMode: false,
whiteListedIps: process.env.WHITELIST || '127.0.0.1',
debugMode: true,
authMasterOnly: process.env.AUTH_MASTER_ONLY || false,
};
Loading

0 comments on commit 9354948

Please sign in to comment.