Skip to content

Commit

Permalink
feat: sqs event source mapping - maximum_concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
adityamohta committed May 17, 2024
1 parent 5106c17 commit 67e604b
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 19 deletions.
12 changes: 9 additions & 3 deletions chalice/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,8 @@ def on_sns_message(self, topic: str,
def on_sqs_message(self, queue: Optional[str] = None, batch_size: int = 1,
name: Optional[str] = None,
queue_arn: Optional[str] = None,
maximum_batching_window_in_seconds: int = 0
maximum_batching_window_in_seconds: int = 0,
maximum_concurrency: Optional[int] = None,
) -> Callable[..., Any]:
return self._create_registration_function(
handler_type='on_sqs_message',
Expand All @@ -778,7 +779,8 @@ def on_sqs_message(self, queue: Optional[str] = None, batch_size: int = 1,
'queue_arn': queue_arn,
'batch_size': batch_size,
'maximum_batching_window_in_seconds':
maximum_batching_window_in_seconds
maximum_batching_window_in_seconds,
'maximum_concurrency': maximum_concurrency,
}
)

Expand Down Expand Up @@ -1101,6 +1103,8 @@ def _register_on_sqs_message(self, name: str,
batch_size=kwargs['batch_size'],
maximum_batching_window_in_seconds=kwargs[
'maximum_batching_window_in_seconds'],
maximum_concurrency=kwargs[
'maximum_concurrency'],
)
self.event_sources.append(sqs_config)

Expand Down Expand Up @@ -1625,13 +1629,15 @@ def __init__(self, name: str, handler_string: str, topic: str):
class SQSEventConfig(BaseEventSourceConfig):
def __init__(self, name: str, handler_string: str, queue: Optional[str],
queue_arn: Optional[str], batch_size: int,
maximum_batching_window_in_seconds: int):
maximum_batching_window_in_seconds: int,
maximum_concurrency: Optional[int]):
super(SQSEventConfig, self).__init__(name, handler_string)
self.queue: Optional[str] = queue
self.queue_arn: Optional[str] = queue_arn
self.batch_size: int = batch_size
self.maximum_batching_window_in_seconds: int = \
maximum_batching_window_in_seconds
self.maximum_concurrency: Optional[int] = maximum_concurrency


