Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reload chaos experiments on service startup #642

Merged
merged 6 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports = async () => {
await database.init();
await jobsManager.init();
await jobsManager.reloadCronJobs();
await jobsManager.reloadChaosExperiments();
await jobsManager.scheduleFinishedContainersCleanup();
if (streamingConfig.platform) {
const eventStreamerPlatformConfig = require(`./config/${streamingConfig.platform}Config`);
Expand Down
9 changes: 9 additions & 0 deletions src/chaos-experiments/models/chaosExperimentsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ module.exports.getChaosExperimentById = async function (experimentId) {
}
};

module.exports.insertChaosJobExperiment = (jobExperimentId, jobId, experimentId, startTime, endTime) => {
const contextId = httpContext.get(CONTEXT_ID);
return databaseConnector.insertChaosJobExperiment(jobExperimentId, jobId, experimentId, startTime, endTime, contextId);
};

module.exports.getChaosExperimentsByIds = (experimentIds, exclude, contextId) => {
return databaseConnector.getChaosExperimentsByIds(experimentIds, exclude, contextId);
};
Expand Down Expand Up @@ -76,3 +81,7 @@ module.exports.updateChaosExperiment = async function (experimentId, chaosExperi
await databaseConnector.updateChaosExperiment(experimentId, chaosExperiment);
return chaosExperiment;
};

module.exports.getFutureJobExperiments = async function (timestamp, contextId) {
return databaseConnector.getFutureJobExperiments(contextId);
};
5 changes: 5 additions & 0 deletions src/chaos-experiments/models/database/databaseConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module.exports = {
insertChaosJobExperiment,
getChaosJobExperimentById,
getChaosJobExperimentByJobId,
getFutureJobExperiments,
setChaosJobExperimentTriggered,
updateChaosExperiment,
closeConnection
Expand Down Expand Up @@ -64,6 +65,10 @@ async function getChaosJobExperimentByJobId(jobId, contextId) {
return databaseConnector.getChaosJobExperimentById(jobId, contextId);
}

async function getFutureJobExperiments(contextId) {
return databaseConnector.getFutureJobExperiments(contextId);
}

async function setChaosJobExperimentTriggered(jobExperimentId, isTriggered, contextId) {
return databaseConnector.setChaosJobExperimentTriggered(jobExperimentId, isTriggered, contextId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Sequelize = require('sequelize');
const { CHAOS_EXPERIMENTS_TABLE_NAME, CHAOS_JOB_EXPERIMENTS_TABLE_NAME } = require('../../../../database/sequlize-handler/consts');
const { Op } = require('sequelize');
const KUBEOBJECT = 'kubeObject';
let client;

Expand All @@ -17,6 +18,7 @@ module.exports = {
insertChaosJobExperiment,
getChaosJobExperimentById,
getChaosJobExperimentByJobId,
getFutureJobExperiments,
setChaosJobExperimentTriggered
};

Expand Down Expand Up @@ -173,6 +175,23 @@ async function getChaosJobExperimentByJobId(jobId, contextId) {
return chaosExperiment;
}

async function getFutureJobExperiments(timestamp, contextId) {
const options = {
where: {
is_triggered: false,
start_time: { [Op.gt]: timestamp }
}
};

if (contextId) {
options.where.context_id = contextId;
}

const chaosJobExperimentModel = client.model(CHAOS_JOB_EXPERIMENTS_TABLE_NAME);
const allJobExperiments = await chaosJobExperimentModel.findAll(options);
return allJobExperiments;
}

async function setChaosJobExperimentTriggered(id, isTriggered, contextId) {
const chaosJobExperimentModel = client.model(CHAOS_EXPERIMENTS_TABLE_NAME);
const options = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

module.exports.runChaosExperiment = async (kubernetesExperimentConfig) => {};
8 changes: 3 additions & 5 deletions src/jobs/helpers/jobVerifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,10 @@ module.exports.verifyExperimentsExist = async (req, res, next) => {
let errorToThrow;
const jobPlatform = await configHandler.getConfigValue(CONFIG.JOB_PLATFORM);
const experiments = jobBody.experiments;
if (!experiments || experiments.length === 0) {
if (jobPlatform.toUpperCase() !== KUBERNETES) {
next();
} else if (!experiments || experiments.length === 0) {
next();
} else if (experiments.length > 0 && jobPlatform.toUpperCase() !== KUBERNETES) {
errorToThrow = new Error(ERROR_MESSAGES.CHAOS_EXPERIMENT_SUPPORTED_ONLY_IN_KUBERNETES);
errorToThrow.statusCode = 400;
next(errorToThrow);
} else {
const uniqueExperimentIds = [...new Set(experiments.map(experiment => experiment.experiment_id))];
const chaosExperiments = await choasExperimentsManager.getChaosExperimentsByIds(uniqueExperimentIds, ['kubeObject']);
Expand Down
26 changes: 14 additions & 12 deletions src/jobs/models/jobExperimentsHandler.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

const chaosExperimentsDbConnector = require('../../chaos-experiments/models/database/databaseConnector'),
const chaosExperimentsManager = require('../../chaos-experiments/models/chaosExperimentsManager'),
chaosExperimentConnector = require('../../chaos-experiments/models/kubernetes/chaosExperimentConnector'),
{ v4: uuid } = require('uuid'),
logger = require('../../common/logger');

Expand All @@ -16,33 +17,33 @@ async function setChaosExperimentsIfExist(jobId, jobExperiments) {
try {
const baseTimestamp = Date.now();
const experimentIds = jobExperiments.map(experiment => experiment.experiment_id);
const experimentsFromDb = await chaosExperimentsDbConnector.getChaosExperimentsByIds(experimentIds);
const chaoExperimentsFromDb = await chaosExperimentsManager.getChaosExperimentsByIds(experimentIds);
await Promise.all(jobExperiments.map(async(experimentRequest) =>
await setSingleJobExperiment(experimentRequest, experimentsFromDb, baseTimestamp, jobId)
await setSingleJobExperiment(experimentRequest, chaoExperimentsFromDb, baseTimestamp, jobId)
));
} catch (error){
logger.error(error, `error while setting chaos experiments for job ${jobId}`);
}
};

async function setSingleJobExperiment(experimentRequest, experimentsFromDb, baseTimestamp, jobId) {
async function setSingleJobExperiment(experimentRequest, chaoExperimentsFromDb, baseTimestamp, jobId) {
try {
const experiment = experimentsFromDb.find(e => e.id === experimentRequest.experiment_id);
const experiment = chaoExperimentsFromDb.find(e => e.id === experimentRequest.experiment_id);
const startTime = baseTimestamp + experimentRequest.start_after;
const endTime = startTime + convertDurationStringToMillisecond(experiment.kubeObject.spec.duration);
const jobExperimentId = uuid();
await chaosExperimentsDbConnector.insertChaosJobExperiment(jobExperimentId, jobId, experiment.id, startTime, endTime);
await chaosExperimentsManager.insertChaosJobExperiment(jobExperimentId, jobId, experiment.id, startTime, endTime);
const kubeObject = experiment.kubeObject;
kubeObject.name = kubeObject.metadata.name.concat(`_${jobExperimentId}`);
const timeout = setTimeout(() => runChaosExperiment(kubeObject, jobId, jobExperimentId), experimentRequest.start_after);
jobExperimentsIdToTimeout.set(jobExperimentId, timeout);
scheduleChaosExperiment(kubeObject, jobId, jobExperimentId, experimentRequest.start_after);
} catch (error){
logger.error(error, `error while setting chaos experiment ${experimentRequest.experiment_id} for job ${jobId}`);
}
}

async function runChaosExperiment(kubeObject, jobId, jobExperimentId) {

function scheduleChaosExperiment(kubeObject, jobId, jobExperimentId, startAfter) {
const timeout = setTimeout(() => chaosExperimentConnector.runChaosExperiment(kubeObject, jobId, jobExperimentId), startAfter);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After you merge with me, note that jobId was dropped since it's not needed

jobExperimentsIdToTimeout.set(jobExperimentId, timeout);
}

function convertDurationStringToMillisecond(durationString) {
Expand All @@ -63,5 +64,6 @@ function convertDurationStringToMillisecond(durationString) {

module.exports = {
jobExperimentsIdToTimeout,
setChaosExperimentsIfExist
};
setChaosExperimentsIfExist,
scheduleChaosExperiment
};
31 changes: 27 additions & 4 deletions src/jobs/models/jobManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ const logger = require('../../common/logger'),
databaseConnector = require('./database/databaseConnector'),
webhooksManager = require('../../webhooks/models/webhookManager'),
streamingManager = require('../../streaming/manager'),
chaosExperimentsManager = require('../../chaos-experiments/models/chaosExperimentsManager'),
{ STREAMING_EVENT_TYPES } = require('../../streaming/entities/common'),
{ CONFIG, CONTEXT_ID, JOB_TYPE_FUNCTIONAL_TEST } = require('../../common/consts'),
{
CONFIG, CONTEXT_ID, JOB_TYPE_FUNCTIONAL_TEST,
KUBERNETES
} = require('../../common/consts'),
generateError = require('../../common/generateError'),
{ version: PREDATOR_VERSION } = require('../../../package.json'),
jobExperimentHandler = require('./jobExperimentsHandler');
Expand All @@ -34,16 +38,35 @@ module.exports.reloadCronJobs = async () => {
const configData = await configHandler.getConfig();
try {
const jobs = await databaseConnector.getJobs(contextId);
jobs.forEach(async function (job) {
for (const job of jobs) {
GuyAb marked this conversation as resolved.
Show resolved Hide resolved
if (job.cron_expression !== null) {
addCron(job, job.cron_expression, configData);
await addCron(job, job.cron_expression, configData);
GuyAb marked this conversation as resolved.
Show resolved Hide resolved
}
});
};
} catch (error) {
throw new Error('Unable to reload scheduled jobs, error: ' + error);
}
};

module.exports.reloadChaosExperiments = async () => {
const contextId = httpContext.get(CONTEXT_ID);
const jobPlatform = await configHandler.getConfigValue(CONFIG.JOB_PLATFORM);
if (jobPlatform.toUpperCase() !== KUBERNETES) {
return;
}
try {
const timestamp = new Date().valueOf();
GuyAb marked this conversation as resolved.
Show resolved Hide resolved
const futureJobExperiments = await chaosExperimentsManager.getFutureJobExperiments(timestamp, contextId);
for (const futureJobExperiment of futureJobExperiments) {
const calculatedStartAfter = futureJobExperiment.start_time - timestamp;
const chaosExperiment = await chaosExperimentsManager.getChaosExperimentById(futureJobExperiment.experiment_id);
jobExperimentHandler.scheduleChaosExperiment(chaosExperiment.kubeObject, futureJobExperiment.job_id, futureJobExperiment.id, calculatedStartAfter);
}
GuyAb marked this conversation as resolved.
Show resolved Hide resolved
} catch (error) {
throw new Error('Unable to reload job experiments , error: ' + error);
}
};

module.exports.scheduleFinishedContainersCleanup = async () => {
const interval = await configHandler.getConfigValue(CONFIG.INTERVAL_CLEANUP_FINISHED_CONTAINERS_MS);
if (interval > 0) {
Expand Down
43 changes: 42 additions & 1 deletion tests/unit-tests/jobs/helpers/jobVerifier-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ const sinon = require('sinon'),
should = require('should'),
jobVerifier = require('../../../../src/jobs/helpers/jobVerifier'),
testsManager = require('../../../../src/tests/models/manager'),
chaosExperimentsManager = require('../../../../src/chaos-experiments/models/chaosExperimentsManager'),
configHandler = require('../../../../src/configManager/models/configHandler'),
consts = require('../../../../src/common/consts');

describe('Jobs verifier tests', function () {
let req, res, sandbox, nextStub, resJsonStub, resStatusStub, testsManagerStub, configHandlerStub;
let req, res, sandbox, nextStub, resJsonStub, resStatusStub, testsManagerStub, configHandlerStub, getChaosExperimentsByIdsStub;

before(() => {
sandbox = sinon.createSandbox();
Expand All @@ -15,6 +16,7 @@ describe('Jobs verifier tests', function () {
resStatusStub = sandbox.stub();
configHandlerStub = sandbox.stub(configHandler, 'getConfigValue');
testsManagerStub = sandbox.stub(testsManager, 'getTest');
getChaosExperimentsByIdsStub = sandbox.stub(chaosExperimentsManager, 'getChaosExperimentsByIds');
res = {
json: (json) => {
resJsonStub(json);
Expand Down Expand Up @@ -179,4 +181,43 @@ describe('Jobs verifier tests', function () {
should(nextStub.args[0][0].statusCode).eql(400);
});
});

describe('verifyExperimentsExist tests', () => {
it('if job does not have experiments array, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
req = { body: { experiments: undefined } };
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
it('if job experiments array is empty, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
req = { body: { experiments: [] } };
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('if job platform is not KUBERNETES, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('DOCKER');
req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: false } };
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('if chaos experiments mentioned in the job exist, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: true, experiments: [{ experiment_id: '1234', start_after: 1000 }] } };
getChaosExperimentsByIdsStub.resolves([{ id: '1234' }]);
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});

it('if chaos experiments mentioned in the job do not exist, should fail', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: true, experiments: [{ experiment_id: '1234', start_after: 1000 }] } };
getChaosExperimentsByIdsStub.resolves([]);
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0].message).eql('One or more chaos experiments are not configured. Job can not be created');
should(nextStub.args[0][0].statusCode).eql(400);
});
});
});
40 changes: 40 additions & 0 deletions tests/unit-tests/jobs/models/jobManager-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const databaseConnector = require('../../../../src/jobs/models/database/database
webhooksManager = require('../../../../src/webhooks/models/webhookManager'),
basicTest = require('../../../testExamples/Basic_test.json'),
reportsManager = require('../../../../src/reports/models/reportsManager'),
chaosExperimentsManager = require('../../../../src/chaos-experiments/models/chaosExperimentsManager'),
chaosExperimentConnector = require('../../../../src/chaos-experiments/models/kubernetes/chaosExperimentConnector'),
config = require('../../../../src/common/consts').CONFIG;

let manager;
Expand Down Expand Up @@ -47,6 +49,11 @@ describe('Manager jobs', function () {

let testsManagerGetStub;

let getFutureJobExperimentsStub;
let getChaosExperimentByIdStub;

let runChaosExperimentStub;

before(() => {
sandbox = sinon.sandbox.create();

Expand All @@ -63,6 +70,11 @@ describe('Manager jobs', function () {

testsManagerGetStub = sandbox.stub(testsManager, 'getTest');

getFutureJobExperimentsStub = sandbox.stub(chaosExperimentsManager, 'getFutureJobExperiments');
getChaosExperimentByIdStub = sandbox.stub(chaosExperimentsManager, 'getChaosExperimentById');

runChaosExperimentStub = sandbox.stub(chaosExperimentConnector, 'runChaosExperiment');

jobGetLogsStub = sandbox.stub(jobConnector, 'getLogs');
jobDeleteContainerStub = sandbox.stub(jobConnector, 'deleteAllContainers');
jobStopRunStub = sandbox.stub(jobConnector, 'stopRun');
Expand Down Expand Up @@ -161,6 +173,34 @@ describe('Manager jobs', function () {
});
});

describe('Reload job experiments', function () {
it('found future experiments to reload', async () => {
const timestamp = 7200000;
const jobExperiment = { start_time: timestamp, job_id: '1234', experiment_id: '4321', id: '2468' };
const chaosExperiment = { kubeObject: { hello: 1 }, experiment_id: '4321' };
getFutureJobExperimentsStub.resolves([jobExperiment]);
getChaosExperimentByIdStub.resolves(chaosExperiment);
runChaosExperimentStub.returns();

const clock = sinon.useFakeTimers();
const promise = manager.reloadChaosExperiments();
clock.tick(3600000);
await promise;
clock.tick(3600010);
sinon.assert.calledOnce(runChaosExperimentStub);
sinon.assert.calledWith(runChaosExperimentStub, chaosExperiment.kubeObject, jobExperiment.job_id, jobExperiment.id);
clock.restore();
});
it('future experiments not found - nothing to reload', async () => {
getFutureJobExperimentsStub.resolves([]);
runChaosExperimentStub.returns();

await manager.reloadChaosExperiments();
sinon.assert.notCalled(getChaosExperimentByIdStub);
sinon.assert.notCalled(runChaosExperimentStub);
});
});

describe('schedule Finished Containers Cleanup', function () {
it('Interval is set to 0, no automatic cleanup is scheduled', (done) => {
getConfigValueStub.withArgs(config.INTERVAL_CLEANUP_FINISHED_CONTAINERS_MS).returns(0);
Expand Down