diff --git a/packages/@aws-cdk/aws-ecs-patterns/README.md b/packages/@aws-cdk/aws-ecs-patterns/README.md index 5da8d9feb1391..e3f1f95984ed4 100644 --- a/packages/@aws-cdk/aws-ecs-patterns/README.md +++ b/packages/@aws-cdk/aws-ecs-patterns/README.md @@ -269,6 +269,8 @@ const queueProcessingFargateService = new QueueProcessingFargateService(stack, ' }); ``` +when queue not provided by user, CDK will create a primary queue and a dead letter queue with default redrive policy and attach permission to the task to be able to access the primary queue. + ## Scheduled Tasks To define a task that runs periodically, instantiate an `ScheduledEc2Task`: diff --git a/packages/@aws-cdk/aws-ecs-patterns/lib/base/queue-processing-service-base.ts b/packages/@aws-cdk/aws-ecs-patterns/lib/base/queue-processing-service-base.ts index 633385da4bbfc..08738ca788485 100644 --- a/packages/@aws-cdk/aws-ecs-patterns/lib/base/queue-processing-service-base.ts +++ b/packages/@aws-cdk/aws-ecs-patterns/lib/base/queue-processing-service-base.ts @@ -2,7 +2,7 @@ import { ScalingInterval } from '@aws-cdk/aws-applicationautoscaling'; import { IVpc } from '@aws-cdk/aws-ec2'; import { AwsLogDriver, BaseService, Cluster, ContainerImage, ICluster, LogDriver, PropagatedTagSource, Secret } from '@aws-cdk/aws-ecs'; import { IQueue, Queue } from '@aws-cdk/aws-sqs'; -import { CfnOutput, Construct, Stack } from '@aws-cdk/core'; +import { CfnOutput, Construct, Duration, Stack } from '@aws-cdk/core'; /** * The properties for the base QueueProcessingEc2Service or QueueProcessingFargateService service. @@ -86,6 +86,21 @@ export interface QueueProcessingServiceBaseProps { */ readonly queue?: IQueue; + /** + * The maximum number of times that a message can be received by consumers. + * When this value is exceeded for a message the message will be automatically sent to the Dead Letter Queue. + * + * @default 3 + */ + readonly maxReceiveCount?: number; + + /** + * The number of seconds that Dead Letter Queue retains a message. + * + * @default Duration.days(14) + */ + readonly retentionPeriod?: Duration; + /** * Maximum capacity to scale to. * @@ -143,6 +158,11 @@ export abstract class QueueProcessingServiceBase extends Construct { */ public readonly sqsQueue: IQueue; + /** + * The dead letter queue for the primary SQS queue + */ + public readonly deadLetterQueue?: IQueue; + /** * The cluster where your service will be deployed */ @@ -191,8 +211,23 @@ export abstract class QueueProcessingServiceBase extends Construct { } this.cluster = props.cluster || this.getDefaultCluster(this, props.vpc); - // Create the SQS queue if one is not provided - this.sqsQueue = props.queue !== undefined ? props.queue : new Queue(this, 'EcsProcessingQueue', {}); + // Create the SQS queue and it's corresponding DLQ if one is not provided + if (props.queue) { + this.sqsQueue = props.queue; + } else { + this.deadLetterQueue = new Queue(this, "EcsProcessingDeadLetterQueue", { + retentionPeriod: props.retentionPeriod || Duration.days(14) + }); + this.sqsQueue = new Queue(this, 'EcsProcessingQueue', { + deadLetterQueue: { + queue: this.deadLetterQueue, + maxReceiveCount: props.maxReceiveCount || 3 + } + }); + + new CfnOutput(this, 'SQSDeadLetterQueue', { value: this.deadLetterQueue.queueName }); + new CfnOutput(this, 'SQSDeadLetterQueueArn', { value: this.deadLetterQueue.queueArn }); + } // Setup autoscaling scaling intervals const defaultScalingSteps = [{ upper: 0, change: -1 }, { lower: 100, change: +1 }, { lower: 500, change: +5 }]; diff --git a/packages/@aws-cdk/aws-ecs-patterns/test/ec2/test.queue-processing-ecs-service.ts b/packages/@aws-cdk/aws-ecs-patterns/test/ec2/test.queue-processing-ecs-service.ts index 781ca75b339fa..3daa6ed266ec9 100644 --- a/packages/@aws-cdk/aws-ecs-patterns/test/ec2/test.queue-processing-ecs-service.ts +++ b/packages/@aws-cdk/aws-ecs-patterns/test/ec2/test.queue-processing-ecs-service.ts @@ -27,7 +27,96 @@ export = { LaunchType: "EC2", })); - expect(stack).to(haveResource("AWS::SQS::Queue")); + expect(stack).to(haveResource("AWS::SQS::Queue", { + RedrivePolicy: { + deadLetterTargetArn: { + "Fn::GetAtt": [ + "ServiceEcsProcessingDeadLetterQueue4A89196E", + "Arn" + ] + }, + maxReceiveCount: 3 + } + })); + + expect(stack).to(haveResource("AWS::SQS::Queue", { + MessageRetentionPeriod: 1209600 + })); + + expect(stack).to(haveResourceLike('AWS::ECS::TaskDefinition', { + ContainerDefinitions: [ + { + Environment: [ + { + Name: "QUEUE_NAME", + Value: { + "Fn::GetAtt": [ + "ServiceEcsProcessingQueueC266885C", + "QueueName" + ] + } + } + ], + LogConfiguration: { + LogDriver: "awslogs", + Options: { + "awslogs-group": { + Ref: "ServiceQueueProcessingTaskDefQueueProcessingContainerLogGroupD52338D1" + }, + "awslogs-stream-prefix": "Service", + "awslogs-region": { + Ref: "AWS::Region" + } + } + }, + Essential: true, + Image: "test", + Memory: 512 + } + ], + Family: "ServiceQueueProcessingTaskDef83DB34F1" + })); + + test.done(); + }, + + 'test ECS queue worker service construct - with optional props for queues'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const vpc = new ec2.Vpc(stack, 'VPC'); + const cluster = new ecs.Cluster(stack, 'Cluster', { vpc }); + cluster.addCapacity('DefaultAutoScalingGroup', { instanceType: new ec2.InstanceType('t2.micro') }); + + // WHEN + new ecsPatterns.QueueProcessingEc2Service(stack, 'Service', { + cluster, + memoryLimitMiB: 512, + image: ecs.ContainerImage.fromRegistry('test'), + maxReceiveCount: 42, + retentionPeriod: cdk.Duration.days(7) + }); + + // THEN - QueueWorker is of EC2 launch type, an SQS queue is created and all default properties are set. + expect(stack).to(haveResource("AWS::ECS::Service", { + DesiredCount: 1, + LaunchType: "EC2", + })); + + expect(stack).to(haveResource("AWS::SQS::Queue", { + RedrivePolicy: { + deadLetterTargetArn: { + "Fn::GetAtt": [ + "ServiceEcsProcessingDeadLetterQueue4A89196E", + "Arn" + ] + }, + maxReceiveCount: 42 + } + })); + + expect(stack).to(haveResource("AWS::SQS::Queue", { + MessageRetentionPeriod: 604800 + })); expect(stack).to(haveResourceLike('AWS::ECS::TaskDefinition', { ContainerDefinitions: [ diff --git a/packages/@aws-cdk/aws-ecs-patterns/test/fargate/test.queue-processing-fargate-service.ts b/packages/@aws-cdk/aws-ecs-patterns/test/fargate/test.queue-processing-fargate-service.ts index 248d6cbf57be6..c06f10b10b7aa 100644 --- a/packages/@aws-cdk/aws-ecs-patterns/test/fargate/test.queue-processing-fargate-service.ts +++ b/packages/@aws-cdk/aws-ecs-patterns/test/fargate/test.queue-processing-fargate-service.ts @@ -27,7 +27,118 @@ export = { LaunchType: "FARGATE", })); - expect(stack).to(haveResource("AWS::SQS::Queue")); + expect(stack).to(haveResource("AWS::SQS::Queue", { + RedrivePolicy: { + deadLetterTargetArn: { + "Fn::GetAtt": [ + "ServiceEcsProcessingDeadLetterQueue4A89196E", + "Arn" + ] + }, + maxReceiveCount: 3 + } + })); + + expect(stack).to(haveResource("AWS::SQS::Queue", { + MessageRetentionPeriod: 1209600 + })); + + expect(stack).to(haveResource("AWS::IAM::Policy", { + PolicyDocument: { + Statement: [ + { + Action: [ + "sqs:ReceiveMessage", + "sqs:ChangeMessageVisibility", + "sqs:GetQueueUrl", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + Effect: "Allow", + Resource: { + "Fn::GetAtt": [ + "ServiceEcsProcessingQueueC266885C", + "Arn" + ] + } + } + ], + Version: "2012-10-17" + } + })); + + expect(stack).to(haveResourceLike('AWS::ECS::TaskDefinition', { + ContainerDefinitions: [ + { + Environment: [ + { + Name: "QUEUE_NAME", + Value: { + "Fn::GetAtt": [ + "ServiceEcsProcessingQueueC266885C", + "QueueName" + ] + } + } + ], + LogConfiguration: { + LogDriver: "awslogs", + Options: { + "awslogs-group": { + Ref: "ServiceQueueProcessingTaskDefQueueProcessingContainerLogGroupD52338D1" + }, + "awslogs-stream-prefix": "Service", + "awslogs-region": { + Ref: "AWS::Region" + } + } + }, + Image: "test", + } + ], + Family: "ServiceQueueProcessingTaskDef83DB34F1" + })); + + test.done(); + }, + + 'test fargate queue worker service construct - with optional props for queues'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const vpc = new ec2.Vpc(stack, 'VPC'); + const cluster = new ecs.Cluster(stack, 'Cluster', { vpc }); + cluster.addCapacity('DefaultAutoScalingGroup', { instanceType: new ec2.InstanceType('t2.micro') }); + + // WHEN + new ecsPatterns.QueueProcessingFargateService(stack, 'Service', { + cluster, + memoryLimitMiB: 512, + image: ecs.ContainerImage.fromRegistry('test'), + maxReceiveCount: 42, + retentionPeriod: cdk.Duration.days(7) + }); + + // THEN - QueueWorker is of FARGATE launch type, an SQS queue is created and all default properties are set. + expect(stack).to(haveResource("AWS::ECS::Service", { + DesiredCount: 1, + LaunchType: "FARGATE", + })); + + expect(stack).to(haveResource("AWS::SQS::Queue", { + RedrivePolicy: { + deadLetterTargetArn: { + "Fn::GetAtt": [ + "ServiceEcsProcessingDeadLetterQueue4A89196E", + "Arn" + ] + }, + maxReceiveCount: 42 + } + })); + + expect(stack).to(haveResource("AWS::SQS::Queue", { + MessageRetentionPeriod: 604800 + })); expect(stack).to(haveResource("AWS::IAM::Policy", { PolicyDocument: {