Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: maximum_concurrency option in on_sqs_message #2104

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions chalice/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,8 @@ def on_sns_message(self, topic: str,
def on_sqs_message(self, queue: Optional[str] = None, batch_size: int = 1,
name: Optional[str] = None,
queue_arn: Optional[str] = None,
maximum_batching_window_in_seconds: int = 0
maximum_batching_window_in_seconds: int = 0,
maximum_concurrency: Optional[int] = None,
) -> Callable[..., Any]:
return self._create_registration_function(
handler_type='on_sqs_message',
Expand All @@ -778,7 +779,8 @@ def on_sqs_message(self, queue: Optional[str] = None, batch_size: int = 1,
'queue_arn': queue_arn,
'batch_size': batch_size,
'maximum_batching_window_in_seconds':
maximum_batching_window_in_seconds
maximum_batching_window_in_seconds,
'maximum_concurrency': maximum_concurrency,
}
)

Expand Down Expand Up @@ -1101,6 +1103,8 @@ def _register_on_sqs_message(self, name: str,
batch_size=kwargs['batch_size'],
maximum_batching_window_in_seconds=kwargs[
'maximum_batching_window_in_seconds'],
maximum_concurrency=kwargs[
'maximum_concurrency'],
)
self.event_sources.append(sqs_config)

Expand Down Expand Up @@ -1625,13 +1629,15 @@ def __init__(self, name: str, handler_string: str, topic: str):
class SQSEventConfig(BaseEventSourceConfig):
def __init__(self, name: str, handler_string: str, queue: Optional[str],
queue_arn: Optional[str], batch_size: int,
maximum_batching_window_in_seconds: int):
maximum_batching_window_in_seconds: int,
maximum_concurrency: Optional[int]):
super(SQSEventConfig, self).__init__(name, handler_string)
self.queue: Optional[str] = queue
self.queue_arn: Optional[str] = queue_arn
self.batch_size: int = batch_size
self.maximum_batching_window_in_seconds: int = \
maximum_batching_window_in_seconds
self.maximum_concurrency: Optional[int] = maximum_concurrency


