Skip to content

Commit

Permalink
Feature Complete
Browse files Browse the repository at this point in the history
  • Loading branch information
vortarian committed Mar 16, 2017
1 parent 163c7e8 commit e394718
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 60 deletions.
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class SqsFifo {
.then(this.validate)
.then(this.create)
.then(this.decorate),
'before:deploy:remove': () =>
'after:remove:remove': () =>
BbPromise.bind(this)
.then(this.validate)
.then(this.remove),
Expand Down
179 changes: 122 additions & 57 deletions lib/queues.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,77 +2,142 @@

const aws = require('aws-sdk');
const BbPromise = require('bluebird');
const traverse = require('traverse');

aws.config.setPromisesDependency(BbPromise);

let sqs;

function searchAndReplace(target, queues) {
traverse(target).forEach(function(x) {
let queue;
for(var q in queues) {
switch(x) {
case `${queues[q].logical_name}.arn`:
this.update(queues[q].arn);
break;
case `${queues[q].logical_name}.url`:
this.update(queues[q].url);
break;
default:
break;
}
}
});
}

module.exports = {
create() {
let _this = this;
if(_this.queueStack.length === 0) return;
let queue = _this.queueStack.shift();
let sqs = new aws.SQS({
if(!sqs)
sqs = new aws.SQS({
'apiVersion': '2012-11-05',
'region': _this.options.region
});
let params = {
'QueueName': queue.QueueName,
'Attributes': {}

if (!_this.queueStack.length) {
return Promise.resolve();
}

let queue = _this.queueStack.shift();
let params = {
'QueueName': queue.QueueName,
'Attributes': {}
};
for( var p in queue.Properties) {
if ('RedrivePolicy' === p) {
params.Attributes[p] = JSON.stringify(queue.Properties[p]);
} else {
params.Attributes[p] = queue.Properties[p].toString();
}
for (var p in queue.Properties) {
if ("RedrivePolicy" === p) {
params.Attributes[p] = JSON.stringify(queue.Properties[p]);
};

return sqs.createQueue(params).promise()
.catch((err) => {
_this.serverless.cli.log(`Error in creating queue: ${JSON.stringify(err, null, 2)}`);
throw err;
})
.then((data) => {
queue.url = data.QueueUrl;
// Get the arn
return sqs.getQueueAttributes({
'AttributeNames': ["QueueArn"],
'QueueUrl': queue.url
}).promise()
})
.catch((err) => {
_this.serverless.cli.log(`Error in obtaining queue Arn: ${JSON.stringify(err, null, 2)}`);
throw err;
})
.then((q) => {
queue.arn = q.Attributes.QueueArn;
// Iterate the rest of the queue's and replace the arn reference if the other queues have dead letter queues
for(var iq in _this.queueStack) {
let nq = _this.queueStack[iq];
if ('RedrivePolicy' in nq.Properties) {
if (nq.Properties.RedrivePolicy.deadLetterTargetArn === queue.logical_name)
nq.Properties.RedrivePolicy.deadLetterTargetArn = queue.arn;
}
};
_this.serverless.cli.log(`severless-sqs-queue Created queue ${queue.url} for ${queue.logical_name}`);
// Are we on the last queue?
if (_this.queueStack.length !== 0) {
// Do this for the other items in the queue
return _this.create();
} else {
params.Attributes[p] = queue.Properties[p].toString();
// We're Done!
}
}
return sqs.createQueue(params).promise().then((data) => {
queue.url = data.QueueUrl;
// Get the arn
return sqs.getQueueAttributes({
'AttributeNames': ["QueueArn"],
'QueueUrl': queue.url
}).promise().catch((err) => {
_this.serverless.cli.log(`Error in serverless-sqs-fifo fetching queue attributes for ${queue.url}: ${err}`);
throw err;
}).then((q) => {
queue.arn = q.Attributes.QueueArn;
// Iterate the rest of the queue's and replace the arn reference if present for dead letter queues
for (var iq in _this.queueStack) {
let nq = _this.queueStack[iq];
if ("RedrivePolicy" in nq.Properties) {
if (nq.Properties.RedrivePolicy.deadLetterTargetArn === queue.logical_name)
nq.Properties.RedrivePolicy.deadLetterTargetArn = queue.arn;
}
}
// Recurse into create method, it will return when the stack is empty
return _this.queueStack.length;
}).catch((err) => {
_this.serverless.cli.log(`Error in serverless-sqs-fifo replacing queue arn for ${queue.arn}: ${err}`);
throw err;
}).then((count) => {
if (count === 0) {
let queues = _this.serverless.service.custom.sqs.queues;
for (var i in queues)
_this.serverless.cli.log(JSON.stringify(queues[i], null, 2));
} else {
return _this.create();
}
});
});
});
},
decorate(val) {
// Trigger serverless to evaluate variables again to pick up the new data we added
// If we break serverless, this is probably the place we did it
console.log("Calling populateService again");
this.serverless.variables.populateService(this.serverless.pluginManager.cliOptions);
},
update(data) {
console.log(`Calling update ${JSON.stringify(data, null, 2)}`);
this.serverless.update(data);
searchAndReplace(this.serverless.service, this.serverless.service.custom.sqs.queues);
},
remove() {
this.serverless.cli.log(`Stub queues remove`);
queues = this.serverless.service.custom.sqs.fifo.queues;
for (var i in queues)
console.log("Stub queue removing " + queues[i].arn);
}
console.log("sqs-fifo.remove()");
let _this = this;
if(!sqs)
sqs = new aws.SQS({
'apiVersion': '2012-11-05',
'region': _this.options.region
});

if (!_this.queueStack.length) {
return Promise.resolve();
}

let queue = _this.queueStack.shift();
let params = {
'QueueNamePrefix': queue.QueueName,
};

// Here's some crazyiness ... you need the url or arn to delete the queue, you get them by calling createQueue ...
return sqs.listQueues(params).promise()
.catch((err) => {
_this.serverless.cli.log(`Error in creating queue: ${JSON.stringify(err, null, 2)}`);
throw err;
})
.then((data) => {
console.log(`sqsRemoving queue ${JSON.stringify(data.QueueUrls, null, 2)}`);
// Delete the queue
for(var qu in data.QueueUrls) {
if(data.QueueUrls[qu].toString().endsWith(`/${queue.QueueName}`)) {
return sqs.deleteQueue({
'QueueUrl': data.QueueUrls[qu].toString()
}).promise()
.catch((err) => {
_this.serverless.cli.log(`Error in removing queue: ${JSON.stringify(err, null, 2)}`);
throw err;
})
.then(() => {
_this.remove();
})
}
}
})
.catch((err) => {
_this.serverless.cli.log(`Error in listing queues arn: ${JSON.stringify(err, null, 2)}`);
throw err;
});
},
};
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "serverless-sqs-fifo",
"version": "1.0.1",
"version": "1.0.0",
"description": "Augmenting support for SQS Fifo Queue's with Serverless",
"main": "index.js",
"scripts": {
Expand Down Expand Up @@ -45,6 +45,7 @@
"aws-sdk": "^2.22.0",
"bluebird": "^3.4.6",
"eslint-plugin-import": "^2.2.0",
"fs-extra": "^1.0.0"
"fs-extra": "^1.0.0",
"traverse": "^0.6.6"
}
}

0 comments on commit e394718

Please sign in to comment.