class KinesisEventConfig(BaseEventSourceConfig):
Expand Down
10 changes: 10 additions & 0 deletions chalice/awsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,7 @@ def create_lambda_event_source(
batch_size: int,
starting_position: Optional[str] = None,
maximum_batching_window_in_seconds: Optional[int] = 0,
maximum_concurrency: Optional[int] = None,
) -> None:
lambda_client = self._client('lambda')
batch_window = maximum_batching_window_in_seconds
Expand All @@ -1835,6 +1836,10 @@ def create_lambda_event_source(
'BatchSize': batch_size,
'MaximumBatchingWindowInSeconds': batch_window,
}
if maximum_concurrency:
kwargs['ScalingConfig'] = {
'MaximumConcurrency': maximum_concurrency
}
if starting_position is not None:
kwargs['StartingPosition'] = starting_position
return self._call_client_method_with_retries(
Expand All @@ -1848,6 +1853,7 @@ def update_lambda_event_source(
event_uuid: str,
batch_size: int,
maximum_batching_window_in_seconds: Optional[int] = 0,
maximum_concurrency: Optional[int] = None,
) -> None:
lambda_client = self._client('lambda')
batch_window = maximum_batching_window_in_seconds
Expand All @@ -1856,6 +1862,10 @@ def update_lambda_event_source(
'BatchSize': batch_size,
'MaximumBatchingWindowInSeconds': batch_window,
}
if maximum_concurrency:
kwargs['ScalingConfig'] = {
'MaximumConcurrency': maximum_concurrency
}
self._call_client_method_with_retries(
lambda_client.update_event_source_mapping,
kwargs,
Expand Down
1 change: 1 addition & 0 deletions chalice/deploy/appgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ def _create_sqs_subscription(
batch_size=sqs_config.batch_size,
lambda_function=lambda_function,
maximum_batching_window_in_seconds=batch_window,
maximum_concurrency=sqs_config.maximum_concurrency
)
return sqs_event_source

Expand Down
1 change: 1 addition & 0 deletions chalice/deploy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ class SQSEventSource(FunctionEventSubscriber):
queue: Union[str, QueueARN]
batch_size: int
maximum_batching_window_in_seconds: int
maximum_concurrency: Opt[int] = None


@dataclass
Expand Down
12 changes: 8 additions & 4 deletions chalice/deploy/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,13 @@ def _plan_sqseventsource(self, resource):
return instruction_for_queue_arn + [
models.APICall(
method_name='update_lambda_event_source',
params={'event_uuid': uuid,
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds}
params={
'event_uuid': uuid,
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds,
'maximum_concurrency': resource.maximum_concurrency
}
)
] + self._batch_record_resource(
'sqs_event', resource.resource_name, {
Expand All @@ -751,6 +754,7 @@ def _plan_sqseventsource(self, resource):
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds,
'maximum_concurrency': resource.maximum_concurrency,
'function_name': function_arn},
output_var=uuid_varname,
), 'Subscribing %s to SQS queue %s\n'
Expand Down
29 changes: 20 additions & 9 deletions chalice/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,15 +659,20 @@ def _generate_sqseventsource(self, resource, template):
'Fn::Sub': ('arn:${AWS::Partition}:sqs:${AWS::Region}'
':${AWS::AccountId}:%s' % resource.queue)
}
properties = {
'Queue': queue,
'BatchSize': resource.batch_size,
'MaximumBatchingWindowInSeconds':
resource.maximum_batching_window_in_seconds
}
if resource.maximum_concurrency:
properties["ScalingConfig"] = {
"MaximumConcurrency": resource.maximum_concurrency
}
function_cfn['Properties']['Events'] = {
sqs_cfn_name: {
'Type': 'SQS',
'Properties': {
'Queue': queue,
'BatchSize': resource.batch_size,
'MaximumBatchingWindowInSeconds':
resource.maximum_batching_window_in_seconds,
}
'Properties': properties
}
}

Expand Down Expand Up @@ -1123,14 +1128,20 @@ def _generate_sqseventsource(self, resource, template):
":%(account_id)s:%(queue)s",
queue=resource.queue
)
template['resource'].setdefault('aws_lambda_event_source_mapping', {})[
resource.resource_name] = {

aws_lambda_event_source_mapping = {
'event_source_arn': event_source_arn,
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds,
'function_name': self._fref(resource.lambda_function)
'function_name': self._fref(resource.lambda_function),
}
if resource.maximum_concurrency:
aws_lambda_event_source_mapping["scaling_config"] = {
"maximum_concurrency": resource.maximum_concurrency
}
template['resource'].setdefault('aws_lambda_event_source_mapping', {})[
resource.resource_name] = aws_lambda_event_source_mapping

def _generate_kinesiseventsource(self, resource, template):
# type: (models.KinesisEventSource, Dict[str, Any]) -> None
Expand Down
5 changes: 4 additions & 1 deletion docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ Chalice
entire lambda function name. This parameter is optional. If it is
not provided, the name of the python function will be used.

.. method:: on_sqs_message(queue, batch_size=1, name=None, queue_arn=None, maximum_batching_window_in_seconds=0)
.. method:: on_sqs_message(queue, batch_size=1, name=None, queue_arn=None, maximum_batching_window_in_seconds=0, maximum_concurrency=None)

Create a lambda function and configure it to be automatically invoked
whenever a message is published to the specified SQS queue.
Expand Down Expand Up @@ -337,6 +337,9 @@ Chalice
:param maximum_batching_window_in_seconds: The maximum amount of time,
in seconds, to gather records before invoking the function.

:param maximum_concurrency: The maximum number of concurrent functions
that the event source can invoke.

.. method:: on_kinesis_record(stream, batch_size=100, starting_position='LATEST', name=None, maximum_batching_window_in_seconds=0)

Create a lambda function and configure it to be automatically invoked
Expand Down
81 changes: 79 additions & 2 deletions tests/unit/deploy/test_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,8 @@ def test_can_plan_sqs_event_source(self):
),
'batch_size': 10,
'maximum_batching_window_in_seconds': 60,
'function_name': Variable("function_name_lambda_arn")
'function_name': Variable("function_name_lambda_arn"),
'maximum_concurrency': None
},
output_var='function_name-sqs-event-source_uuid'
)
Expand Down Expand Up @@ -1819,7 +1820,8 @@ def test_sqs_event_supports_queue_arn(self):
),
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'function_name': Variable("function_name_lambda_arn")
'function_name': Variable("function_name_lambda_arn"),
'maximum_concurrency': None,
},
output_var='function_name-sqs-event-source_uuid'
)
Expand Down Expand Up @@ -1863,6 +1865,7 @@ def test_can_update_sqs_event_with_queue_arn(self):
'event_uuid': 'my-uuid',
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'maximum_concurrency': None,
},
)
self.assert_recorded_values(
Expand Down Expand Up @@ -1925,6 +1928,80 @@ def test_sqs_event_source_exists_updates_batch_size(self):
'event_uuid': 'my-uuid',
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'maximum_concurrency': None,
},
)
self.assert_recorded_values(
plan, 'sqs_event', 'function_name-sqs-event-source', {
'queue_arn': 'arn:sqs:myqueue',
'event_uuid': 'my-uuid',
'queue': 'myqueue',
'lambda_arn': 'arn:lambda'
}
)

