feat(ecs-service-extensions): Auto scaling for Queue Extension (aws#17430)

----
This PR adds a target tracking auto scaling policy for the SQS queues provided to, or created by, the `QueueExtension` (in the `useService()` hook).

The auto scaling is based on the `backlogPerTask` custom metric, which is emitted by an AWS Lambda function. The PR also contains this Lambda function and its tests.

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
upparekh authored and Pedro Pimentel committed Dec 1, 2021
1 parent 3431453 commit 6c5b30e
Showing 18 changed files with 1,456 additions and 254 deletions.
35 changes: 33 additions & 2 deletions packages/@aws-cdk-containers/ecs-service-extensions/README.md
@@ -392,11 +392,42 @@ For setting up a topic-specific queue subscription, you can provide a custom que
```diff
 nameDescription.add(new QueueExtension({
-  queue: myEventsQueue,
+  eventsQueue: myEventsQueue,
   subscriptions: [new TopicSubscription({
     topic: new sns.Topic(stack, 'my-topic'),
     // `myTopicQueue` will subscribe to the `my-topic` instead of `eventsQueue`
-    queue: myTopicQueue,
+    topicSubscriptionQueue: {
+      queue: myTopicQueue,
+    },
   }],
 }));
```
### Configuring auto scaling based on SQS Queues
You can scale your service up or down to maintain an acceptable queue latency by tracking the backlog per task. The extension configures a target tracking scaling policy whose target value (the acceptable backlog per task) is calculated by dividing `acceptableLatency` by `messageProcessingTime`. For example, if the maximum acceptable latency for a message to be processed after its arrival in the SQS queue is 10 minutes and the average processing time per message is 250 milliseconds, then `acceptableBacklogPerTask = 10 * 60 / 0.25 = 2400`. Each queue can therefore hold up to 2400 messages per task before the service starts to scale up, and a target tracking policy with target value `2400` is attached to the scaling target for your service. For more information, see https://docs.aws.amazon.com/autoscaling/ec2/userguide/as-using-sqs-queue.html.
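The target-value arithmetic above can be sketched as a quick calculation (a minimal illustration; `acceptable_backlog_per_task` is a hypothetical helper name, not part of the extension's API):

```python
def acceptable_backlog_per_task(acceptable_latency_s: float, message_processing_time_s: float) -> float:
    """Target value for the tracking policy: the number of messages each
    task may have queued before the service should scale out."""
    return acceptable_latency_s / message_processing_time_s

# 10 minutes of acceptable latency, 250 ms average processing time per message:
print(acceptable_backlog_per_task(10 * 60, 0.250))  # -> 2400.0
```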
You can configure auto scaling based on an SQS queue for your service as follows:
```ts
nameDescription.add(new QueueExtension({
  eventsQueue: myEventsQueue,
  // Need to specify `scaleOnLatency` to configure auto scaling based on SQS Queue
  scaleOnLatency: {
    acceptableLatency: cdk.Duration.minutes(10),
    messageProcessingTime: cdk.Duration.millis(250),
  },
  subscriptions: [new TopicSubscription({
    topic: new sns.Topic(stack, 'my-topic'),
    // `myTopicQueue` will subscribe to the `my-topic` instead of `eventsQueue`
    topicSubscriptionQueue: {
      queue: myTopicQueue,
      // Optionally provide `scaleOnLatency` for configuring separate autoscaling for `myTopicQueue`
      scaleOnLatency: {
        acceptableLatency: cdk.Duration.minutes(10),
        messageProcessingTime: cdk.Duration.millis(250),
      },
    },
  }],
}));
```
@@ -6,5 +6,5 @@ export * from './cloudwatch-agent';
 export * from './scale-on-cpu-utilization';
 export * from './xray';
 export * from './assign-public-ip';
-export * from './queue';
+export * from './queue/queue';
 export * from './injecter';

This file was deleted.

@@ -0,0 +1 @@
export * from './queue';
@@ -0,0 +1,15 @@
import os
import boto3
from queue_backlog_calculator import QueueHandler

def queue_handler(event, context):
    """
    Handler for the lambda trigger
    """
    ecs = boto3.client('ecs')
    sqs = boto3.client('sqs')

    queue_handler = QueueHandler(ecs_client=ecs, sqs_client=sqs, environ=os.environ)

    return queue_handler.emit()
@@ -0,0 +1,71 @@
from math import ceil
import time
import json

class QueueHandler:
    def __init__(self, ecs_client, sqs_client, environ):
        self.ecs = ecs_client
        self.sqs = sqs_client
        self.cluster_name = environ['CLUSTER_NAME']
        self.service_name = environ['SERVICE_NAME']
        self.namespace = environ['NAMESPACE']
        self.queue_names = environ['QUEUE_NAMES'].split(',')

    def emit(self):
        try:
            running_count = self.get_running_task_count()
            backlogs = [self.get_queue_backlog(queue_name, running_count) for queue_name in self.queue_names]
            self.timestamp = int(time.time() * 1000)
            for backlog in backlogs:
                self.emit_backlog_per_task_metric(backlog['queueName'], backlog['backlogPerTask'])
        except Exception as e:
            # Re-raise so the Lambda invocation is marked as failed instead of
            # silently discarding the error.
            raise Exception('Exception: {}'.format(e)) from e

    def emit_backlog_per_task_metric(self, queue_name, backlog_per_task):
        """
        Write the backlogPerTask metric to stdout in the CloudWatch embedded metric format.
        """
        print(json.dumps({
            "_aws": {
                "Timestamp": self.timestamp,
                "CloudWatchMetrics": [{
                    "Namespace": self.namespace,
                    "Dimensions": [["QueueName"]],
                    "Metrics": [{"Name": "BacklogPerTask", "Unit": "Count"}]
                }],
            },
            "QueueName": queue_name,
            "BacklogPerTask": backlog_per_task,
        }))

    def get_running_task_count(self):
        """
        Get the number of tasks in the 'RUNNING' state for the service 'service_name'.
        """
        service_desc = self.ecs.describe_services(
            cluster=self.cluster_name,
            services=[self.service_name],
        )
        if len(service_desc['services']) == 0:
            raise Exception('There are no services with name {} in cluster: {}'.format(self.service_name, self.cluster_name))
        return service_desc['services'][0].get('runningCount', 0)

    def get_queue_backlog(self, queue_name, count):
        """
        Calculate and return the backlogPerTask metric for the given queue.
        """
        queue_url = self.sqs.get_queue_url(QueueName=queue_name)
        # Guard against division by zero when the service has no running tasks.
        running_count = 1 if count == 0 else count

        def get_backlog_per_task():
            queue_attributes = self.sqs.get_queue_attributes(
                QueueUrl=queue_url['QueueUrl'],
                AttributeNames=['ApproximateNumberOfMessages']
            )
            num_of_msgs = int(queue_attributes['Attributes'].get('ApproximateNumberOfMessages', 0))
            return ceil(num_of_msgs / running_count)

        return {
            'queueName': queue_name,
            'backlogPerTask': get_backlog_per_task()
        }
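As a sanity check, the per-queue computation in `get_queue_backlog` reduces to `ceil(visible_messages / max(running_tasks, 1))`. A standalone sketch of that arithmetic (the function name is illustrative, not part of the Lambda):

```python
from math import ceil

def backlog_per_task(visible_messages: int, running_tasks: int) -> int:
    # A service with zero running tasks is treated as one task,
    # mirroring the handler's division-by-zero guard.
    running = 1 if running_tasks == 0 else running_tasks
    return ceil(visible_messages / running)

print(backlog_per_task(2400, 0))  # -> 2400
print(backlog_per_task(2401, 2))  # -> 1201
```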