class KinesisEventConfig(BaseEventSourceConfig):
Expand Down
10 changes: 10 additions & 0 deletions chalice/awsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,7 @@ def create_lambda_event_source(
batch_size: int,
starting_position: Optional[str] = None,
maximum_batching_window_in_seconds: Optional[int] = 0,
maximum_concurrency: Optional[int] = None,
) -> None:
lambda_client = self._client('lambda')
batch_window = maximum_batching_window_in_seconds
Expand All @@ -1835,6 +1836,10 @@ def create_lambda_event_source(
'BatchSize': batch_size,
'MaximumBatchingWindowInSeconds': batch_window,
}
if maximum_concurrency:
kwargs['ScalingConfig'] = {
'MaximumConcurrency': maximum_concurrency
}
if starting_position is not None:
kwargs['StartingPosition'] = starting_position
return self._call_client_method_with_retries(
Expand All @@ -1848,6 +1853,7 @@ def update_lambda_event_source(
event_uuid: str,
batch_size: int,
maximum_batching_window_in_seconds: Optional[int] = 0,
maximum_concurrency: Optional[int] = None,
) -> None:
lambda_client = self._client('lambda')
batch_window = maximum_batching_window_in_seconds
Expand All @@ -1856,6 +1862,10 @@ def update_lambda_event_source(
'BatchSize': batch_size,
'MaximumBatchingWindowInSeconds': batch_window,
}
if maximum_concurrency:
kwargs['ScalingConfig'] = {
'MaximumConcurrency': maximum_concurrency
}
self._call_client_method_with_retries(
lambda_client.update_event_source_mapping,
kwargs,
Expand Down
1 change: 1 addition & 0 deletions chalice/deploy/appgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ def _create_sqs_subscription(
batch_size=sqs_config.batch_size,
lambda_function=lambda_function,
maximum_batching_window_in_seconds=batch_window,
maximum_concurrency=sqs_config.maximum_concurrency
)
return sqs_event_source

Expand Down
1 change: 1 addition & 0 deletions chalice/deploy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ class SQSEventSource(FunctionEventSubscriber):
queue: Union[str, QueueARN]
batch_size: int
maximum_batching_window_in_seconds: int
maximum_concurrency: Opt[int] = None


@dataclass
Expand Down
12 changes: 8 additions & 4 deletions chalice/deploy/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,13 @@ def _plan_sqseventsource(self, resource):
return instruction_for_queue_arn + [
models.APICall(
method_name='update_lambda_event_source',
params={'event_uuid': uuid,
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds}
params={
'event_uuid': uuid,
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds,
'maximum_concurrency': resource.maximum_concurrency
}
)
] + self._batch_record_resource(
'sqs_event', resource.resource_name, {
Expand All @@ -751,6 +754,7 @@ def _plan_sqseventsource(self, resource):
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds,
'maximum_concurrency': resource.maximum_concurrency,
'function_name': function_arn},
output_var=uuid_varname,
), 'Subscribing %s to SQS queue %s\n'
Expand Down
29 changes: 20 additions & 9 deletions chalice/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,15 +659,20 @@ def _generate_sqseventsource(self, resource, template):
'Fn::Sub': ('arn:${AWS::Partition}:sqs:${AWS::Region}'
':${AWS::AccountId}:%s' % resource.queue)
}
properties = {
'Queue': queue,
'BatchSize': resource.batch_size,
'MaximumBatchingWindowInSeconds':
resource.maximum_batching_window_in_seconds
}
if resource.maximum_concurrency:
properties["ScalingConfig"] = {
"MaximumConcurrency": resource.maximum_concurrency
}
function_cfn['Properties']['Events'] = {
sqs_cfn_name: {
'Type': 'SQS',
'Properties': {
'Queue': queue,
'BatchSize': resource.batch_size,
'MaximumBatchingWindowInSeconds':
resource.maximum_batching_window_in_seconds,
}
'Properties': properties
}
}

Expand Down Expand Up @@ -1123,14 +1128,20 @@ def _generate_sqseventsource(self, resource, template):
":%(account_id)s:%(queue)s",
queue=resource.queue
)
template['resource'].setdefault('aws_lambda_event_source_mapping', {})[
resource.resource_name] = {

aws_lambda_event_source_mapping = {
'event_source_arn': event_source_arn,
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds,
'function_name': self._fref(resource.lambda_function)
'function_name': self._fref(resource.lambda_function),
}
if resource.maximum_concurrency:
aws_lambda_event_source_mapping["scaling_config"] = {
"maximum_concurrency": resource.maximum_concurrency
}
template['resource'].setdefault('aws_lambda_event_source_mapping', {})[
resource.resource_name] = aws_lambda_event_source_mapping

def _generate_kinesiseventsource(self, resource, template):
# type: (models.KinesisEventSource, Dict[str, Any]) -> None
Expand Down
5 changes: 4 additions & 1 deletion docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ Chalice
entire lambda function name. This parameter is optional. If it is
not provided, the name of the python function will be used.

.. method:: on_sqs_message(queue, batch_size=1, name=None, queue_arn=None, maximum_batching_window_in_seconds=0)
.. method:: on_sqs_message(queue, batch_size=1, name=None, queue_arn=None, maximum_batching_window_in_seconds=0, maximum_concurrency=None)

Create a lambda function and configure it to be automatically invoked
whenever a message is published to the specified SQS queue.
Expand Down Expand Up @@ -337,6 +337,9 @@ Chalice
:param maximum_batching_window_in_seconds: The maximum amount of time,
in seconds, to gather records before invoking the function.

:param maximum_concurrency: The maximum number of concurrent functions
that the event source can invoke.

.. method:: on_kinesis_record(stream, batch_size=100, starting_position='LATEST', name=None, maximum_batching_window_in_seconds=0)

Create a lambda function and configure it to be automatically invoked
Expand Down
81 changes: 79 additions & 2 deletions tests/unit/deploy/test_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,8 @@ def test_can_plan_sqs_event_source(self):
),
'batch_size': 10,
'maximum_batching_window_in_seconds': 60,
'function_name': Variable("function_name_lambda_arn")
'function_name': Variable("function_name_lambda_arn"),
'maximum_concurrency': None
},
output_var='function_name-sqs-event-source_uuid'
)
Expand Down Expand Up @@ -1819,7 +1820,8 @@ def test_sqs_event_supports_queue_arn(self):
),
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'function_name': Variable("function_name_lambda_arn")
'function_name': Variable("function_name_lambda_arn"),
'maximum_concurrency': None,
},
output_var='function_name-sqs-event-source_uuid'
)
Expand Down Expand Up @@ -1863,6 +1865,7 @@ def test_can_update_sqs_event_with_queue_arn(self):
'event_uuid': 'my-uuid',
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'maximum_concurrency': None,
},
)
self.assert_recorded_values(
Expand Down Expand Up @@ -1925,6 +1928,80 @@ def test_sqs_event_source_exists_updates_batch_size(self):
'event_uuid': 'my-uuid',
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'maximum_concurrency': None,
},
)
self.assert_recorded_values(
plan, 'sqs_event', 'function_name-sqs-event-source', {
'queue_arn': 'arn:sqs:myqueue',
'event_uuid': 'my-uuid',
'queue': 'myqueue',
'lambda_arn': 'arn:lambda'
}
)

