Skip to content

Commit

Permalink
feat(emr-runner): update resources stack before start cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
weixu365 committed Sep 20, 2020
1 parent c58722f commit 087df92
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 26 deletions.
110 changes: 93 additions & 17 deletions src/aws/cloudformation_client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const fs = require("fs");
const yaml = require('js-yaml');
const lodash = require('lodash');
const promiseRetry = require('promise-retry');
const AWS = require("./aws");
const logger = require("../logger");

Expand All @@ -25,21 +26,29 @@ class CloudformationClient {
.catch(e => Promise.reject(new Error(`Failed to get cloudformation stack resources from '${stackName}', caused by ${e}`)));
}

deploy(stackName, resources, stackParameters, tags) {
const changeSetName = `aws-emr-runner/${new Date().toISOString()}`.replace(/[^a-z0-9]/gi, '-')
async deploy(stackName, resources, stackParameters, tags) {
stackName = this.normaliseStackName(stackName)
let operation = 'CREATE'
this.getStack(stackName)
.then(stack => {operation = stack == null ? 'CREATE': 'UPDATE'})
.then(() => operation == 'UPDATE' && this.clearPreviousChangeSets(stackName))
.then(() => console.log(`Create changeset to ${operation} stack ${stackName}`))
.then(stack => this.createChangeSet(stackName, resources, stackParameters, tags, changeSetName, operation))
.then(() => console.log(`Waiting for changeset to be created`))
.then(() => this.waitForChangeSet(stackName, changeSetName))
.then(() => console.log(`Execute changeset on stack ${stackName}`))
.then(() => this.executeChangeSet(stackName, changeSetName))
.then(() => console.log(`Waiting for changeset to be applied to stack ${stackName}`))
.then(() => this.waitFor(stackName, operation == 'CREATE' ? 'stackCreateComplete': 'stackUpdateComplete'))
const changeSetName = `aws-emr-runner/${new Date().toISOString()}`.replace(/[^a-z0-9]/gi, '-')

const stack = await this.getStack(stackName)
const operation = stack == null ? 'CREATE': 'UPDATE'
if (operation == 'UPDATE') {
await this.clearPreviousChangeSets(stackName)
}

this.logger.info(`Create changeset to ${operation} resources stack ${stackName}`)
await this.createChangeSet(stackName, resources, stackParameters, tags, changeSetName, operation)

this.logger.info(`Waiting for changeset to be created`)
const changeSet = await this.waitForChangeSet(stackName, changeSetName)
if(changeSet == null) {
this.logger.info(`No changes on resources stack ${stackName}`)
} else {
this.logger.info(`Executing changeset on stack ${stackName}`)
await this.executeChangeSet(stackName, changeSetName)
this.logger.info(`Waiting for changeset to be applied to stack ${stackName}`)
await this.waitFor(stackName, operation == 'CREATE' ? 'stackCreateComplete': 'stackUpdateComplete')
}
}

createChangeSet(stackName, resources, stackParameters, tags, changeSetName, changeSetType) {
Expand All @@ -50,10 +59,11 @@ class CloudformationClient {

lodash.forEach(this.stackParameters, (value, key) => {
lodash.set(configDoc, key, value);
console.log(`override settings: ${key}=${value}`)
this.logger.info(`override settings: ${key}=${value}`)
})

console.log(this.generateStackTemplate(stackName, resources))
const stackTemplateBody = this.generateStackTemplate(stackName, resources)
this.logger.debug(`Generated stack template: ${stackTemplateBody}`)
const params = {
ChangeSetName: changeSetName,
StackName: stackName,
Expand All @@ -77,7 +87,7 @@ class CloudformationClient {
}

clearPreviousChangeSets(stackName) {
console.log(`Cleaning up previous changesets on stack ${stackName}`)
this.logger.info(`Cleaning up previous changesets on stack ${stackName}`)
return this.listChangeSets(stackName)
.then(result => result.Summaries)
.each(changeSet => this.deleteChangeSet(stackName, changeSet.ChangeSetName))
Expand All @@ -104,12 +114,39 @@ class CloudformationClient {
.catch(e => Promise.reject(new Error(`Failed to delete changeset from cloudformation stack '${stackName}', caused by ${e}`)));
}

deleteStack(stackName) {
stackName = this.normaliseStackName(stackName)
var params = {
StackName: stackName
};
return this.cloudformation.deleteStack(params).promise()
.then(() => {
return promiseRetry((retry, number) => {
return this.getStack(stackName)
.then(r => {
if(r == null) {
return null
}
return retry()
})
.catch(e => {
if(!this.isRetryError(e)) {
this.logger.info(`Failed to check stack status(${number}): ${e}`)
}
return retry()
})
}, {retries: 1000, minTimeout: 2000, factor: 1})
})
.catch(e => Promise.reject(new Error(`Failed to delete cloudformation stacks '${stackName}', caused by ${e}`)))
}

getStack(stackName) {
stackName = this.normaliseStackName(stackName)
const params = {
StackName: stackName
};
return this.cloudformation.describeStacks(params).promise()
.then(response => response.Stacks[0])
.catch(e => {
if (e.message.indexOf("not exist") >=0) {
return null
Expand All @@ -128,12 +165,27 @@ class CloudformationClient {
return yaml.dump({...defaultTemplate, Resources: resources})
}

getEvents(stackName) {
stackName = this.normaliseStackName(stackName)
var params = {
StackName: stackName
};
return this.cloudformation.describeStackEvents(params).promise()
.then(response => response.StackEvents)
.map(e => lodash.pick(e, ['LogicalResourceId', 'ResourceStatus', 'ResourceStatusReason']))
.catch(e => Promise.reject(new Error(`Failed to get events from stack: '${stackName}', caused by ${e}`)));
}

waitFor(stackName, event) {
const params = {
StackName: stackName
};

return this.cloudformation.waitFor(event, params).promise()
.catch(e => {
return this.getEvents(stackName)
.then((events) => Promise.reject(new Error(`Stack is not in the state '${event}', detailed events:\n${JSON.stringify(events, null, ' ')}`)))
});
}

waitForChangeSet(stackName, changeSetName) {
Expand All @@ -143,6 +195,30 @@ class CloudformationClient {
};

return this.cloudformation.waitFor('changeSetCreateComplete', params).promise()
.catch(e => {
this.getChangeset(stackName, changeSetName)
.tap(changeSet => this.logger.debug(`Changeset : ${JSON.stringify(changeSet, null, ' ')}`))
.then(changeSet => {
if (lodash.size(changeSet.Changes) == 0) {
return null
}else{
return changeSet
}
})
})
}

getChangeset(stackName, changeSetName) {
const params = {
ChangeSetName: changeSetName,
StackName: stackName
};
return this.cloudformation.describeChangeSet(params).promise()
.catch(e => Promise.reject(new Error(`Failed to get changeset from stack: '${stackName}', caused by ${e}`)));
}

isRetryError(err) {
return err && err.code === 'EPROMISERETRY' && Object.prototype.hasOwnProperty.call(err, 'retried');
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/aws/emr_client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const Bluebird = require('bluebird');
const promiseRetry = require('promise-retry');

const AWS = require("./aws");
const logger = require("../logger");
const EmrHadoopDebuggingStep = require('../steps/emr_hadoop_debugging_step');
Expand Down Expand Up @@ -28,7 +28,8 @@ class EmrClient {

waitForClusterStarted(cluster_id) {
return this.waitForCluster(cluster_id, ['WAITING'])
.tap(() => this.logger.info(`Cluster ${cluster_id} started`))
.then(() => this.logger.info(`Cluster ${cluster_id} started`))
.then(() => cluster_id)
}

waitForCluster(cluster_id, waitingState=[]) {
Expand All @@ -37,7 +38,6 @@ class EmrClient {
};

return promiseRetry((retry, number) => {

return this.emr.describeCluster(params).promise()
.then(r => {
this.logger.info(`Checking cluster ${cluster_id} status(${number}): ${r.Cluster.Status.State}`);
Expand Down Expand Up @@ -65,7 +65,6 @@ class EmrClient {
return retry()
});
}, {retries: 10000, minTimeout: 10 * 1000, factor: 1})
.then(() => cluster_id)
}

getClusterByName(name) {
Expand Down
10 changes: 7 additions & 3 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ class Config {
this.overrideSettings = {}

this.resources = null
this.builtInVariables = {
EmrHadoopDebuggingStep: JSON.stringify(new EmrHadoopDebuggingStep().get(), null, ' '),
}
this.config = null
}

load() {
const defaultSettings = {
env: {...process.env, BUILD_NUMBER: 'manual', AUTO_TERMINATE: 'false'},
EmrHadoopDebuggingStep: JSON.stringify(new EmrHadoopDebuggingStep().get(), null, ' '),
...(this.resources && { Resources: this.resources })
...(this.resources && { Resources: this.resources }),
...this.builtInVariables,
}

const fileSettings = this.loadSettingsFiles(this.settingsPath, defaultSettings)
Expand Down Expand Up @@ -67,7 +70,8 @@ class Config {
return `${this.getName()}-resources-${this.getSetting('environment')}`
}

reloadWithResources(resources) {
reloadWithResources(accountId, resources) {
this.builtInVariables.AWSAccountId = accountId
this.resources = resources
this.load()
logger.debug(`Loaded config file with resources: \n${JSON.stringify(this.config, null, ' ')}`);
Expand Down
11 changes: 9 additions & 2 deletions src/emr_runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const lodash = require('lodash')
const Bluebird = require('bluebird');
const EmrClient = require('./aws/emr_client')
const S3Client = require('./aws/s3_client')
const StsClient = require('./aws/sts_client')
const CloudformationClient = require('./aws/cloudformation_client')
const EmrSparkStep = require('./steps/emr_spark_step')
const logger = require("./logger");
Expand All @@ -16,6 +17,7 @@ class EmrRunner {
const region = this.config.get().deploy.region
this.emrClient = new EmrClient(region)
this.s3Client = new S3Client(region)
this.stsClient = new StsClient(region)
this.cloudformationClient = new CloudformationClient(region)
this.logger = logger
}
Expand Down Expand Up @@ -60,8 +62,7 @@ class EmrRunner {
}

startCluster(steps=[]) {
return loadResources()
.then(resources => this.config.reloadWithResources(resources))
return this.loadAwsSettings()
.then(() => this.emrClient.startCluster(this.config.get().cluster))
.then(cluster_id => this.emrClient.waitForClusterStarted(cluster_id))
}
Expand All @@ -75,6 +76,12 @@ class EmrRunner {
return this.emrClient.getClusterByName(this.config.get().cluster.Name).then(c => c.id)
}

deleteResources() {
const stackName = this.config.getResourceStackName()
return this.cloudformationClient.deleteStack(stackName)
.then(()=> this.logger.info("Resources stack deleted"))
}

deployResources() {
const fileConfig = this.config.load().get()
const resources = fileConfig.resources
Expand Down
7 changes: 7 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ program
return new EmrRunner(getConfig().load()).deployResources()
});

program
.command('delete-resources')
.description('Delete resources stack')
.action((cmd) => {
return new EmrRunner(getConfig().load()).deleteResources()
});

program
.command('start-cluster')
.description('Start a new EMR cluster. You need to manually terminate the cluster.')
Expand Down

0 comments on commit 087df92

Please sign in to comment.