From 872356e09480d8474decf9c4855f0bc7f5a1d1ea Mon Sep 17 00:00:00 2001 From: Bjorn Olsen Date: Thu, 8 Jul 2021 10:41:13 +0200 Subject: [PATCH 1/6] Improve AWS SQS Sensor (#16880) --- airflow/providers/amazon/aws/sensors/sqs.py | 96 +++++++++++++++---- setup.cfg | 1 + .../providers/amazon/aws/sensors/test_sqs.py | 87 +++++++++++++++++ 3 files changed, 163 insertions(+), 21 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index dc6217b5bd0a3..f4a51355d56f4 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -16,7 +16,10 @@ # specific language governing permissions and limitations # under the License. """Reads and then deletes the message from SQS queue""" -from typing import Optional +import json +from typing import Any, Optional + +from jsonpath_ng import parse from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.sqs import SQSHook @@ -37,6 +40,17 @@ class SQSSensor(BaseSensorOperator): :type max_messages: int :param wait_time_seconds: The time in seconds to wait for receiving messages (default: 1 second) :type wait_time_seconds: int + :param visibility_timeout: Visibility timeout, a period of time during which + Amazon SQS prevents other consumers from receiving and processing the message. + :type visibility_timeout: Optional[Int] + :param message_filtering: Specified how received messages should be filtered. Supported options are: + `None` (no filtering, default) or `'jsonpath'` (message Body filtered using a JSONPath expression). + You may add further methods by overriding the relevant class methods. + :type message_filtering: Optional[str] + :param message_filtering_config: Additional configuration to pass to the message filter. + For example with JSONPath filtering you can pass a JSONPath expression string here, + such as `'foo[*].baz'`. Messages with a Body which does not match are ignored. + :type message_filtering_config: Optional[str] """ template_fields = ('sqs_queue', 'max_messages') @@ -48,6 +62,9 @@ def __init__( aws_conn_id: str = 'aws_default', max_messages: int = 5, wait_time_seconds: int = 1, + visibility_timeout: Optional[int] = None, + message_filtering: Optional[str] = None, + message_filtering_config: Optional[Any] = None, **kwargs, ): super().__init__(**kwargs) @@ -55,6 +72,9 @@ def __init__( self.aws_conn_id = aws_conn_id self.max_messages = max_messages self.wait_time_seconds = wait_time_seconds + self.visibility_timeout = visibility_timeout + self.message_filtering = message_filtering + self.message_filtering_config = message_filtering_config self.hook: Optional[SQSHook] = None def poke(self, context): @@ -69,31 +89,48 @@ def poke(self, context): self.log.info('SQSSensor checking for message on queue: %s', self.sqs_queue) - messages = sqs_conn.receive_message( - QueueUrl=self.sqs_queue, - MaxNumberOfMessages=self.max_messages, - WaitTimeSeconds=self.wait_time_seconds, - ) + receive_message_kwargs = { + 'QueueUrl': self.sqs_queue, + 'MaxNumberOfMessages': self.max_messages, + 'WaitTimeSeconds': self.wait_time_seconds, + } + if self.visibility_timeout is not None: + receive_message_kwargs['VisibilityTimeout'] = self.visibility_timeout + + response = sqs_conn.receive_message(**receive_message_kwargs) + + if "Messages" not in response: + return False - self.log.info("received message %s", str(messages)) + messages = response['Messages'] + num_messages = len(messages) + self.log.info("received %s messages", str(num_messages)) - if 'Messages' in messages and messages['Messages']: - entries = [ - {'Id': message['MessageId'], 'ReceiptHandle': message['ReceiptHandle']} - for message in messages['Messages'] - ] + if num_messages == 0: + return False - result = sqs_conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries) + if self.message_filtering: + messages = self.filter_messages(messages) + num_messages = len(messages) + self.log.info("filtered %s messages", str(num_messages)) - if 'Successful' in result: - context['ti'].xcom_push(key='messages', value=messages) - return True - else: - raise AirflowException( - 'Delete SQS Messages failed ' + str(result) + ' for messages ' + str(messages) - ) + if num_messages == 0: + return False - return False + self.log.info("deleting %s messages", str(num_messages)) + + entries = [ + {'Id': message['MessageId'], 'ReceiptHandle': message['ReceiptHandle']} for message in messages + ] + response = sqs_conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries) + + if 'Successful' in response: + context['ti'].xcom_push(key='messages', value=messages) + return True + else: + raise AirflowException( + 'Delete SQS Messages failed ' + str(response) + ' for messages ' + str(messages) + ) def get_hook(self) -> SQSHook: """Create and return an SQSHook""" @@ -102,3 +139,20 @@ def get_hook(self) -> SQSHook: self.hook = SQSHook(aws_conn_id=self.aws_conn_id) return self.hook + + def filter_messages(self, messages): + if self.message_filtering == 'jsonpath': + return self.filter_messages_jsonpath(messages) + else: + raise NotImplementedError('Override this method to define custom filters') + + def filter_messages_jsonpath(self, messages): + jsonpath_expr = parse(self.message_filtering_config) + filtered_messages = [] + for message in messages: + body = message['Body'] + # Body is a string, deserialise to an object and then parse + body = json.loads(body) + if jsonpath_expr.find(body): + filtered_messages.append(message) + return filtered_messages diff --git a/setup.cfg b/setup.cfg index d3c5f574c0b7d..318dfa1215ef6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -115,6 +115,7 @@ install_requires = # Logging is broken with itsdangerous > 2 itsdangerous>=1.1.0, <2.0 jinja2>=2.10.1,<4 + jsonpath_ng>=1.5.3 jsonschema~=3.0 lazy-object-proxy lockfile>=0.12.2 diff --git a/tests/providers/amazon/aws/sensors/test_sqs.py b/tests/providers/amazon/aws/sensors/test_sqs.py index 90349a321c1e9..837bbf73ff1c8 100644 --- a/tests/providers/amazon/aws/sensors/test_sqs.py +++ b/tests/providers/amazon/aws/sensors/test_sqs.py @@ -17,6 +17,7 @@ # under the License. +import json import unittest from unittest import mock @@ -107,3 +108,89 @@ def test_poke_receive_raise_exception(self, mock_conn): self.sensor.poke(self.mock_context) assert 'test exception' in ctx.value.args[0] + + @mock.patch.object(SQSHook, 'get_conn') + def test_poke_visibility_timeout(self, mock_conn): + # Check without visibility_timeout parameter + self.sqs_hook.create_queue('test') + self.sqs_hook.send_message(queue_url='test', message_body='hello') + + self.sensor.poke(self.mock_context) + + calls_receive_message = [ + mock.call().receive_message(QueueUrl='test', MaxNumberOfMessages=5, WaitTimeSeconds=1) + ] + mock_conn.assert_has_calls(calls_receive_message) + # Check with visibility_timeout parameter + self.sensor = SQSSensor( + task_id='test_task2', + dag=self.dag, + sqs_queue='test', + aws_conn_id='aws_default', + visibility_timeout=42, + ) + self.sensor.poke(self.mock_context) + + calls_receive_message = [ + mock.call().receive_message( + QueueUrl='test', MaxNumberOfMessages=5, WaitTimeSeconds=1, VisibilityTimeout=42 + ) + ] + mock_conn.assert_has_calls(calls_receive_message) + + @mock_sqs + def test_poke_message_invalid_filtering(self): + self.sqs_hook.create_queue('test') + self.sqs_hook.send_message(queue_url='test', message_body='hello') + sensor = SQSSensor( + task_id='test_task2', + dag=self.dag, + sqs_queue='test', + aws_conn_id='aws_default', + message_filtering='invalid_option', + ) + with pytest.raises(NotImplementedError) as ctx: + sensor.poke(self.mock_context) + assert 'Override this method to define custom filters' in ctx.value.args[0] + + @mock.patch.object(SQSHook, "get_conn") + def test_poke_message_filtering_jsonpath(self, mock_conn): + self.sqs_hook.create_queue('test') + matching = [ + {"id": 11, "key": {"matches": [1, 2]}}, + {"id": 12, "key": {"matches": [3, 4, 5]}}, + {"id": 13, "key": {"matches": [10]}}, + ] + non_matching = [ + {"id": 14, "key": {"nope": [5, 6]}}, + {"id": 15, "key": {"nope": [7, 8]}}, + ] + all = matching + non_matching + + def mock_receive_message(**kwargs): + messages = [] + for body in all: + messages.append( + {'MessageId': body['id'], 'ReceiptHandle': 100 + body['id'], 'Body': json.dumps(body)} + ) + return {'Messages': messages} + + mock_conn.return_value.receive_message.side_effect = mock_receive_message + + def mock_delete_message_batch(**kwargs): + return {'Successful'} + + mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch + + # Test that messages are filtered + self.sensor.message_filtering = 'jsonpath' + self.sensor.message_filtering_config = 'key.matches[*]' + result = self.sensor.poke(self.mock_context) + assert result + + # Test that only filtered messages are deleted + delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching] + calls_delete_message_batch = [ + mock.call().delete_message_batch(QueueUrl='test', Entries=delete_entries) + ] + mock_conn.assert_has_calls(calls_delete_message_batch) From 786cc73d27199577b932bcb62d32d9ce728660b7 Mon Sep 17 00:00:00 2001 From: Bjorn Olsen Date: Mon, 12 Jul 2021 07:52:46 +0200 Subject: [PATCH 2/6] address review comments --- setup.cfg | 1 - setup.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 318dfa1215ef6..d3c5f574c0b7d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -115,7 +115,6 @@ install_requires = # Logging is broken with itsdangerous > 2 itsdangerous>=1.1.0, <2.0 jinja2>=2.10.1,<4 - jsonpath_ng>=1.5.3 jsonschema~=3.0 lazy-object-proxy lockfile>=0.12.2 diff --git a/setup.py b/setup.py index f29c9a2b14af7..eb816cf29127f 100644 --- a/setup.py +++ b/setup.py @@ -182,6 +182,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version amazon = [ 'boto3>=1.15.0,<1.18.0', 'watchtower~=1.0.6', + 'jsonpath_ng>=1.5.3', ] apache_beam = [ 'apache-beam>=2.20.0', From 9f71b319cbc1dfc6edeb7d7bc4c9be53e668a297 Mon Sep 17 00:00:00 2001 From: Bjorn Olsen Date: Mon, 12 Jul 2021 07:58:08 +0200 Subject: [PATCH 3/6] made message_filtering_config templatable --- airflow/providers/amazon/aws/sensors/sqs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index f4a51355d56f4..85cfeb0001730 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -53,7 +53,7 @@ class SQSSensor(BaseSensorOperator): :type message_filtering_config: Optional[str] """ - template_fields = ('sqs_queue', 'max_messages') + template_fields = ('sqs_queue', 'max_messages', 'message_filtering_config') def __init__( self, From 75a5b4fa09dbbba145082bae55f0d38d0d88594d Mon Sep 17 00:00:00 2001 From: Bjorn Olsen Date: Mon, 12 Jul 2021 09:09:45 +0200 Subject: [PATCH 4/6] add value matching --- airflow/providers/amazon/aws/sensors/sqs.py | 38 +++++++- .../providers/amazon/aws/sensors/test_sqs.py | 95 ++++++++++++++++++- 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index 85cfeb0001730..11734aa814539 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -44,13 +44,19 @@ class SQSSensor(BaseSensorOperator): Amazon SQS prevents other consumers from receiving and processing the message. :type visibility_timeout: Optional[Int] :param message_filtering: Specified how received messages should be filtered. Supported options are: - `None` (no filtering, default) or `'jsonpath'` (message Body filtered using a JSONPath expression). + `None` (no filtering, default), `'literal'` (message Body literal match) or `'jsonpath'` + (message Body filtered using a JSONPath expression). You may add further methods by overriding the relevant class methods. :type message_filtering: Optional[str] + :param message_filtering_match_values: Optional value/s for the message filter to match on. + For example, with literal matching, if a message body matches any of the specified values + then it is included. For JSONPath matching, the result of the JSONPath expression is used + and may match any of the specified values. + :type message_filtering_match_values: Optional[Any] :param message_filtering_config: Additional configuration to pass to the message filter. For example with JSONPath filtering you can pass a JSONPath expression string here, such as `'foo[*].baz'`. Messages with a Body which does not match are ignored. - :type message_filtering_config: Optional[str] + :type message_filtering_config: Optional[Any] """ template_fields = ('sqs_queue', 'max_messages', 'message_filtering_config') @@ -64,6 +70,7 @@ def __init__( wait_time_seconds: int = 1, visibility_timeout: Optional[int] = None, message_filtering: Optional[str] = None, + message_filtering_match_values: Optional[Any] = None, message_filtering_config: Optional[Any] = None, **kwargs, ): @@ -73,8 +80,14 @@ def __init__( self.max_messages = max_messages self.wait_time_seconds = wait_time_seconds self.visibility_timeout = visibility_timeout + self.message_filtering = message_filtering + if message_filtering_match_values is not None: + if not isinstance(message_filtering_match_values, list): + message_filtering_match_values = [message_filtering_match_values] + self.message_filtering_match_values = message_filtering_match_values self.message_filtering_config = message_filtering_config + self.hook: Optional[SQSHook] = None def poke(self, context): @@ -141,11 +154,22 @@ def get_hook(self) -> SQSHook: return self.hook def filter_messages(self, messages): + if self.message_filtering == 'literal': + return self.filter_messages_literal(messages) if self.message_filtering == 'jsonpath': return self.filter_messages_jsonpath(messages) else: raise NotImplementedError('Override this method to define custom filters') + def filter_messages_literal(self, messages): + filtered_messages = [] + if self.message_filtering_match_values is None: + raise Exception('message_filtering_match_values must be specified for literal matching') + for message in messages: + if message['Body'] in self.message_filtering_match_values: + filtered_messages.append(message) + return filtered_messages + def filter_messages_jsonpath(self, messages): jsonpath_expr = parse(self.message_filtering_config) filtered_messages = [] @@ -153,6 +177,14 @@ def filter_messages_jsonpath(self, messages): body = message['Body'] # Body is a string, deserialise to an object and then parse body = json.loads(body) - if jsonpath_expr.find(body): + results = jsonpath_expr.find(body) + if not results: + continue + if self.message_filtering_match_values is None: filtered_messages.append(message) + continue + for result in results: + if result.value in self.message_filtering_match_values: + filtered_messages.append(message) + break return filtered_messages diff --git a/tests/providers/amazon/aws/sensors/test_sqs.py b/tests/providers/amazon/aws/sensors/test_sqs.py index 837bbf73ff1c8..82a1aacb2a4a2 100644 --- a/tests/providers/amazon/aws/sensors/test_sqs.py +++ b/tests/providers/amazon/aws/sensors/test_sqs.py @@ -153,6 +153,45 @@ def test_poke_message_invalid_filtering(self): sensor.poke(self.mock_context) assert 'Override this method to define custom filters' in ctx.value.args[0] + @mock.patch.object(SQSHook, "get_conn") + def test_poke_message_filtering_literal_values(self, mock_conn): + self.sqs_hook.create_queue('test') + matching = [{"id": 11, "body": "a matching message"}] + non_matching = [{"id": 12, "body": "a non-matching message"}] + all = matching + non_matching + + def mock_receive_message(**kwargs): + messages = [] + for message in all: + messages.append( + { + 'MessageId': message['id'], + 'ReceiptHandle': 100 + message['id'], + 'Body': message['body'], + } + ) + return {'Messages': messages} + + mock_conn.return_value.receive_message.side_effect = mock_receive_message + + def mock_delete_message_batch(**kwargs): + return {'Successful'} + + mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch + + # Test that messages are filtered + self.sensor.message_filtering = 'literal' + self.sensor.message_filtering_match_values = ["a matching message"] + result = self.sensor.poke(self.mock_context) + assert result + + # Test that only filtered messages are deleted + delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching] + calls_delete_message_batch = [ + mock.call().delete_message_batch(QueueUrl='test', Entries=delete_entries) + ] + mock_conn.assert_has_calls(calls_delete_message_batch) + @mock.patch.object(SQSHook, "get_conn") def test_poke_message_filtering_jsonpath(self, mock_conn): self.sqs_hook.create_queue('test') @@ -169,9 +208,60 @@ def test_poke_message_filtering_jsonpath(self, mock_conn): def mock_receive_message(**kwargs): messages = [] - for body in all: + for message in all: + messages.append( + { + 'MessageId': message['id'], + 'ReceiptHandle': 100 + message['id'], + 'Body': json.dumps(message), + } + ) + return {'Messages': messages} + + mock_conn.return_value.receive_message.side_effect = mock_receive_message + + def mock_delete_message_batch(**kwargs): + return {'Successful'} + + mock_conn.return_value.delete_message_batch.side_effect = mock_delete_message_batch + + # Test that messages are filtered + self.sensor.message_filtering = 'jsonpath' + self.sensor.message_filtering_config = 'key.matches[*]' + result = self.sensor.poke(self.mock_context) + assert result + + # Test that only filtered messages are deleted + delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching] + calls_delete_message_batch = [ + mock.call().delete_message_batch(QueueUrl='test', Entries=delete_entries) + ] + mock_conn.assert_has_calls(calls_delete_message_batch) + + @mock.patch.object(SQSHook, "get_conn") + def test_poke_message_filtering_jsonpath_values(self, mock_conn): + self.sqs_hook.create_queue('test') + matching = [ + {"id": 11, "key": {"matches": [1, 2]}}, + {"id": 12, "key": {"matches": [1, 4, 5]}}, + {"id": 13, "key": {"matches": [4, 5]}}, + ] + non_matching = [ + {"id": 21, "key": {"matches": [10]}}, + {"id": 22, "key": {"nope": [5, 6]}}, + {"id": 23, "key": {"nope": [7, 8]}}, + ] + all = matching + non_matching + + def mock_receive_message(**kwargs): + messages = [] + for message in all: messages.append( - {'MessageId': body['id'], 'ReceiptHandle': 100 + body['id'], 'Body': json.dumps(body)} + { + 'MessageId': message['id'], + 'ReceiptHandle': 100 + message['id'], + 'Body': json.dumps(message), + } ) return {'Messages': messages} @@ -185,6 +275,7 @@ def mock_delete_message_batch(**kwargs): # Test that messages are filtered self.sensor.message_filtering = 'jsonpath' self.sensor.message_filtering_config = 'key.matches[*]' + self.sensor.message_filtering_match_values = [1, 4] result = self.sensor.poke(self.mock_context) assert result From 45d61c95469db4b264503d3a7c3373c51652ab41 Mon Sep 17 00:00:00 2001 From: Bjorn Olsen Date: Thu, 15 Jul 2021 10:43:28 +0200 Subject: [PATCH 5/6] address PR comments --- airflow/providers/amazon/aws/sensors/sqs.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index 11734aa814539..be09734fb7f37 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -117,20 +117,20 @@ def poke(self, context): messages = response['Messages'] num_messages = len(messages) - self.log.info("received %s messages", str(num_messages)) + self.log.info("Received %d messages", num_messages) - if num_messages == 0: + if not num_messages: return False if self.message_filtering: messages = self.filter_messages(messages) num_messages = len(messages) - self.log.info("filtered %s messages", str(num_messages)) + self.log.info("There are %d messages left after filtering", num_messages) - if num_messages == 0: + if not num_messages: return False - self.log.info("deleting %s messages", str(num_messages)) + self.log.info("Deleting %d messages", num_messages) entries = [ {'Id': message['MessageId'], 'ReceiptHandle': message['ReceiptHandle']} for message in messages From 045bde68d858b29bfeea83199dd7aaadb09234b5 Mon Sep 17 00:00:00 2001 From: Bjorn Olsen Date: Mon, 26 Jul 2021 10:26:58 +0200 Subject: [PATCH 6/6] address PR comments 2 --- airflow/providers/amazon/aws/sensors/sqs.py | 25 ++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index be09734fb7f37..e2d04e1f1b0bd 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -20,6 +20,7 @@ from typing import Any, Optional from jsonpath_ng import parse +from typing_extensions import Literal from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.sqs import SQSHook @@ -47,16 +48,16 @@ class SQSSensor(BaseSensorOperator): `None` (no filtering, default), `'literal'` (message Body literal match) or `'jsonpath'` (message Body filtered using a JSONPath expression). You may add further methods by overriding the relevant class methods. - :type message_filtering: Optional[str] + :type message_filtering: Optional[Literal["literal", "jsonpath"]] :param message_filtering_match_values: Optional value/s for the message filter to match on. For example, with literal matching, if a message body matches any of the specified values then it is included. For JSONPath matching, the result of the JSONPath expression is used and may match any of the specified values. - :type message_filtering_match_values: Optional[Any] + :type message_filtering_match_values: Any :param message_filtering_config: Additional configuration to pass to the message filter. For example with JSONPath filtering you can pass a JSONPath expression string here, such as `'foo[*].baz'`. Messages with a Body which does not match are ignored. - :type message_filtering_config: Optional[Any] + :type message_filtering_config: Any """ template_fields = ('sqs_queue', 'max_messages', 'message_filtering_config') @@ -69,9 +70,9 @@ def __init__( max_messages: int = 5, wait_time_seconds: int = 1, visibility_timeout: Optional[int] = None, - message_filtering: Optional[str] = None, - message_filtering_match_values: Optional[Any] = None, - message_filtering_config: Optional[Any] = None, + message_filtering: Optional[Literal["literal", "jsonpath"]] = None, + message_filtering_match_values: Any = None, + message_filtering_config: Any = None, **kwargs, ): super().__init__(**kwargs) @@ -82,10 +83,16 @@ def __init__( self.visibility_timeout = visibility_timeout self.message_filtering = message_filtering + if message_filtering_match_values is not None: - if not isinstance(message_filtering_match_values, list): - message_filtering_match_values = [message_filtering_match_values] + if not isinstance(message_filtering_match_values, set): + message_filtering_match_values = set(message_filtering_match_values) self.message_filtering_match_values = message_filtering_match_values + + if self.message_filtering == 'literal': + if self.message_filtering_match_values is None: + raise TypeError('message_filtering_match_values must be specified for literal matching') + self.message_filtering_config = message_filtering_config self.hook: Optional[SQSHook] = None @@ -163,8 +170,6 @@ def filter_messages(self, messages): def filter_messages_literal(self, messages): filtered_messages = [] - if self.message_filtering_match_values is None: - raise Exception('message_filtering_match_values must be specified for literal matching') for message in messages: if message['Body'] in self.message_filtering_match_values: filtered_messages.append(message)