From 18b8ed4c545b149067a0459dae83a76ecb4d6b43 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 30 Apr 2025 23:00:40 +0200 Subject: [PATCH] Move SQS message queue to Amazon provider The common.messaging abstraction should discover common messaging queue providers using the same mechanism as we have for other core extensions. Previously common.messaging had the optional (but not really) dependencies to other providers, but that was not needed and introduced unnecessary coupling. By switching to our built-in discovery mechanism we get immediately all the niceties of provider discovery mechanisms: * queue is provided by the actual provider where the service or integration already is implemented (sqs -> amazon provider, in the future kafka -> kafka provider) * queues are discovered from installed providers * there is no coupling or imports between common.messaging and the providers that implement messaging, the dependency is in the other way - providers that implement messaging depend on common.messaging * airflow providers queues CLI and providers core extensions documentation is automatically generated --- .../authoring-and-scheduling/connections.rst | 2 +- airflow-core/docs/core-concepts/index.rst | 1 + .../docs/core-concepts/message-queues.rst | 41 +++++++++++++ airflow-core/src/airflow/cli/cli_config.py | 6 ++ .../airflow/cli/commands/provider_command.py | 13 +++++ .../src/airflow/provider.yaml.schema.json | 12 ++++ .../src/airflow/provider_info.schema.json | 12 ++++ airflow-core/src/airflow/providers_manager.py | 22 +++++++ .../src/airflow_breeze/global_constants.py | 3 +- .../sphinx_exts/operators_and_hooks_ref.py | 12 ++++ .../sphinx_exts/templates/queues.rst.jinja2 | 27 +++++++++ .../core-extensions/message-queues.rst | 33 +++++++++++ providers/amazon/docs/index.rst | 1 + .../amazon/docs/message-queues/index.rst | 43 ++++++++++++++ providers/amazon/provider.yaml | 4 ++ providers/amazon/pyproject.toml | 10 +++- .../src/airflow/providers/amazon/__init__.py 
| 2 +- .../providers/amazon/aws/queues/__init__.py | 16 ++++++ .../providers/amazon/aws/queues}/sqs.py | 15 ++++- .../providers/amazon/get_provider_info.py | 1 + .../tests/unit/amazon/aws/queues/__init__.py | 16 ++++++ .../tests/unit/amazon/aws/queues/test_sqs.py | 57 +++++++++++++++++++ .../unit/amazon/aws/triggers/test_sqs.py | 19 ++++++- providers/cncf/kubernetes/provider.yaml | 2 +- .../cncf/kubernetes/get_provider_info.py | 2 +- providers/common/messaging/docs/index.rst | 1 - providers/common/messaging/docs/providers.rst | 22 +++---- providers/common/messaging/provider.yaml | 1 + providers/common/messaging/pyproject.toml | 15 +++-- .../providers/common/messaging/__init__.py | 6 +- .../common/messaging/providers/__init__.py | 25 +++++++- .../common/messaging/triggers/msg_queue.py | 20 +++++-- .../messaging/triggers/test_msg_queue.py | 19 +++---- pyproject.toml | 4 +- .../update_airflow_pyproject_toml.py | 2 +- .../run_provider_yaml_files_check.py | 34 ++++++----- 36 files changed, 453 insertions(+), 68 deletions(-) create mode 100644 airflow-core/docs/core-concepts/message-queues.rst create mode 100644 devel-common/src/sphinx_exts/templates/queues.rst.jinja2 create mode 100644 providers-summary-docs/core-extensions/message-queues.rst create mode 100644 providers/amazon/docs/message-queues/index.rst create mode 100644 providers/amazon/src/airflow/providers/amazon/aws/queues/__init__.py rename providers/{common/messaging/src/airflow/providers/common/messaging/providers => amazon/src/airflow/providers/amazon/aws/queues}/sqs.py (73%) create mode 100644 providers/amazon/tests/unit/amazon/aws/queues/__init__.py create mode 100644 providers/amazon/tests/unit/amazon/aws/queues/test_sqs.py diff --git a/airflow-core/docs/authoring-and-scheduling/connections.rst b/airflow-core/docs/authoring-and-scheduling/connections.rst index ba74826519eb2..7f9cbaa443ecc 100644 --- a/airflow-core/docs/authoring-and-scheduling/connections.rst +++ 
b/airflow-core/docs/authoring-and-scheduling/connections.rst @@ -47,5 +47,5 @@ Airflow allows to define custom connection types. This is what is described in d :doc:`apache-airflow-providers:index` - providers give you the capability of defining your own connections. The connection customization can be done by any provider, but also many of the providers managed by the community define custom connection types. -The full list of all providers delivered by ``Apache Airflow community managed providers`` can be found in +The full list of all connections delivered by ``Apache Airflow community managed providers`` can be found in :doc:`apache-airflow-providers:core-extensions/connections`. diff --git a/airflow-core/docs/core-concepts/index.rst b/airflow-core/docs/core-concepts/index.rst index fdb9c2d146aaa..8ba314cd36778 100644 --- a/airflow-core/docs/core-concepts/index.rst +++ b/airflow-core/docs/core-concepts/index.rst @@ -43,6 +43,7 @@ Here you can find detailed documentation about each one of the core concepts of auth-manager/index objectstorage backfill + message-queues **Communication** diff --git a/airflow-core/docs/core-concepts/message-queues.rst b/airflow-core/docs/core-concepts/message-queues.rst new file mode 100644 index 0000000000000..573189a24f891 --- /dev/null +++ b/airflow-core/docs/core-concepts/message-queues.rst @@ -0,0 +1,41 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _concepts:message-queues: + +Message Queues +============== + +The Message Queues are a way to expose capability of external event-driven scheduling of Dags. + +Apache Airflow is primarily designed for time-based and dependency-based scheduling of workflows. However, +modern data architectures often require near real-time processing and the ability to react to +events from various sources, such as message queues. + +Airflow has native event-driven capability, allowing users to create workflows that can be +triggered by external events, thus enabling more responsive data pipelines. + +Airflow supports poll-based event-driven scheduling, where the Triggerer can poll +external message queues using built-in :class:`airflow.triggers.base.BaseTrigger` classes. This allows users +to create workflows that can be triggered by external events, such as messages arriving +in a queue or changes in a database efficiently. + +Airflow constantly monitors the state of an external resource and updates the asset whenever the external +resource reaches a given state (if it does reach it). To achieve this, we leverage Airflow Triggers. +Triggers are small, asynchronous pieces of Python code whose job is to poll an external resource state. + +The list of supported message queues is available in :doc:`apache-airflow-providers:core-extensions/message-queues`. 
diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index d094569c3c2f9..61622dbeefc4c 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -1597,6 +1597,12 @@ class GroupCommand(NamedTuple): func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"), args=(ARG_OUTPUT, ARG_VERBOSE), ), + ActionCommand( + name="queues", + help="Get information about queues provided", + func=lazy_load_command("airflow.cli.commands.provider_command.queues_list"), + args=(ARG_OUTPUT, ARG_VERBOSE), + ), ActionCommand( name="notifications", help="Get information about notifications provided", diff --git a/airflow-core/src/airflow/cli/commands/provider_command.py b/airflow-core/src/airflow/cli/commands/provider_command.py index bd03d07ee45b2..81a96b2a9bc24 100644 --- a/airflow-core/src/airflow/cli/commands/provider_command.py +++ b/airflow-core/src/airflow/cli/commands/provider_command.py @@ -220,6 +220,19 @@ def executors_list(args): ) +@suppress_logs_and_warning +@providers_configuration_loaded +def queues_list(args): + """List all queues at the command line.""" + AirflowConsole().print_as( + data=list(ProvidersManager().queue_class_names), + output=args.output, + mapper=lambda x: { + "queue_class_names": x, + }, + ) + + @suppress_logs_and_warning @providers_configuration_loaded def config_list(args): diff --git a/airflow-core/src/airflow/provider.yaml.schema.json b/airflow-core/src/airflow/provider.yaml.schema.json index 75ba892569b4e..c35e0d9de25e7 100644 --- a/airflow-core/src/airflow/provider.yaml.schema.json +++ b/airflow-core/src/airflow/provider.yaml.schema.json @@ -467,6 +467,18 @@ } } }, + "queues": { + "type": "array", + "description": "Message Queues exposed by the provider", + "items": { + "name": { + "type": "string" + }, + "message-queue-class": { + "type": "string" + } + } + }, "source-date-epoch": { "type": "integer", "description": "Source date 
epoch - seconds since epoch (gmtime) when the release documentation was prepared. Used to generate reproducible package builds with flint.", diff --git a/airflow-core/src/airflow/provider_info.schema.json b/airflow-core/src/airflow/provider_info.schema.json index 1785ba02ed623..3ca9756dfb2f6 100644 --- a/airflow-core/src/airflow/provider_info.schema.json +++ b/airflow-core/src/airflow/provider_info.schema.json @@ -416,6 +416,18 @@ "description": "Class to instantiate the plugin" } } + }, + "queues": { + "type": "array", + "description": "Message Queues exposed by the provider", + "items": { + "name": { + "type": "string" + }, + "message-queue-class": { + "type": "string" + } + } } }, "definitions": { diff --git a/airflow-core/src/airflow/providers_manager.py b/airflow-core/src/airflow/providers_manager.py index 85062e9f75e63..81611c8205df7 100644 --- a/airflow-core/src/airflow/providers_manager.py +++ b/airflow-core/src/airflow/providers_manager.py @@ -416,6 +416,7 @@ def __init__(self): self._auth_manager_class_name_set: set[str] = set() self._secrets_backend_class_name_set: set[str] = set() self._executor_class_name_set: set[str] = set() + self._queue_class_name_set: set[str] = set() self._provider_configs: dict[str, dict[str, Any]] = {} self._trigger_info_set: set[TriggerInfo] = set() self._notification_info_set: set[NotificationInfo] = set() @@ -533,6 +534,12 @@ def initialize_providers_executors(self): self.initialize_providers_list() self._discover_executors() + @provider_info_cache("queues") + def initialize_providers_queues(self): + """Lazy initialization of providers queue information.""" + self.initialize_providers_list() + self._discover_queues() + @provider_info_cache("notifications") def initialize_providers_notifications(self): """Lazy initialization of providers notifications information.""" @@ -1091,6 +1098,14 @@ def _discover_executors(self) -> None: if _correctness_check(provider_package, executors_class_name, provider): 
self._executor_class_name_set.add(executors_class_name) + def _discover_queues(self) -> None: + """Retrieve all queues defined in the providers.""" + for provider_package, provider in self._provider_dict.items(): + if provider.data.get("queues"): + for queue_class_name in provider.data["queues"]: + if _correctness_check(provider_package, queue_class_name, provider): + self._queue_class_name_set.add(queue_class_name) + def _discover_config(self) -> None: """Retrieve all configs defined in the providers.""" for provider_package, provider in self._provider_dict.items(): @@ -1221,6 +1236,11 @@ def executor_class_names(self) -> list[str]: self.initialize_providers_executors() return sorted(self._executor_class_name_set) + @property + def queue_class_names(self) -> list[str]: + self.initialize_providers_queues() + return sorted(self._queue_class_name_set) + @property def filesystem_module_names(self) -> list[str]: self.initialize_providers_filesystems() @@ -1268,9 +1288,11 @@ def _cleanup(self): self._auth_manager_class_name_set.clear() self._secrets_backend_class_name_set.clear() self._executor_class_name_set.clear() + self._queue_class_name_set.clear() self._provider_configs.clear() self._trigger_info_set.clear() self._notification_info_set.clear() self._plugins_set.clear() + self._initialized = False self._initialization_stack_trace = None diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index f6a262a6c7fbd..38654fdc924cd 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -696,7 +696,8 @@ def generate_provider_dependencies_if_needed(): { "python-version": "3.9", "airflow-version": "3.0.0", - "remove-providers": "cloudant", + # TODO: bring back common-messaging when we bump airflow to 3.0.1 + "remove-providers": "cloudant common.messaging", "run-tests": "true", }, ] diff --git a/devel-common/src/sphinx_exts/operators_and_hooks_ref.py 
b/devel-common/src/sphinx_exts/operators_and_hooks_ref.py index 2730df37baed5..eab292d668bff 100644 --- a/devel-common/src/sphinx_exts/operators_and_hooks_ref.py +++ b/devel-common/src/sphinx_exts/operators_and_hooks_ref.py @@ -481,6 +481,17 @@ def render_content( ) +class QueuesDirective(BaseJinjaReferenceDirective): + """Generate list of queues""" + + def render_content( + self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR + ) -> str: + return _common_render_list_content( + header_separator=header_separator, resource_type="queues", template="queues.rst.jinja2" + ) + + class DeferrableOperatorDirective(BaseJinjaReferenceDirective): """Generate list of deferrable operators""" @@ -521,6 +532,7 @@ def setup(app): app.add_directive("airflow-extra-links", ExtraLinksDirective) app.add_directive("airflow-notifications", NotificationsDirective) app.add_directive("airflow-executors", ExecutorsDirective) + app.add_directive("airflow-queues", QueuesDirective) app.add_directive("airflow-deferrable-operators", DeferrableOperatorDirective) app.add_directive("airflow-deprecations", DeprecationsDirective) app.add_directive("airflow-dataset-schemes", AssetSchemeDirective) diff --git a/devel-common/src/sphinx_exts/templates/queues.rst.jinja2 b/devel-common/src/sphinx_exts/templates/queues.rst.jinja2 new file mode 100644 index 0000000000000..914d3476906a9 --- /dev/null +++ b/devel-common/src/sphinx_exts/templates/queues.rst.jinja2 @@ -0,0 +1,27 @@ +{# + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +#} +{%for provider, provider_dict in items.items() %} +{{ provider_dict['name'] }} +{{ header_separator * (provider_dict['name']|length) }} + +{% for queue in provider_dict['queues'] -%} +- :class:`~{{ queue }}` +{% endfor -%} + +{% endfor %} diff --git a/providers-summary-docs/core-extensions/message-queues.rst b/providers-summary-docs/core-extensions/message-queues.rst new file mode 100644 index 0000000000000..46f3f40d2a3e4 --- /dev/null +++ b/providers-summary-docs/core-extensions/message-queues.rst @@ -0,0 +1,33 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Message Queues +-------------- + +This is a summary of all Apache Airflow Community provided implementations of Queues +exposed via community-managed providers. + +Airflow can be extended by providers with Queues. 
Each provider can define their own Queues, +that can be polled by triggers to enable event-driven scheduling of Dags. + +The queues are explained in +:doc:`apache-airflow:core-concepts/message-queues` and you can also see those +provided by the community-managed providers: + +.. airflow-queues:: + :tags: None + :header-separator: " diff --git a/providers/amazon/docs/index.rst b/providers/amazon/docs/index.rst index 7814fdf466e92..dbc8799cd01f1 100644 --- a/providers/amazon/docs/index.rst +++ b/providers/amazon/docs/index.rst @@ -42,6 +42,7 @@ Logging for Tasks Configuration Executors + Message Queues AWS Auth manager CLI diff --git a/providers/amazon/docs/message-queues/index.rst b/providers/amazon/docs/message-queues/index.rst new file mode 100644 index 0000000000000..388be374c667c --- /dev/null +++ b/providers/amazon/docs/message-queues/index.rst @@ -0,0 +1,43 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Amazon Messaging Queues +======================= + +Amazon SQS Queue Provider +------------------------- + +Implemented by :class:`~airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider` + +The Amazon SQS Queue Provider is a message queue provider that uses +Amazon Simple Queue Service (SQS) as the underlying message queue system. 
+It allows you to send and receive messages using SQS queues in your Airflow workflows. +The provider supports both standard and FIFO queues, and it provides features +such as message visibility timeout, message retention period, and dead-letter queues. + +The queue must match this regex: + +.. exampleinclude:: /../src/airflow/providers/amazon/aws/queues/sqs.py + :language: python + :dedent: 0 + :start-after: [START queue_regexp] + :end-before: [END queue_regexp] + + +The queue parameter is passed directly to the ``sqs_queue`` parameter of the underlying +:class:`~airflow.providers.amazon.aws.triggers.sqs.SqsSensorTrigger` class, and any +additional kwargs are passed directly to the trigger constructor. diff --git a/providers/amazon/provider.yaml b/providers/amazon/provider.yaml index 560e71bccb9a8..cda46465d4712 100644 --- a/providers/amazon/provider.yaml +++ b/providers/amazon/provider.yaml @@ -25,6 +25,7 @@ state: ready source-date-epoch: 1744788746 # note that those versions are maintained by release manager - do not update them manually versions: + - 9.7.0 - 9.6.1 - 9.6.0 - 9.5.0 @@ -1178,3 +1179,6 @@ executors: auth-managers: - airflow.providers.amazon.aws.auth_manager.aws_auth_manager.AwsAuthManager + +queues: + - airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider diff --git a/providers/amazon/pyproject.toml b/providers/amazon/pyproject.toml index 472fd3fe2698f..437705ce3bc1b 100644 --- a/providers/amazon/pyproject.toml +++ b/providers/amazon/pyproject.toml @@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi" [project] name = "apache-airflow-providers-amazon" -version = "9.6.1" +version = "9.7.0" description = "Provider package apache-airflow-providers-amazon for Apache Airflow" readme = "README.rst" authors = [ @@ -149,6 +149,9 @@ dependencies = [ "standard" = [ "apache-airflow-providers-standard" ] +"common.messaging" = [ + "apache-airflow-providers-common-messaging>=1.0.1" +] [dependency-groups] dev = [ @@ -158,6 +161,7 @@ dev = [ 
"apache-airflow-providers-apache-hive", "apache-airflow-providers-cncf-kubernetes", "apache-airflow-providers-common-compat", + "apache-airflow-providers-common-messaging", "apache-airflow-providers-common-sql", "apache-airflow-providers-exasol", "apache-airflow-providers-ftp", @@ -211,8 +215,8 @@ apache-airflow-providers-common-sql = {workspace = true} apache-airflow-providers-standard = {workspace = true} [project.urls] -"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.1" -"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.1/changelog.html" +"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0" +"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0/changelog.html" "Bug Tracker" = "https://github.com/apache/airflow/issues" "Source Code" = "https://github.com/apache/airflow" "Slack Chat" = "https://s.apache.org/airflow-slack" diff --git a/providers/amazon/src/airflow/providers/amazon/__init__.py b/providers/amazon/src/airflow/providers/amazon/__init__.py index bd8e76ed50d24..705ef1e9f9cc8 100644 --- a/providers/amazon/src/airflow/providers/amazon/__init__.py +++ b/providers/amazon/src/airflow/providers/amazon/__init__.py @@ -29,7 +29,7 @@ __all__ = ["__version__"] -__version__ = "9.6.1" +__version__ = "9.7.0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( "2.10.0" diff --git a/providers/amazon/src/airflow/providers/amazon/aws/queues/__init__.py b/providers/amazon/src/airflow/providers/amazon/aws/queues/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/amazon/src/airflow/providers/amazon/aws/queues/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/providers/sqs.py b/providers/amazon/src/airflow/providers/amazon/aws/queues/sqs.py similarity index 73% rename from providers/common/messaging/src/airflow/providers/common/messaging/providers/sqs.py rename to providers/amazon/src/airflow/providers/amazon/aws/queues/sqs.py index 97ca2dd8905c4..aff5b462b7f32 100644 --- a/providers/common/messaging/src/airflow/providers/common/messaging/providers/sqs.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/queues/sqs.py @@ -19,18 +19,29 @@ import re from typing import TYPE_CHECKING +from airflow.exceptions import AirflowOptionalProviderFeatureException from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger -from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider + +try: + from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider +except ImportError: + raise AirflowOptionalProviderFeatureException( + "This feature requires the 'common.messaging' provider to be installed in version >= 1.0.1." 
+ ) if TYPE_CHECKING: from airflow.triggers.base import BaseEventTrigger +# [START queue_regexp] +QUEUE_REGEXP = r"^https://sqs\.[^.]+\.amazonaws\.com/[0-9]+/.+" +# [END queue_regexp] + class SqsMessageQueueProvider(BaseMessageQueueProvider): """Configuration for SQS integration with common-messaging.""" def queue_matches(self, queue: str) -> bool: - return bool(re.match(r"^https://sqs\.[^.]+\.amazonaws\.com/[0-9]+/.+", queue)) + return bool(re.match(QUEUE_REGEXP, queue)) def trigger_class(self) -> type[BaseEventTrigger]: return SqsSensorTrigger diff --git a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py index 5aa818629b64d..136c907a08e82 100644 --- a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py +++ b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py @@ -1291,4 +1291,5 @@ def get_provider_info(): }, "executors": ["airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"], "auth-managers": ["airflow.providers.amazon.aws.auth_manager.aws_auth_manager.AwsAuthManager"], + "queues": ["airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider"], } diff --git a/providers/amazon/tests/unit/amazon/aws/queues/__init__.py b/providers/amazon/tests/unit/amazon/aws/queues/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/amazon/tests/unit/amazon/aws/queues/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/amazon/tests/unit/amazon/aws/queues/test_sqs.py b/providers/amazon/tests/unit/amazon/aws/queues/test_sqs.py new file mode 100644 index 0000000000000..95c28392473e5 --- /dev/null +++ b/providers/amazon/tests/unit/amazon/aws/queues/test_sqs.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import pytest + +from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger + +pytest.importorskip("airflow.providers.common.messaging.providers.base_provider") + + +def test_message_sqs_queue_create(): + from airflow.providers.amazon.aws.queues.sqs import SqsMessageQueueProvider + from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider + + provider = SqsMessageQueueProvider() + assert isinstance(provider, BaseMessageQueueProvider) + + +def test_message_sqs_queue_matches(): + from airflow.providers.amazon.aws.queues.sqs import SqsMessageQueueProvider + + provider = SqsMessageQueueProvider() + assert provider.queue_matches("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue") + assert not provider.queue_matches("https://sqs.us-east-1.amazonaws.com/123456789012") + assert not provider.queue_matches("https://sqs.us-east-1.amazonaws.com/123456789012/") + assert not provider.queue_matches("https://sqs.us-east-1.amazonaws.com/") + + +def test_message_sqs_queue_trigger_class(): + from airflow.providers.amazon.aws.queues.sqs import SqsMessageQueueProvider + + provider = SqsMessageQueueProvider() + assert provider.trigger_class() == SqsSensorTrigger + + +def test_message_sqs_queue_trigger_kwargs(): + from airflow.providers.amazon.aws.queues.sqs import SqsMessageQueueProvider + + provider = SqsMessageQueueProvider() + assert provider.trigger_kwargs("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue") == { + "sqs_queue": "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue", + } diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_sqs.py b/providers/amazon/tests/unit/amazon/aws/triggers/test_sqs.py index 97c6cde175069..11ea1c9ca0cb3 100644 --- a/providers/amazon/tests/unit/amazon/aws/triggers/test_sqs.py +++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_sqs.py @@ -20,7 +20,7 @@ import pytest -from 
airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, get_base_airflow_version_tuple TEST_SQS_QUEUE = "test-sqs-queue" TEST_AWS_CONN_ID = "test-aws-conn-id" @@ -39,7 +39,9 @@ class TestSqsTriggers: @pytest.fixture(autouse=True) - def _setup_test_cases(self): + def _setup_test_cases(self, cleanup_providers_manager): + from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger + self.sqs_trigger = SqsSensorTrigger( sqs_queue=TEST_SQS_QUEUE, aws_conn_id=TEST_AWS_CONN_ID, @@ -92,3 +94,16 @@ async def test_poke_no_messages(self): mock_client.receive_message.return_value = mock_response messages = await self.sqs_trigger.poke(client=mock_client) assert len(messages) == 0 + + +@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Requires Airflow 3.0.+") +class TestMessageQueueTrigger: + def test_provider_integrations(self, cleanup_providers_manager): + if get_base_airflow_version_tuple() < (3, 0, 1): + pytest.skip("This test is only for Airflow 3.0.1+") + queue = "https://sqs.us-east-1.amazonaws.com/0123456789/Test" + from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger + from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger + + trigger = MessageQueueTrigger(queue=queue) + assert isinstance(trigger.trigger, SqsSensorTrigger) diff --git a/providers/cncf/kubernetes/provider.yaml b/providers/cncf/kubernetes/provider.yaml index 3df6ef515a29a..e8cd76c6dc29d 100644 --- a/providers/cncf/kubernetes/provider.yaml +++ b/providers/cncf/kubernetes/provider.yaml @@ -372,4 +372,4 @@ config: default: "0" executors: - - airflow.providers.cncf.kubernetes.kubernetes_executor.KubernetesExecutor + - airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py index 821f95cf614cb..b6f2da2ee7f33 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py @@ -279,5 +279,5 @@ def get_provider_info(): }, }, }, - "executors": ["airflow.providers.cncf.kubernetes.kubernetes_executor.KubernetesExecutor"], + "executors": ["airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor"], } diff --git a/providers/common/messaging/docs/index.rst b/providers/common/messaging/docs/index.rst index 0f50a2c2eb39d..d2217175a89c6 100644 --- a/providers/common/messaging/docs/index.rst +++ b/providers/common/messaging/docs/index.rst @@ -44,7 +44,6 @@ Python API <_api/airflow/providers/common/messaging/index> Base Provider <_api/airflow/providers/common/messaging/providers/base_provider/index> - SQS provider <_api/airflow/providers/common/messaging/providers/sqs/index> .. toctree:: :hidden: diff --git a/providers/common/messaging/docs/providers.rst b/providers/common/messaging/docs/providers.rst index c0b6b013f111b..0559f9c6ece72 100644 --- a/providers/common/messaging/docs/providers.rst +++ b/providers/common/messaging/docs/providers.rst @@ -19,20 +19,22 @@ Messaging Triggers ================== -Operators, sensors, and triggers in the Common Messaging provider serve as wrappers around those from other providers, -specifically for message queues. -They offer an abstraction layer to simplify usage and make it easier to switch between different queue providers. - -Supported queue providers -~~~~~~~~~~~~~~~~~~~~~~~~~ - -* Amazon SQS: :class:`~airflow.providers.common.messaging.providers.sqs.SqsMessageQueueProvider` +Operators, sensors, and triggers in the Common Messaging provider serve as wrappers around those +from other providers, specifically for message queues. 
+They offer an abstraction layer to simplify usage and make it easier to switch between +different queue providers. Add support for a provider ~~~~~~~~~~~~~~~~~~~~~~~~~~ To support a new provider please follow the steps below: -1. Create a new class extending :class:`~airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider`. +1. Create a new class in the provider extending +:class:`~airflow.providers.common.messaging.providers.base_provider.BaseMessageQueueProvider`. Make sure it implements all abstract methods -2. Add this class to the list ``MESSAGE_QUEUE_PROVIDERS`` in ``airflow/providers/common/messaging/providers/__init__.py`` + +2. Expose it via "queues" property in the ``provider.yaml`` file of the provider where you add the new class. +The ``queues`` property should be a list of fully-qualified class names of the queues. + + +The list of supported message queues is available in :doc:`apache-airflow-providers:core-extensions/message-queues`. diff --git a/providers/common/messaging/provider.yaml b/providers/common/messaging/provider.yaml index b6fd64d5ffb61..717163288663e 100644 --- a/providers/common/messaging/provider.yaml +++ b/providers/common/messaging/provider.yaml @@ -25,6 +25,7 @@ state: ready source-date-epoch: 1741121853 # note that those versions are maintained by release manager - do not update them manually versions: + - 1.1.0 - 1.0.0 triggers: diff --git a/providers/common/messaging/pyproject.toml b/providers/common/messaging/pyproject.toml index 7447ce1d3cf28..8ab8b41b5fe98 100644 --- a/providers/common/messaging/pyproject.toml +++ b/providers/common/messaging/pyproject.toml @@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi" [project] name = "apache-airflow-providers-common-messaging" -version = "1.0.0" +version = "1.1.0" description = "Provider package apache-airflow-providers-common-messaging for Apache Airflow" readme = "README.rst" authors = [ @@ -57,14 +57,17 @@ requires-python = "~=3.9" # Make sure to run 
``breeze static-checks --type update-providers-dependencies --all-files`` # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` dependencies = [ - "apache-airflow>=3.0.0", + # Provider's manager had a missing support for common.messaging provider's discovery before Airflow 3.0.1 + # The new common.messaging provider is not compatible with Airflow 3.0.0, and Airflow has to be + # Upgraded to 3.0.1+ to use the new provider. + "apache-airflow>=3.0.1", ] # The optional dependencies should be modified in place in the generated file # Any change in the dependencies is preserved when the file is regenerated [project.optional-dependencies] "amazon" = [ - "apache-airflow-providers-amazon" + "apache-airflow-providers-amazon>=9.7.0" ] [dependency-groups] @@ -72,8 +75,8 @@ dev = [ "apache-airflow", "apache-airflow-task-sdk", "apache-airflow-devel-common", - "apache-airflow-providers-amazon", # Additional devel dependencies (do not remove this line and add extra development dependencies) + "apache-airflow-providers-amazon", ] # To build docs: @@ -102,8 +105,8 @@ apache-airflow-providers-common-sql = {workspace = true} apache-airflow-providers-standard = {workspace = true} [project.urls] -"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-common-messaging/1.0.0" -"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-common-messaging/1.0.0/changelog.html" +"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-common-messaging/1.1.0" +"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-common-messaging/1.1.0/changelog.html" "Bug Tracker" = "https://github.com/apache/airflow/issues" "Source Code" = "https://github.com/apache/airflow" "Slack Chat" = "https://s.apache.org/airflow-slack" diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py b/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py 
index 563a134cb3847..b5a8fc53a40b8 100644 --- a/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py +++ b/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py @@ -29,11 +29,11 @@ __all__ = ["__version__"] -__version__ = "1.0.0" +__version__ = "1.1.0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( - "3.0.0" + "3.0.1" ): raise RuntimeError( - f"The package `apache-airflow-providers-common-messaging:{__version__}` needs Apache Airflow 3.0.0+" + f"The package `apache-airflow-providers-common-messaging:{__version__}` needs Apache Airflow 3.0.1+" ) diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py b/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py index b889022858b4e..91a3d7ca8450e 100644 --- a/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py +++ b/providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py @@ -16,6 +16,27 @@ # under the License. 
from __future__ import annotations
 
-from airflow.providers.common.messaging.providers.sqs import SqsMessageQueueProvider
+import importlib
 
-MESSAGE_QUEUE_PROVIDERS = [SqsMessageQueueProvider()]
+from airflow.providers_manager import ProvidersManager
+from airflow.utils.deprecation_tools import add_deprecated_classes
+
+providers_manager = ProvidersManager()
+providers_manager.initialize_providers_queues()
+
+
+def create_class_by_name(name: str):
+    module_name, class_name = name.rsplit(".", 1)
+    module = importlib.import_module(module_name)
+    return getattr(module, class_name)
+
+
+MESSAGE_QUEUE_PROVIDERS = [create_class_by_name(name)() for name in providers_manager.queue_class_names]
+
+__deprecated_classes = {
+    "sqs": {
+        "SqsMessageQueueProvider": "airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider",
+    },
+}
+
+add_deprecated_classes(__deprecated_classes, __name__)
diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
index 35f1cfbfb39d7..689bbf6d7d40d 100644
--- a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
+++ b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
@@ -21,7 +21,6 @@
 from functools import cached_property
 from typing import Any
 
-from airflow.exceptions import AirflowException
 from airflow.providers.common.messaging.providers import MESSAGE_QUEUE_PROVIDERS
 from airflow.triggers.base import BaseEventTrigger, TriggerEvent
 
@@ -48,17 +47,38 @@ def __init__(self, *, queue: str, **kwargs: Any) -> None:
 
     @cached_property
     def trigger(self) -> BaseEventTrigger:
+        """
+        Build the provider-specific trigger that handles ``self.queue``.
+
+        :raises ValueError: if no providers are registered, or if the queue is matched by
+            none or by more than one of them.
+        """
+        if len(MESSAGE_QUEUE_PROVIDERS) == 0:
+            self.log.error(
+                "No message queue providers are available. "
+                "Please ensure that you have the necessary providers installed."
+            )
+            raise ValueError("No message queue providers are available. ")
         providers = [provider for provider in MESSAGE_QUEUE_PROVIDERS if provider.queue_matches(self.queue)]
         if len(providers) == 0:
-            raise ValueError(f"The queue '{self.queue}' is not recognized by ``MessageQueueTrigger``.")
+            self.log.error(
+                "The queue '%s' is not recognized by any of the registered providers. "
+                "The available providers are: '%s'.",
+                self.queue,
+                # Providers are objects, not strings: ``str.join`` needs their class names.
+                ", ".join(type(provider).__name__ for provider in MESSAGE_QUEUE_PROVIDERS),
+            )
+            raise ValueError("The queue is not recognized by any of the registered providers.")
         if len(providers) > 1:
             self.log.error(
                 "The queue '%s' is recognized by more than one provider. "
                 "At least two providers in ``MESSAGE_QUEUE_PROVIDERS`` are colliding with each "
-                "other.",
+                "other: '%s'",
                 self.queue,
+                # Same as above: join class names, not provider instances.
+                ", ".join(type(provider).__name__ for provider in providers),
             )
-            raise AirflowException(f"The queue '{self.queue}' is recognized by more than one provider.")
+            raise ValueError(f"The queue '{self.queue}' is recognized by more than one provider.")
         return providers[0].trigger_class()(
             **providers[0].trigger_kwargs(self.queue, **self.kwargs), **self.kwargs
         )
diff --git a/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py b/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
index af998ebb07418..89f3fedf641ea 100644
--- a/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
+++ b/providers/common/messaging/tests/unit/common/messaging/triggers/test_msg_queue.py
@@ -16,19 +16,14 @@
 # under the License.
from __future__ import annotations -import pytest +from unittest import mock -from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger -class TestMessageQueueTrigger: - @pytest.mark.parametrize( - ("queue", "expected_trigger_class"), - [ - ("https://sqs.us-east-1.amazonaws.com/0123456789/Test", SqsSensorTrigger), - ], - ) - def test_provider_integrations(self, queue, expected_trigger_class): - trigger = MessageQueueTrigger(queue=queue) - assert isinstance(trigger.trigger, expected_trigger_class) +@mock.patch( + "airflow.providers.common.messaging.providers.MESSAGE_QUEUE_PROVIDERS", new_callable=mock.PropertyMock +) +def test_provider_integrations(_): + trigger = MessageQueueTrigger(queue="any queue") + assert trigger is not None diff --git a/pyproject.toml b/pyproject.toml index 79a32907a3ed6..b7307c2390bd1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -179,7 +179,7 @@ packages = [] "apache-airflow-providers-common-io>=1.4.2" ] "common.messaging" = [ - "apache-airflow-providers-common-messaging>=1.0.0" + "apache-airflow-providers-common-messaging>=1.0.1" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py ] "common.sql" = [ "apache-airflow-providers-common-sql>=1.18.0" @@ -411,7 +411,7 @@ packages = [] "apache-airflow-providers-cohere>=1.4.0", "apache-airflow-providers-common-compat>=1.2.1", "apache-airflow-providers-common-io>=1.4.2", - "apache-airflow-providers-common-messaging>=1.0.0", + "apache-airflow-providers-common-messaging>=1.0.1", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py "apache-airflow-providers-common-sql>=1.18.0", "apache-airflow-providers-databricks>=6.11.0", "apache-airflow-providers-datadog>=3.8.0", diff --git a/scripts/ci/pre_commit/update_airflow_pyproject_toml.py b/scripts/ci/pre_commit/update_airflow_pyproject_toml.py index 224ecdaac8727..8bf1fb2ca2fdf 100755 --- 
a/scripts/ci/pre_commit/update_airflow_pyproject_toml.py +++ b/scripts/ci/pre_commit/update_airflow_pyproject_toml.py @@ -59,7 +59,7 @@ "fab": parse_version("2.0.2"), "openlineage": parse_version("2.1.3"), "git": parse_version("0.0.2"), - "common.messaging": parse_version("1.0.0"), + "common.messaging": parse_version("1.0.1"), } diff --git a/scripts/in_container/run_provider_yaml_files_check.py b/scripts/in_container/run_provider_yaml_files_check.py index 348997b48881f..d1d11cb4d9c14 100755 --- a/scripts/in_container/run_provider_yaml_files_check.py +++ b/scripts/in_container/run_provider_yaml_files_check.py @@ -467,9 +467,7 @@ def check_plugin_classes(yaml_files: dict[str, dict]) -> tuple[int, int]: return num_plugins, num_errors -@run_check("Checking extra-links belong to package, exist and are classes") -def check_extra_link_classes(yaml_files: dict[str, dict]) -> tuple[int, int]: - resource_type = "extra-links" +def _check_simple_class_list(resource_type, yaml_files): num_errors = 0 num_extra_links = 0 for yaml_file_path, provider_data in yaml_files.items(): @@ -483,20 +481,24 @@ def check_extra_link_classes(yaml_files: dict[str, dict]) -> tuple[int, int]: return num_extra_links, num_errors +@run_check("Checking extra-links belong to package, exist and are classes") +def check_extra_link_classes(yaml_files: dict[str, dict]) -> tuple[int, int]: + return _check_simple_class_list("extra-links", yaml_files) + + @run_check("Checking notifications belong to package, exist and are classes") def check_notification_classes(yaml_files: dict[str, dict]) -> tuple[int, int]: - resource_type = "notifications" - num_errors = 0 - num_notifications = 0 - for yaml_file_path, provider_data in yaml_files.items(): - provider_package = _filepath_to_module(yaml_file_path) - notifications = provider_data.get(resource_type) - if notifications: - num_notifications += len(notifications) - num_errors += check_if_objects_exist_and_belong_to_package( - notifications, provider_package, 
yaml_file_path, resource_type, ObjectType.CLASS - ) - return num_notifications, num_errors + return _check_simple_class_list("notifications", yaml_files) + + +@run_check("Checking executors belong to package, exist and are classes") +def check_executor_classes(yaml_files: dict[str, dict]) -> tuple[int, int]: + return _check_simple_class_list("executors", yaml_files) + + +@run_check("Checking queues belong to package, exist and are classes") +def check_queue_classes(yaml_files: dict[str, dict]) -> tuple[int, int]: + return _check_simple_class_list("queues", yaml_files) @run_check("Checking for duplicates in list of transfers") @@ -731,6 +733,8 @@ def check_providers_have_all_documentation_files(yaml_files: dict[str, dict]): check_completeness_of_list_of_transfers(all_parsed_yaml_files) check_hook_class_name_entries_in_connection_types(all_parsed_yaml_files) + check_executor_classes(all_parsed_yaml_files) + check_queue_classes(all_parsed_yaml_files) check_plugin_classes(all_parsed_yaml_files) check_extra_link_classes(all_parsed_yaml_files) check_correctness_of_list_of_sensors_operators_hook_trigger_modules(all_parsed_yaml_files)