Skip to content

Commit

Permalink
Merge branch 'master' into nija-at/cognito-verif-invit-msgs
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Feb 20, 2020
2 parents 44c98c7 + e307d7f commit 85e2241
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 5 deletions.
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-ecs-patterns/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ const queueProcessingFargateService = new QueueProcessingFargateService(stack, '
});
```

when queue not provided by user, CDK will create a primary queue and a dead letter queue with default redrive policy and attach permission to the task to be able to access the primary queue.

## Scheduled Tasks

To define a task that runs periodically, instantiate an `ScheduledEc2Task`:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ScalingInterval } from '@aws-cdk/aws-applicationautoscaling';
import { IVpc } from '@aws-cdk/aws-ec2';
import { AwsLogDriver, BaseService, Cluster, ContainerImage, ICluster, LogDriver, PropagatedTagSource, Secret } from '@aws-cdk/aws-ecs';
import { IQueue, Queue } from '@aws-cdk/aws-sqs';
import { CfnOutput, Construct, Stack } from '@aws-cdk/core';
import { CfnOutput, Construct, Duration, Stack } from '@aws-cdk/core';

/**
* The properties for the base QueueProcessingEc2Service or QueueProcessingFargateService service.
Expand Down Expand Up @@ -86,6 +86,21 @@ export interface QueueProcessingServiceBaseProps {
*/
readonly queue?: IQueue;

/**
* The maximum number of times that a message can be received by consumers.
* When this value is exceeded for a message the message will be automatically sent to the Dead Letter Queue.
*
* @default 3
*/
readonly maxReceiveCount?: number;

/**
* The number of seconds that Dead Letter Queue retains a message.
*
* @default Duration.days(14)
*/
readonly retentionPeriod?: Duration;

/**
* Maximum capacity to scale to.
*
Expand Down Expand Up @@ -143,6 +158,11 @@ export abstract class QueueProcessingServiceBase extends Construct {
*/
public readonly sqsQueue: IQueue;

/**
* The dead letter queue for the primary SQS queue
*/
public readonly deadLetterQueue?: IQueue;

/**
* The cluster where your service will be deployed
*/
Expand Down Expand Up @@ -191,8 +211,23 @@ export abstract class QueueProcessingServiceBase extends Construct {
}
this.cluster = props.cluster || this.getDefaultCluster(this, props.vpc);

// Create the SQS queue if one is not provided
this.sqsQueue = props.queue !== undefined ? props.queue : new Queue(this, 'EcsProcessingQueue', {});
// Create the SQS queue and it's corresponding DLQ if one is not provided
if (props.queue) {
this.sqsQueue = props.queue;
} else {
this.deadLetterQueue = new Queue(this, "EcsProcessingDeadLetterQueue", {
retentionPeriod: props.retentionPeriod || Duration.days(14)
});
this.sqsQueue = new Queue(this, 'EcsProcessingQueue', {
deadLetterQueue: {
queue: this.deadLetterQueue,
maxReceiveCount: props.maxReceiveCount || 3
}
});

new CfnOutput(this, 'SQSDeadLetterQueue', { value: this.deadLetterQueue.queueName });
new CfnOutput(this, 'SQSDeadLetterQueueArn', { value: this.deadLetterQueue.queueArn });
}

// Setup autoscaling scaling intervals
const defaultScalingSteps = [{ upper: 0, change: -1 }, { lower: 100, change: +1 }, { lower: 500, change: +5 }];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,96 @@ export = {
LaunchType: "EC2",
}));

expect(stack).to(haveResource("AWS::SQS::Queue"));
expect(stack).to(haveResource("AWS::SQS::Queue", {
RedrivePolicy: {
deadLetterTargetArn: {
"Fn::GetAtt": [
"ServiceEcsProcessingDeadLetterQueue4A89196E",
"Arn"
]
},
maxReceiveCount: 3
}
}));

expect(stack).to(haveResource("AWS::SQS::Queue", {
MessageRetentionPeriod: 1209600
}));

expect(stack).to(haveResourceLike('AWS::ECS::TaskDefinition', {
ContainerDefinitions: [
{
Environment: [
{
Name: "QUEUE_NAME",
Value: {
"Fn::GetAtt": [
"ServiceEcsProcessingQueueC266885C",
"QueueName"
]
}
}
],
LogConfiguration: {
LogDriver: "awslogs",
Options: {
"awslogs-group": {
Ref: "ServiceQueueProcessingTaskDefQueueProcessingContainerLogGroupD52338D1"
},
"awslogs-stream-prefix": "Service",
"awslogs-region": {
Ref: "AWS::Region"
}
}
},
Essential: true,
Image: "test",
Memory: 512
}
],
Family: "ServiceQueueProcessingTaskDef83DB34F1"
}));

test.done();
},

'test ECS queue worker service construct - with optional props for queues'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const vpc = new ec2.Vpc(stack, 'VPC');
const cluster = new ecs.Cluster(stack, 'Cluster', { vpc });
cluster.addCapacity('DefaultAutoScalingGroup', { instanceType: new ec2.InstanceType('t2.micro') });

// WHEN
new ecsPatterns.QueueProcessingEc2Service(stack, 'Service', {
cluster,
memoryLimitMiB: 512,
image: ecs.ContainerImage.fromRegistry('test'),
maxReceiveCount: 42,
retentionPeriod: cdk.Duration.days(7)
});

// THEN - QueueWorker is of EC2 launch type, an SQS queue is created and all default properties are set.
expect(stack).to(haveResource("AWS::ECS::Service", {
DesiredCount: 1,
LaunchType: "EC2",
}));

expect(stack).to(haveResource("AWS::SQS::Queue", {
RedrivePolicy: {
deadLetterTargetArn: {
"Fn::GetAtt": [
"ServiceEcsProcessingDeadLetterQueue4A89196E",
"Arn"
]
},
maxReceiveCount: 42
}
}));

expect(stack).to(haveResource("AWS::SQS::Queue", {
MessageRetentionPeriod: 604800
}));

expect(stack).to(haveResourceLike('AWS::ECS::TaskDefinition', {
ContainerDefinitions: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,118 @@ export = {
LaunchType: "FARGATE",
}));

expect(stack).to(haveResource("AWS::SQS::Queue"));
expect(stack).to(haveResource("AWS::SQS::Queue", {
RedrivePolicy: {
deadLetterTargetArn: {
"Fn::GetAtt": [
"ServiceEcsProcessingDeadLetterQueue4A89196E",
"Arn"
]
},
maxReceiveCount: 3
}
}));

expect(stack).to(haveResource("AWS::SQS::Queue", {
MessageRetentionPeriod: 1209600
}));

expect(stack).to(haveResource("AWS::IAM::Policy", {
PolicyDocument: {
Statement: [
{
Action: [
"sqs:ReceiveMessage",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueUrl",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
Effect: "Allow",
Resource: {
"Fn::GetAtt": [
"ServiceEcsProcessingQueueC266885C",
"Arn"
]
}
}
],
Version: "2012-10-17"
}
}));

expect(stack).to(haveResourceLike('AWS::ECS::TaskDefinition', {
ContainerDefinitions: [
{
Environment: [
{
Name: "QUEUE_NAME",
Value: {
"Fn::GetAtt": [
"ServiceEcsProcessingQueueC266885C",
"QueueName"
]
}
}
],
LogConfiguration: {
LogDriver: "awslogs",
Options: {
"awslogs-group": {
Ref: "ServiceQueueProcessingTaskDefQueueProcessingContainerLogGroupD52338D1"
},
"awslogs-stream-prefix": "Service",
"awslogs-region": {
Ref: "AWS::Region"
}
}
},
Image: "test",
}
],
Family: "ServiceQueueProcessingTaskDef83DB34F1"
}));

test.done();
},

'test fargate queue worker service construct - with optional props for queues'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const vpc = new ec2.Vpc(stack, 'VPC');
const cluster = new ecs.Cluster(stack, 'Cluster', { vpc });
cluster.addCapacity('DefaultAutoScalingGroup', { instanceType: new ec2.InstanceType('t2.micro') });

// WHEN
new ecsPatterns.QueueProcessingFargateService(stack, 'Service', {
cluster,
memoryLimitMiB: 512,
image: ecs.ContainerImage.fromRegistry('test'),
maxReceiveCount: 42,
retentionPeriod: cdk.Duration.days(7)
});

// THEN - QueueWorker is of FARGATE launch type, an SQS queue is created and all default properties are set.
expect(stack).to(haveResource("AWS::ECS::Service", {
DesiredCount: 1,
LaunchType: "FARGATE",
}));

expect(stack).to(haveResource("AWS::SQS::Queue", {
RedrivePolicy: {
deadLetterTargetArn: {
"Fn::GetAtt": [
"ServiceEcsProcessingDeadLetterQueue4A89196E",
"Arn"
]
},
maxReceiveCount: 42
}
}));

expect(stack).to(haveResource("AWS::SQS::Queue", {
MessageRetentionPeriod: 604800
}));

expect(stack).to(haveResource("AWS::IAM::Policy", {
PolicyDocument: {
Expand Down

0 comments on commit 85e2241

Please sign in to comment.