diff --git a/packages/@aws-cdk/aws-sqs/README.md b/packages/@aws-cdk/aws-sqs/README.md index a66e03a07aa20..c824f9410e8ad 100644 --- a/packages/@aws-cdk/aws-sqs/README.md +++ b/packages/@aws-cdk/aws-sqs/README.md @@ -63,4 +63,5 @@ features in order to help guarantee exactly-once processing. For more informatio the SQS manual. Note that FIFO queues are not available in all AWS regions. A queue can be made a FIFO queue by either setting `fifo: true`, giving it a name which ends -in `".fifo"`, or enabling content-based deduplication (which requires FIFO queues). +in `".fifo"`, or by enabling a FIFO specific feature such as: content-based deduplication, +deduplication scope or fifo throughput limit. diff --git a/packages/@aws-cdk/aws-sqs/lib/queue.ts b/packages/@aws-cdk/aws-sqs/lib/queue.ts index 1c3067c7a70d6..414ddaaf8a919 100644 --- a/packages/@aws-cdk/aws-sqs/lib/queue.ts +++ b/packages/@aws-cdk/aws-sqs/lib/queue.ts @@ -138,6 +138,26 @@ export interface QueueProps { */ readonly contentBasedDeduplication?: boolean; + /** + * For high throughput for FIFO queues, specifies whether message deduplication + * occurs at the message group or queue level. + * + * (Only applies to FIFO queues.) + * + * @default DeduplicationScope.QUEUE + */ + readonly deduplicationScope?: DeduplicationScope; + + /** + * For high throughput for FIFO queues, specifies whether the FIFO queue + * throughput quota applies to the entire queue or per message group. + * + * (Only applies to FIFO queues.) + * + * @default FifoThroughputLimit.PER_QUEUE + */ + readonly fifoThroughputLimit?: FifoThroughputLimit; + /** * Policy to apply when the user pool is removed from the stack * @@ -188,6 +208,34 @@ export enum QueueEncryption { KMS = 'KMS', } +/** + * What kind of deduplication scope to apply + */ +export enum DeduplicationScope { + /** + * Deduplication occurs at the message group level + */ + MESSAGE_GROUP = 'messageGroup', + /** + * Deduplication occurs at the message queue level + */ + QUEUE = 'queue', +} + +/** + * Whether the FIFO queue throughput quota applies to the entire queue or per message group + */ +export enum FifoThroughputLimit { + /** + * Throughput quota applies per queue + */ + PER_QUEUE = 'perQueue', + /** + * Throughput quota applies per message group id + */ + PER_MESSAGE_GROUP_ID = 'perMessageGroupId', +} + /** * A new Amazon SQS queue */ @@ -342,6 +390,8 @@ export class Queue extends QueueBase { const queueName = props.queueName; if (typeof fifoQueue === 'undefined' && queueName && !Token.isUnresolved(queueName) && queueName.endsWith('.fifo')) { fifoQueue = true; } if (typeof fifoQueue === 'undefined' && props.contentBasedDeduplication) { fifoQueue = true; } + if (typeof fifoQueue === 'undefined' && props.deduplicationScope) { fifoQueue = true; } + if (typeof fifoQueue === 'undefined' && props.fifoThroughputLimit) { fifoQueue = true; } // If we have a name, see that it agrees with the FIFO setting if (typeof queueName === 'string') { @@ -357,8 +407,18 @@ export class Queue extends QueueBase { throw new Error('Content-based deduplication can only be defined for FIFO queues'); } + if (props.deduplicationScope && !fifoQueue) { + throw new Error('Deduplication scope can only be defined for FIFO queues'); + } + + if (props.fifoThroughputLimit && !fifoQueue) { + throw new Error('FIFO throughput limit can only be defined for FIFO queues'); + } + return { contentBasedDeduplication: props.contentBasedDeduplication, + deduplicationScope: props.deduplicationScope, + fifoThroughputLimit: props.fifoThroughputLimit, fifoQueue, }; } @@ -367,6 +427,8 @@ export class Queue extends QueueBase { interface FifoProps { readonly fifoQueue?: boolean; readonly contentBasedDeduplication?: boolean; + readonly deduplicationScope?: DeduplicationScope; + readonly fifoThroughputLimit?: FifoThroughputLimit; } interface EncryptionProps { diff --git a/packages/@aws-cdk/aws-sqs/test/integ.sqs.expected.json b/packages/@aws-cdk/aws-sqs/test/integ.sqs.expected.json index c310198cfcae6..6dc7b035309d7 100644 --- a/packages/@aws-cdk/aws-sqs/test/integ.sqs.expected.json +++ b/packages/@aws-cdk/aws-sqs/test/integ.sqs.expected.json @@ -71,6 +71,16 @@ "UpdateReplacePolicy": "Delete", "DeletionPolicy": "Delete" }, + "HighThroughputFifoQueue40A0EEE4": { + "Type": "AWS::SQS::Queue", + "Properties": { + "DeduplicationScope": "messageGroup", + "FifoQueue": true, + "FifoThroughputLimit": "perMessageGroupId" + }, + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + }, "Role1ABCC5F0": { "Type": "AWS::IAM::Role", "Properties": { @@ -165,6 +175,22 @@ "Arn" ] } + }, + { + "Action": [ + "sqs:ReceiveMessage", + "sqs:ChangeMessageVisibility", + "sqs:GetQueueUrl", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "HighThroughputFifoQueue40A0EEE4", + "Arn" + ] + } } ], "Version": "2012-10-17" @@ -185,4 +211,4 @@ } } } -} +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-sqs/test/integ.sqs.ts b/packages/@aws-cdk/aws-sqs/test/integ.sqs.ts index 6839d050783b5..76c51a0e99cc6 100644 --- a/packages/@aws-cdk/aws-sqs/test/integ.sqs.ts +++ b/packages/@aws-cdk/aws-sqs/test/integ.sqs.ts @@ -1,7 +1,7 @@ import { AccountRootPrincipal, Role } from '@aws-cdk/aws-iam'; import { Key } from '@aws-cdk/aws-kms'; import { App, CfnOutput, RemovalPolicy, Stack } from '@aws-cdk/core'; -import { Queue, QueueEncryption } from '../lib'; +import { DeduplicationScope, FifoThroughputLimit, Queue, QueueEncryption } from '../lib'; const app = new App(); @@ -16,6 +16,11 @@ const fifo = new Queue(stack, 'FifoQueue', { fifo: true, encryptionMasterKey: new Key(stack, 'EncryptionKey', { removalPolicy: RemovalPolicy.DESTROY }), }); +const highThroughputFifo = new Queue(stack, 'HighThroughputFifoQueue', { + fifo: true, + fifoThroughputLimit: FifoThroughputLimit.PER_MESSAGE_GROUP_ID, + deduplicationScope: DeduplicationScope.MESSAGE_GROUP, +}); const role = new Role(stack, 'Role', { assumedBy: new AccountRootPrincipal(), @@ -24,6 +29,7 @@ const role = new Role(stack, 'Role', { dlq.grantConsumeMessages(role); queue.grantConsumeMessages(role); fifo.grantConsumeMessages(role); +highThroughputFifo.grantConsumeMessages(role); new CfnOutput(stack, 'QueueUrl', { value: queue.queueUrl }); diff --git a/packages/@aws-cdk/aws-sqs/test/test.sqs.ts b/packages/@aws-cdk/aws-sqs/test/test.sqs.ts index f73f8491ae98a..da8637ba5d4f5 100644 --- a/packages/@aws-cdk/aws-sqs/test/test.sqs.ts +++ b/packages/@aws-cdk/aws-sqs/test/test.sqs.ts @@ -427,6 +427,55 @@ export = { test.done(); }, + 'test a fifo queue is observed when high throughput properties are specified'(test: Test) { + const stack = new Stack(); + const queue = new sqs.Queue(stack, 'Queue', { + fifo: true, + fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID, + deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP, + }); + + test.deepEqual(queue.fifo, true); + expect(stack).toMatch({ + 'Resources': { + 'Queue4A7E3555': { + 'Type': 'AWS::SQS::Queue', + 'Properties': { + 'DeduplicationScope': 'messageGroup', + 'FifoQueue': true, + 'FifoThroughputLimit': 'perMessageGroupId', + }, + 'UpdateReplacePolicy': 'Delete', + 'DeletionPolicy': 'Delete', + }, + }, + }); + + test.done(); + }, + + 'test a queue throws when fifoThroughputLimit specified on non fifo queue'(test: Test) { + const stack = new Stack(); + test.throws(() => { + new sqs.Queue(stack, 'Queue', { + fifo: false, + fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID, + }); + }); + test.done(); + }, + + 'test a queue throws when deduplicationScope specified on non fifo queue'(test: Test) { + const stack = new Stack(); + test.throws(() => { + new sqs.Queue(stack, 'Queue', { + fifo: false, + deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP, + }); + }); + test.done(); + }, + 'test metrics'(test: Test) { // GIVEN const stack = new Stack();