diff --git a/packages/@aws-cdk-containers/ecs-service-extensions/README.md b/packages/@aws-cdk-containers/ecs-service-extensions/README.md index f3845e69a55e5..53e4d2b6f3c56 100644 --- a/packages/@aws-cdk-containers/ecs-service-extensions/README.md +++ b/packages/@aws-cdk-containers/ecs-service-extensions/README.md @@ -19,6 +19,7 @@ The `Service` construct provided by this module can be extended with optional `S - [AWS AppMesh](https://aws.amazon.com/app-mesh/) for adding your application to a service mesh - [Application Load Balancer](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/introduction.html), for exposing your service to the public - [AWS FireLens](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_firelens.html), for filtering and routing application logs +- Queue to allow your service to consume messages from an SQS Queue which is populated by one or more SNS Topics that it is subscribed to - [Community Extensions](#community-extensions), providing support for advanced use cases The `ServiceExtension` class is an abstract class which you can also implement in @@ -321,6 +322,40 @@ const environment = Environment.fromEnvironmentAttributes(stack, 'Environment', ``` +## Queue Extension + +This service extension creates a default SQS Queue `eventsQueue` for the service (if not provided) and accepts a list of `ISubscribable` objects that the `eventsQueue` can subscribe to. The service extension creates the subscriptions and sets up permissions for the service to consume messages from the SQS Queue. + +### Setting up SNS Topic Subscriptions for SQS Queues + +You can use this extension to set up SNS Topic subscriptions for the `eventsQueue`. To do this, create a new object of type `TopicSubscription` for every SNS Topic you want the `eventsQueue` to subscribe to and provide it as input to the service extension. + +```ts +const myServiceDescription = nameDescription.add(new QueueExtension({ + // Provide list of topic subscriptions that you want the `eventsQueue` to subscribe to + subscriptions: [new TopicSubscription({ + topic: new sns.Topic(stack, 'my-topic'), + }], +})); + +// To access the `eventsQueue` for the service, use the `eventsQueue` getter for the extension +const myQueueExtension = myServiceDescription.extensions.queue as QueueExtension; +const myEventsQueue = myQueueExtension.eventsQueue; +``` + +For setting up a topic-specific queue subscription, you can provide a custom queue in the `TopicSubscription` object along with the SNS Topic. The extension will set up a topic subscription for the provided queue instead of the default `eventsQueue` of the service. + +```ts +nameDescription.add(new QueueExtension({ + queue: myEventsQueue, + subscriptions: [new TopicSubscription({ + topic: new sns.Topic(stack, 'my-topic'), + // `myTopicQueue` will subscribe to the `my-topic` instead of `eventsQueue` + queue: myTopicQueue, + }], +})); +``` + ## Community Extensions We encourage the development of Community Service Extensions that support diff --git a/packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/index.ts b/packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/index.ts index 4e464e0d0734e..78c138aba0102 100644 --- a/packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/index.ts +++ b/packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/index.ts @@ -6,3 +6,4 @@ export * from './cloudwatch-agent'; export * from './scale-on-cpu-utilization'; export * from './xray'; export * from './assign-public-ip'; +export * from './queue'; \ No newline at end of file diff --git a/packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/queue.ts b/packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/queue.ts new file mode 100644 index 0000000000000..d43b587d6326e --- /dev/null +++ b/packages/@aws-cdk-containers/ecs-service-extensions/lib/extensions/queue.ts @@ -0,0 +1,212 @@ +import * as ecs from '@aws-cdk/aws-ecs'; +import * as sns from '@aws-cdk/aws-sns'; +import * as subscription from '@aws-cdk/aws-sns-subscriptions'; +import * as sqs from '@aws-cdk/aws-sqs'; +import * as cdk from '@aws-cdk/core'; +import { Service } from '../service'; +import { Container } from './container'; +import { ContainerMutatingHook, ServiceExtension } from './extension-interfaces'; + +// Keep this import separate from other imports to reduce chance for merge conflicts with v2-main +// eslint-disable-next-line no-duplicate-imports, import/order +import { Construct } from '@aws-cdk/core'; + +/** + * An interface that will be implemented by all the resources that can be subscribed to. + */ +export interface ISubscribable { + /** + * All classes implementing this interface must also implement the `subscribe()` method + */ + subscribe(extension: QueueExtension): sqs.IQueue; +} + +/** + * The settings for the Queue extension. + */ +export interface QueueExtensionProps { + /** + * The list of subscriptions for this service. + * + * @default none + */ + readonly subscriptions?: ISubscribable[]; + + /** + * The user-provided default queue for this service. + * + * @default If the `eventsQueue` is not provided, a default SQS Queue is created for the service. + */ + readonly eventsQueue?: sqs.IQueue; +} + +/** + * The topic-specific settings for creating the queue subscriptions. + */ +export interface TopicSubscriptionProps { + /** + * The SNS Topic to subscribe to. + */ + readonly topic: sns.ITopic; + + /** + * The user-provided queue to subscribe to the given topic. + * If the `queue` is not provided, the default `eventsQueue` will subscribe to the given topic. + * + * @default none + */ + readonly queue?: sqs.IQueue; +} + +/** + * The `TopicSubscription` class represents an SNS Topic resource that can be subscribed to by the service queues. + */ +export class TopicSubscription implements ISubscribable { + public readonly topic: sns.ITopic; + + public readonly queue?: sqs.IQueue; + + constructor(props: TopicSubscriptionProps) { + this.topic = props.topic; + this.queue = props.queue; + } + + /** + * This method sets up SNS Topic subscriptions for the SQS queue provided by the user. If a `queue` is not provided, + * the default `eventsQueue` subscribes to the given topic. + * + * @param extension `QueueExtension` added to the service + * @returns the queue subscribed to the given topic + */ + public subscribe(extension: QueueExtension) : sqs.IQueue { + let queue = extension.eventsQueue; + if (this.queue) { + queue = this.queue; + } + this.topic.addSubscription(new subscription.SqsSubscription(queue)); + return queue; + } +} + +/** + * Settings for the hook which mutates the application container + * to add the events queue URI to its environment. + */ +interface ContainerMutatingProps { + /** + * The events queue name and URI to be added to the container environment. + */ + readonly environment: { [key: string]: string }; +} + +/** + * This hook modifies the application container's environment to + * add the queue URL for the events queue of the service. + */ +class QueueExtensionMutatingHook extends ContainerMutatingHook { + private environment: { [key: string]: string }; + + constructor(props: ContainerMutatingProps) { + super(); + this.environment = props.environment; + } + + public mutateContainerDefinition(props: ecs.ContainerDefinitionOptions): ecs.ContainerDefinitionOptions { + return { + ...props, + + environment: { ...(props.environment || {}), ...this.environment }, + } as ecs.ContainerDefinitionOptions; + } +} + +/** + * This extension creates a default `eventsQueue` for the service (if not provided) and accepts a list of objects of + * type `ISubscribable` that the `eventsQueue` subscribes to. It creates the subscriptions and sets up permissions + * for the service to consume messages from the SQS Queues. + * + * The default queue for this service can be accessed using the getter `.eventsQueue`. + */ +export class QueueExtension extends ServiceExtension { + private _eventsQueue!: sqs.IQueue; + + private subscriptionQueues = new Set(); + + private environment: { [key: string]: string } = {}; + + private props?: QueueExtensionProps; + + constructor(props?: QueueExtensionProps) { + super('queue'); + + this.props = props; + } + + /** + * This hook creates (if required) and sets the default queue `eventsQueue`. It also sets up the subscriptions for + * the provided `ISubscribable` objects. + * + * @param service The parent service which this extension has been added to + * @param scope The scope that this extension should create resources in + */ + public prehook(service: Service, scope: Construct) { + this.parentService = service; + this.scope = scope; + + let eventsQueue = this.props?.eventsQueue; + if (!eventsQueue) { + const deadLetterQueue = new sqs.Queue(this.scope, 'EventsDeadLetterQueue', { + retentionPeriod: cdk.Duration.days(14), + }); + + eventsQueue = new sqs.Queue(this.scope, 'EventsQueue', { + deadLetterQueue: { + queue: deadLetterQueue, + maxReceiveCount: 3, + }, + }); + } + this._eventsQueue = eventsQueue; + + this.environment[`${this.parentService.id.toUpperCase()}_QUEUE_URI`] = this._eventsQueue.queueUrl; + + if (this.props?.subscriptions) { + for (const subs of this.props.subscriptions) { + const subsQueue = subs.subscribe(this); + this.subscriptionQueues.add(subsQueue); + } + } + } + + /** + * Add hooks to the main application extension so that it is modified to + * add the events queue URL to the container environment. + */ + public addHooks() { + const container = this.parentService.serviceDescription.get('service-container') as Container; + + if (!container) { + throw new Error('Queue Extension requires an application extension'); + } + + container.addContainerMutatingHook(new QueueExtensionMutatingHook({ + environment: this.environment, + })); + } + + /** + * After the task definition has been created, this hook grants SQS permissions to the task role. + * + * @param taskDefinition The created task definition + */ + public useTaskDefinition(taskDefinition: ecs.TaskDefinition) { + this._eventsQueue.grantConsumeMessages(taskDefinition.taskRole); + for (const queue of this.subscriptionQueues) { + queue.grantConsumeMessages(taskDefinition.taskRole); + } + } + + public get eventsQueue() : sqs.IQueue { + return this._eventsQueue; + } +} \ No newline at end of file diff --git a/packages/@aws-cdk-containers/ecs-service-extensions/package.json b/packages/@aws-cdk-containers/ecs-service-extensions/package.json index 0d99dadbedb02..444e43ba9a19a 100644 --- a/packages/@aws-cdk-containers/ecs-service-extensions/package.json +++ b/packages/@aws-cdk-containers/ecs-service-extensions/package.json @@ -64,6 +64,8 @@ "@aws-cdk/aws-route53": "0.0.0", "@aws-cdk/aws-route53-targets": "0.0.0", "@aws-cdk/aws-servicediscovery": "0.0.0", + "@aws-cdk/aws-sns": "0.0.0", + "@aws-cdk/aws-sns-subscriptions": "0.0.0", "@aws-cdk/aws-sqs": "0.0.0", "@aws-cdk/core": "0.0.0", "@aws-cdk/custom-resources": "0.0.0", @@ -89,6 +91,8 @@ "@aws-cdk/aws-route53": "0.0.0", "@aws-cdk/aws-route53-targets": "0.0.0", "@aws-cdk/aws-servicediscovery": "0.0.0", + "@aws-cdk/aws-sns": "0.0.0", + "@aws-cdk/aws-sns-subscriptions": "0.0.0", "@aws-cdk/aws-sqs": "0.0.0", "@aws-cdk/core": "0.0.0", "@aws-cdk/custom-resources": "0.0.0", diff --git a/packages/@aws-cdk-containers/ecs-service-extensions/test/test.queue.ts b/packages/@aws-cdk-containers/ecs-service-extensions/test/test.queue.ts new file mode 100644 index 0000000000000..ec2fb234c16ce --- /dev/null +++ b/packages/@aws-cdk-containers/ecs-service-extensions/test/test.queue.ts @@ -0,0 +1,508 @@ +import { countResources, expect, haveResource } from '@aws-cdk/assert-internal'; +import * as ecs from '@aws-cdk/aws-ecs'; +import * as sns from '@aws-cdk/aws-sns'; +import * as sqs from '@aws-cdk/aws-sqs'; +import * as cdk from '@aws-cdk/core'; +import { Test } from 'nodeunit'; +import { Container, Environment, QueueExtension, Service, ServiceDescription, TopicSubscription } from '../lib'; + +export = { + 'should only create a default queue when no input props are provided'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const environment = new Environment(stack, 'production'); + const serviceDescription = new ServiceDescription(); + + serviceDescription.add(new Container({ + cpu: 256, + memoryMiB: 512, + trafficPort: 80, + image: ecs.ContainerImage.fromRegistry('nathanpeck/name'), + environment: { + PORT: '80', + }, + })); + + // WHEN + serviceDescription.add(new QueueExtension()); + + new Service(stack, 'my-service', { + environment, + serviceDescription, + }); + + // THEN + // Ensure creation of default queue and queue policy allowing SNS Topics to send message to the queue + expect(stack).to(haveResource('AWS::SQS::Queue', { + MessageRetentionPeriod: 1209600, + })); + + expect(stack).to(haveResource('AWS::SQS::Queue', { + RedrivePolicy: { + deadLetterTargetArn: { + 'Fn::GetAtt': [ + 'EventsDeadLetterQueue404572C7', + 'Arn', + ], + }, + maxReceiveCount: 3, + }, + })); + + // Ensure the task role is given permissions to consume messages from the queue + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 'sqs:ReceiveMessage', + 'sqs:ChangeMessageVisibility', + 'sqs:GetQueueUrl', + 'sqs:DeleteMessage', + 'sqs:GetQueueAttributes', + ], + Effect: 'Allow', + Resource: { + 'Fn::GetAtt': [ + 'EventsQueueB96EB0D2', + 'Arn', + ], + }, + }, + ], + Version: '2012-10-17', + }, + })); + + // Ensure there are no SNS Subscriptions created + expect(stack).to(countResources('AWS::SNS::Subscription', 0)); + + // Ensure that the queue URL has been correctly appended to the environment variables + expect(stack).to(haveResource('AWS::ECS::TaskDefinition', { + ContainerDefinitions: [ + { + Cpu: 256, + Environment: [ + { + Name: 'PORT', + Value: '80', + }, + { + Name: 'MY-SERVICE_QUEUE_URI', + Value: { + Ref: 'EventsQueueB96EB0D2', + }, + }, + ], + Image: 'nathanpeck/name', + Essential: true, + Memory: 512, + Name: 'app', + PortMappings: [ + { + ContainerPort: 80, + Protocol: 'tcp', + }, + ], + Ulimits: [ + { + HardLimit: 1024000, + Name: 'nofile', + SoftLimit: 1024000, + }, + ], + }, + ], + })); + + test.done(); + }, + + 'should be able to subscribe default events queue created by the extension to given topics'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const environment = new Environment(stack, 'production'); + const serviceDescription = new ServiceDescription(); + + serviceDescription.add(new Container({ + cpu: 256, + memoryMiB: 512, + trafficPort: 80, + image: ecs.ContainerImage.fromRegistry('nathanpeck/name'), + environment: { + PORT: '80', + }, + })); + + // WHEN + const topicSubscription1 = new TopicSubscription({ + topic: new sns.Topic(stack, 'topic1'), + }); + const topicSubscription2 = new TopicSubscription({ + topic: new sns.Topic(stack, 'topic2'), + }); + serviceDescription.add(new QueueExtension({ + subscriptions: [topicSubscription1, topicSubscription2], + })); + + new Service(stack, 'my-service', { + environment, + serviceDescription, + }); + + // THEN + // Ensure creation of default queue and queue policy allowing SNS Topics to send message to the queue + expect(stack).to(haveResource('AWS::SQS::Queue', { + MessageRetentionPeriod: 1209600, + })); + + expect(stack).to(haveResource('AWS::SQS::Queue', { + RedrivePolicy: { + deadLetterTargetArn: { + 'Fn::GetAtt': [ + 'EventsDeadLetterQueue404572C7', + 'Arn', + ], + }, + maxReceiveCount: 3, + }, + })); + + expect(stack).to(haveResource('AWS::SQS::QueuePolicy', { + PolicyDocument: { + Statement: [ + { + Action: 'sqs:SendMessage', + Condition: { + ArnEquals: { + 'aws:SourceArn': { + Ref: 'topic152D84A37', + }, + }, + }, + Effect: 'Allow', + Principal: { + Service: 'sns.amazonaws.com', + }, + Resource: { + 'Fn::GetAtt': [ + 'EventsQueueB96EB0D2', + 'Arn', + ], + }, + }, + { + Action: 'sqs:SendMessage', + Condition: { + ArnEquals: { + 'aws:SourceArn': { + Ref: 'topic2A4FB547F', + }, + }, + }, + Effect: 'Allow', + Principal: { + Service: 'sns.amazonaws.com', + }, + Resource: { + 'Fn::GetAtt': [ + 'EventsQueueB96EB0D2', + 'Arn', + ], + }, + }, + ], + Version: '2012-10-17', + }, + })); + + // Ensure the task role is given permissions to consume messages from the queue + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 'sqs:ReceiveMessage', + 'sqs:ChangeMessageVisibility', + 'sqs:GetQueueUrl', + 'sqs:DeleteMessage', + 'sqs:GetQueueAttributes', + ], + Effect: 'Allow', + Resource: { + 'Fn::GetAtt': [ + 'EventsQueueB96EB0D2', + 'Arn', + ], + }, + }, + ], + Version: '2012-10-17', + }, + })); + + // Ensure SNS Subscriptions for given topics + expect(stack).to(haveResource('AWS::SNS::Subscription', { + Protocol: 'sqs', + TopicArn: { + Ref: 'topic152D84A37', + }, + Endpoint: { + 'Fn::GetAtt': [ + 'EventsQueueB96EB0D2', + 'Arn', + ], + }, + })); + + expect(stack).to(haveResource('AWS::SNS::Subscription', { + Protocol: 'sqs', + TopicArn: { + Ref: 'topic2A4FB547F', + }, + Endpoint: { + 'Fn::GetAtt': [ + 'EventsQueueB96EB0D2', + 'Arn', + ], + }, + })); + + // Ensure that the queue URL has been correctly appended to the environment variables + expect(stack).to(haveResource('AWS::ECS::TaskDefinition', { + ContainerDefinitions: [ + { + Cpu: 256, + Environment: [ + { + Name: 'PORT', + Value: '80', + }, + { + Name: 'MY-SERVICE_QUEUE_URI', + Value: { + Ref: 'EventsQueueB96EB0D2', + }, + }, + ], + Image: 'nathanpeck/name', + Essential: true, + Memory: 512, + Name: 'app', + PortMappings: [ + { + ContainerPort: 80, + Protocol: 'tcp', + }, + ], + Ulimits: [ + { + HardLimit: 1024000, + Name: 'nofile', + SoftLimit: 1024000, + }, + ], + }, + ], + })); + + test.done(); + }, + + 'should be able to subscribe user-provided queue to given topics'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const environment = new Environment(stack, 'production'); + const serviceDescription = new ServiceDescription(); + + serviceDescription.add(new Container({ + cpu: 256, + memoryMiB: 512, + trafficPort: 80, + image: ecs.ContainerImage.fromRegistry('nathanpeck/name'), + })); + + const topicSubscription1 = new TopicSubscription({ + topic: new sns.Topic(stack, 'topic1'), + queue: new sqs.Queue(stack, 'myQueue'), + }); + const topicSubscription2 = new TopicSubscription({ + topic: new sns.Topic(stack, 'topic2'), + }); + serviceDescription.add(new QueueExtension({ + subscriptions: [topicSubscription1, topicSubscription2], + eventsQueue: new sqs.Queue(stack, 'defQueue'), + })); + + new Service(stack, 'my-service', { + environment, + serviceDescription, + }); + + // THEN + // Ensure queue policy allows SNS Topics to send message to the queue + expect(stack).to(haveResource('AWS::SQS::QueuePolicy', { + PolicyDocument: { + Statement: [ + { + Action: 'sqs:SendMessage', + Condition: { + ArnEquals: { + 'aws:SourceArn': { + Ref: 'topic152D84A37', + }, + }, + }, + Effect: 'Allow', + Principal: { + Service: 'sns.amazonaws.com', + }, + Resource: { + 'Fn::GetAtt': [ + 'myQueue4FDFF71C', + 'Arn', + ], + }, + }, + ], + Version: '2012-10-17', + }, + })); + + expect(stack).to(haveResource('AWS::SQS::QueuePolicy', { + PolicyDocument: { + Statement: [ + { + Action: 'sqs:SendMessage', + Condition: { + ArnEquals: { + 'aws:SourceArn': { + Ref: 'topic2A4FB547F', + }, + }, + }, + Effect: 'Allow', + Principal: { + Service: 'sns.amazonaws.com', + }, + Resource: { + 'Fn::GetAtt': [ + 'defQueue1F91A65B', + 'Arn', + ], + }, + }, + ], + Version: '2012-10-17', + }, + })); + + // Ensure the task role is given permissions to consume messages from the queue + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 'sqs:ReceiveMessage', + 'sqs:ChangeMessageVisibility', + 'sqs:GetQueueUrl', + 'sqs:DeleteMessage', + 'sqs:GetQueueAttributes', + ], + Effect: 'Allow', + Resource: { + 'Fn::GetAtt': [ + 'defQueue1F91A65B', + 'Arn', + ], + }, + }, + { + Action: [ + 'sqs:ReceiveMessage', + 'sqs:ChangeMessageVisibility', + 'sqs:GetQueueUrl', + 'sqs:DeleteMessage', + 'sqs:GetQueueAttributes', + ], + Effect: 'Allow', + Resource: { + 'Fn::GetAtt': [ + 'myQueue4FDFF71C', + 'Arn', + ], + }, + }, + ], + Version: '2012-10-17', + }, + })); + + // Ensure SNS Subscriptions for given topics + expect(stack).to(haveResource('AWS::SNS::Subscription', { + Protocol: 'sqs', + TopicArn: { + Ref: 'topic152D84A37', + }, + Endpoint: { + 'Fn::GetAtt': [ + 'myQueue4FDFF71C', + 'Arn', + ], + }, + })); + + expect(stack).to(haveResource('AWS::SNS::Subscription', { + Protocol: 'sqs', + TopicArn: { + Ref: 'topic2A4FB547F', + }, + Endpoint: { + 'Fn::GetAtt': [ + 'defQueue1F91A65B', + 'Arn', + ], + }, + })); + + // Ensure that the queue URL has been correctly added to the environment variables + expect(stack).to(haveResource('AWS::ECS::TaskDefinition', { + ContainerDefinitions: [ + { + Cpu: 256, + Environment: [ + { + Name: 'MY-SERVICE_QUEUE_URI', + Value: { + Ref: 'defQueue1F91A65B', + }, + }, + ], + Image: 'nathanpeck/name', + Essential: true, + Memory: 512, + Name: 'app', + PortMappings: [ + { + ContainerPort: 80, + Protocol: 'tcp', + }, + ], + Ulimits: [ + { + HardLimit: 1024000, + Name: 'nofile', + SoftLimit: 1024000, + }, + ], + }, + ], + })); + + test.done(); + }, +}; \ No newline at end of file