def test_sqs_event_supports_maximum_concurrency(self):
function = create_function_resource('function_name')
sqs_event_source = models.SQSEventSource(
resource_name='function_name-sqs-event-source',
queue=models.QueueARN(arn='arn:us-west-2:myqueue'),
batch_size=10,
lambda_function=function,
maximum_batching_window_in_seconds=0,
maximum_concurrency=2
)
plan = self.determine_plan(sqs_event_source)
assert plan[1] == models.APICall(
method_name='create_lambda_event_source',
params={
'event_source_arn': Variable(
"function_name-sqs-event-source_queue_arn"
),
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'function_name': Variable("function_name_lambda_arn"),
'maximum_concurrency': 2,
},
output_var='function_name-sqs-event-source_uuid'
)
self.assert_recorded_values(
plan, 'sqs_event', 'function_name-sqs-event-source', {
'queue_arn': Variable(
'function_name-sqs-event-source_queue_arn'),
'event_uuid': Variable(
'function_name-sqs-event-source_uuid'),
'queue': 'myqueue',
'lambda_arn': Variable(
'function_name_lambda_arn')
}
)

def test_sqs_event_source_exists_updates_maximum_concurrency(self):
function = create_function_resource('function_name')
sqs_event_source = models.SQSEventSource(
resource_name='function_name-sqs-event-source',
queue='myqueue',
batch_size=10,
lambda_function=function,
maximum_batching_window_in_seconds=0,
maximum_concurrency=2
)
self.remote_state.declare_resource_exists(
sqs_event_source,
queue='myqueue',
queue_arn='arn:sqs:myqueue',
resource_type='sqs_event',
lambda_arn='arn:lambda',
event_uuid='my-uuid',
)
plan = self.determine_plan(sqs_event_source)
assert plan[5] == models.APICall(
method_name='update_lambda_event_source',
params={
'event_uuid': 'my-uuid',
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'maximum_concurrency': 2,
},
)
self.assert_recorded_values(
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/test_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,34 @@ def handler(event):
'maximum_batching_window_in_seconds': 0
}

def test_can_package_sqs_handler_with_max_concurrency(self, sample_app):
@sample_app.on_sqs_message(
queue='foo',
batch_size=5,
maximum_concurrency=2
)
def handler(event):
pass

config = Config.create(chalice_app=sample_app,
project_dir='.',
app_name='sample_app',
api_gateway_stage='api')
template = self.generate_template(config)

assert template['resource'][
'aws_lambda_event_source_mapping'][
'handler-sqs-event-source'] == {
'event_source_arn': (
'arn:${data.aws_partition.chalice.partition}:sqs'
':${data.aws_region.chalice.name}:'
'${data.aws_caller_identity.chalice.account_id}:foo'),
'function_name': '${aws_lambda_function.handler.arn}',
'batch_size': 5,
'maximum_batching_window_in_seconds': 0,
'scaling_config': {'maximum_concurrency': 2}
}

def test_sqs_arn_does_not_use_fn_sub(self, sample_app):
@sample_app.on_sqs_message(queue_arn='arn:foo:bar', batch_size=5)
def handler(event):
Expand Down Expand Up @@ -1736,6 +1764,37 @@ def handler(event):
}
}

def test_can_package_sqs_handler_with_max_concurrency(self, sample_app):
@sample_app.on_sqs_message(
queue='foo',
batch_size=5,
maximum_concurrency=2
)
def handler(event):
pass

config = Config.create(chalice_app=sample_app,
project_dir='.',
api_gateway_stage='api')
template = self.generate_template(config)
sns_handler = template['Resources']['Handler']
assert sns_handler['Properties']['Events'] == {
'HandlerSqsEventSource': {
'Type': 'SQS',
'Properties': {
'Queue': {
'Fn::Sub': (
'arn:${AWS::Partition}:sqs:${AWS::Region}'
':${AWS::AccountId}:foo'
)
},
'BatchSize': 5,
'MaximumBatchingWindowInSeconds': 0,
'ScalingConfig': {'MaximumConcurrency': 2}
},
}
}

def test_sqs_arn_does_not_use_fn_sub(self, sample_app):
@sample_app.on_sqs_message(queue_arn='arn:foo:bar', batch_size=5)
def handler(event):
Expand Down

0 comments on commit 67e604b

Please sign in to comment.