Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sqs): add support for high throughput fifo #15202

Merged
merged 7 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/@aws-cdk/aws-sqs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ features in order to help guarantee exactly-once processing. For more informatio
the SQS manual. Note that FIFO queues are not available in all AWS regions.

A queue can be made a FIFO queue by either setting `fifo: true`, giving it a name which ends
in `".fifo"`, or enabling content-based deduplication (which requires FIFO queues).
in `".fifo"`, or by enabling a FIFO specific feature such as: content-based deduplication,
deduplication scope or fifo throughput limit.
62 changes: 62 additions & 0 deletions packages/@aws-cdk/aws-sqs/lib/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,26 @@ export interface QueueProps {
*/
readonly contentBasedDeduplication?: boolean;

/**
* For high throughput for FIFO queues, specifies whether message deduplication
* occurs at the message group or queue level.
*
* (Only applies to FIFO queues.)
*
* @default DeduplicationScope.QUEUE
*/
readonly deduplicationScope?: DeduplicationScope;

/**
* For high throughput for FIFO queues, specifies whether the FIFO queue
* throughput quota applies to the entire queue or per message group.
*
* (Only applies to FIFO queues.)
*
* @default FifoThroughputLimit.PER_QUEUE
*/
readonly fifoThroughputLimit?: FifoThroughputLimit;

/**
* Policy to apply when the user pool is removed from the stack
*
Expand Down Expand Up @@ -188,6 +208,34 @@ export enum QueueEncryption {
KMS = 'KMS',
}

/**
* What kind of deduplication scope to apply
*/
export enum DeduplicationScope {
/**
* Deduplication occurs at the message group level
*/
MESSAGE_GROUP = 'messageGroup',
/**
* Deduplication occurs at the message queue level
*/
QUEUE = 'queue',
}

/**
* Whether the FIFO queue throughput quota applies to the entire queue or per message group
*/
export enum FifoThroughputLimit {
/**
* Throughput quota applies per queue
*/
PER_QUEUE = 'perQueue',
/**
* Throughput quota applies per message group id
*/
PER_MESSAGE_GROUP_ID = 'perMessageGroupId',
}

/**
* A new Amazon SQS queue
*/
Expand Down Expand Up @@ -342,6 +390,8 @@ export class Queue extends QueueBase {
const queueName = props.queueName;
if (typeof fifoQueue === 'undefined' && queueName && !Token.isUnresolved(queueName) && queueName.endsWith('.fifo')) { fifoQueue = true; }
if (typeof fifoQueue === 'undefined' && props.contentBasedDeduplication) { fifoQueue = true; }
if (typeof fifoQueue === 'undefined' && props.deduplicationScope) { fifoQueue = true; }
if (typeof fifoQueue === 'undefined' && props.fifoThroughputLimit) { fifoQueue = true; }

// If we have a name, see that it agrees with the FIFO setting
if (typeof queueName === 'string') {
Expand All @@ -357,8 +407,18 @@ export class Queue extends QueueBase {
throw new Error('Content-based deduplication can only be defined for FIFO queues');
}

if (props.deduplicationScope && !fifoQueue) {
throw new Error('Deduplication scope can only be defined for FIFO queues');
}

if (props.fifoThroughputLimit && !fifoQueue) {
throw new Error('FIFO throughput limit can only be defined for FIFO queues');
}

return {
contentBasedDeduplication: props.contentBasedDeduplication,
deduplicationScope: props.deduplicationScope,
fifoThroughputLimit: props.fifoThroughputLimit,
fifoQueue,
};
}
Expand All @@ -367,6 +427,8 @@ export class Queue extends QueueBase {
interface FifoProps {
readonly fifoQueue?: boolean;
readonly contentBasedDeduplication?: boolean;
readonly deduplicationScope?: DeduplicationScope;
readonly fifoThroughputLimit?: FifoThroughputLimit;
}

interface EncryptionProps {
Expand Down
28 changes: 27 additions & 1 deletion packages/@aws-cdk/aws-sqs/test/integ.sqs.expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"HighThroughputFifoQueue40A0EEE4": {
"Type": "AWS::SQS::Queue",
"Properties": {
"DeduplicationScope": "messageGroup",
"FifoQueue": true,
"FifoThroughputLimit": "perMessageGroupId"
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"Role1ABCC5F0": {
"Type": "AWS::IAM::Role",
"Properties": {
Expand Down Expand Up @@ -165,6 +175,22 @@
"Arn"
]
}
},
{
"Action": [
"sqs:ReceiveMessage",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueUrl",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"HighThroughputFifoQueue40A0EEE4",
"Arn"
]
}
}
],
"Version": "2012-10-17"
Expand All @@ -185,4 +211,4 @@
}
}
}
}
}
8 changes: 7 additions & 1 deletion packages/@aws-cdk/aws-sqs/test/integ.sqs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AccountRootPrincipal, Role } from '@aws-cdk/aws-iam';
import { Key } from '@aws-cdk/aws-kms';
import { App, CfnOutput, RemovalPolicy, Stack } from '@aws-cdk/core';
import { Queue, QueueEncryption } from '../lib';
import { DeduplicationScope, FifoThroughputLimit, Queue, QueueEncryption } from '../lib';

const app = new App();

Expand All @@ -16,6 +16,11 @@ const fifo = new Queue(stack, 'FifoQueue', {
fifo: true,
encryptionMasterKey: new Key(stack, 'EncryptionKey', { removalPolicy: RemovalPolicy.DESTROY }),
});
const highThroughputFifo = new Queue(stack, 'HighThroughputFifoQueue', {
fifo: true,
fifoThroughputLimit: FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
deduplicationScope: DeduplicationScope.MESSAGE_GROUP,
});

const role = new Role(stack, 'Role', {
assumedBy: new AccountRootPrincipal(),
Expand All @@ -24,6 +29,7 @@ const role = new Role(stack, 'Role', {
dlq.grantConsumeMessages(role);
queue.grantConsumeMessages(role);
fifo.grantConsumeMessages(role);
highThroughputFifo.grantConsumeMessages(role);

new CfnOutput(stack, 'QueueUrl', { value: queue.queueUrl });

Expand Down
49 changes: 49 additions & 0 deletions packages/@aws-cdk/aws-sqs/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,55 @@ export = {
test.done();
},

'test a fifo queue is observed when high throughput properties are specified'(test: Test) {
const stack = new Stack();
const queue = new sqs.Queue(stack, 'Queue', {
fifo: true,
fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
});

test.deepEqual(queue.fifo, true);
expect(stack).toMatch({
'Resources': {
'Queue4A7E3555': {
'Type': 'AWS::SQS::Queue',
'Properties': {
'DeduplicationScope': 'messageGroup',
'FifoQueue': true,
'FifoThroughputLimit': 'perMessageGroupId',
},
'UpdateReplacePolicy': 'Delete',
'DeletionPolicy': 'Delete',
},
},
});

test.done();
},

'test a queue throws when fifoThroughputLimit specified on non fifo queue'(test: Test) {
const stack = new Stack();
test.throws(() => {
new sqs.Queue(stack, 'Queue', {
fifo: false,
fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
});
});
test.done();
},

'test a queue throws when deduplicationScope specified on non fifo queue'(test: Test) {
const stack = new Stack();
test.throws(() => {
new sqs.Queue(stack, 'Queue', {
fifo: false,
deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
});
});
test.done();
},

'test metrics'(test: Test) {
// GIVEN
const stack = new Stack();
Expand Down