def test_sqs_event_supports_maximum_concurrency(self):
function = create_function_resource('function_name')
sqs_event_source = models.SQSEventSource(
resource_name='function_name-sqs-event-source',
queue=models.QueueARN(arn='arn:us-west-2:myqueue'),
batch_size=10,
lambda_function=function,
maximum_batching_window_in_seconds=0,
maximum_concurrency=2
)
plan = self.determine_plan(sqs_event_source)
assert plan[1] == models.APICall(
method_name='create_lambda_event_source',
params={
'event_source_arn': Variable(
"function_name-sqs-event-source_queue_arn"
),
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'function_name': Variable("function_name_lambda_arn"),
'maximum_concurrency': 2,
},
output_var='function_name-sqs-event-source_uuid'
)
self.assert_recorded_values(
plan, 'sqs_event', 'function_name-sqs-event-source', {
'queue_arn': Variable(
'function_name-sqs-event-source_queue_arn'),
'event_uuid': Variable(
'function_name-sqs-event-source_uuid'),
'queue': 'myqueue',
'lambda_arn': Variable(
'function_name_lambda_arn')
}
)

def test_sqs_event_source_exists_updates_maximum_concurrency(self):
function = create_function_resource('function_name')
sqs_event_source = models.SQSEventSource(
resource_name='function_name-sqs-event-source',
queue='myqueue',
batch_size=10,
lambda_function=function,
maximum_batching_window_in_seconds=0,
maximum_concurrency=2
)
self.remote_state.declare_resource_exists(
sqs_event_source,
queue='myqueue',
queue_arn='arn:sqs:myqueue',
resource_type='sqs_event',
lambda_arn='arn:lambda',
event_uuid='my-uuid',
)
plan = self.determine_plan(sqs_event_source)
assert plan[5] == models.APICall(
method_name='update_lambda_event_source',
params={
'event_uuid': 'my-uuid',
'batch_size': 10,
'maximum_batching_window_in_seconds': 0,
'maximum_concurrency': 2,
},
)
self.assert_recorded_values(
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/test_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,34 @@ def handler(event):
'maximum_batching_window_in_seconds': 0
}

def test_can_package_sqs_handler_with_max_concurrency(self, sample_app):
@sample_app.on_sqs_message(
queue='foo',
batch_size=5,
maximum_concurrency=2
)
def handler(event):
pass

config = Config.create(chalice_app=sample_app,
project_dir='.',
app_name='sample_app',
api_gateway_stage='api')
template = self.generate_template(config)

assert template['resource'][
'aws_lambda_event_source_mapping'][
'handler-sqs-event-source'] == {
'event_source_arn': (
'arn:${data.aws_partition.chalice.partition}:sqs'
':${data.aws_region.chalice.name}:'
'${data.aws_caller_identity.chalice.account_id}:foo'),
'function_name': '${aws_lambda_function.handler.arn}',
'batch_size': 5,
'maximum_batching_window_in_seconds': 0,
'scaling_config': {'maximum_concurrency': 2}
}

def test_sqs_arn_does_not_use_fn_sub(self, sample_app):
@sample_app.on_sqs_message(queue_arn='arn:foo:bar', batch_size=5)
def handler(event):
Expand Down Expand Up @@ -1736,6 +1764,37 @@ def handler(event):
}
}

def test_can_package_sqs_handler_with_max_concurrency(self, sample_app):
@sample_app.on_sqs_message(
queue='foo',
batch_size=5,
maximum_concurrency=2
)
def handler(event):
pass

config = Config.create(chalice_app=sample_app,
project_dir='.',
api_gateway_stage='api')
template = self.generate_template(config)
sns_handler = template['Resources']['Handler']
assert sns_handler['Properties']['Events'] == {
'HandlerSqsEventSource': {
'Type': 'SQS',
'Properties': {
'Queue': {
'Fn::Sub': (
'arn:${AWS::Partition}:sqs:${AWS::Region}'
':${AWS::AccountId}:foo'
)
},
'BatchSize': 5,
'MaximumBatchingWindowInSeconds': 0,
'ScalingConfig': {'MaximumConcurrency': 2}
},
}
}

def test_sqs_arn_does_not_use_fn_sub(self, sample_app):
@sample_app.on_sqs_message(queue_arn='arn:foo:bar', batch_size=5)
def handler(event):
Expand Down
Loading