From 8f57de92f27c1626ecf4ffb37c5aee4ccd882179 Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Wed, 12 Feb 2025 10:16:15 -0800 Subject: [PATCH 01/29] Draft Common Message Queue Here is a very early draft PR to introduce and socialize the concept of a "common message queue" abstraction similar to the "Common SQL" and "Common IO" abstractions in Airflow. This will be a provider package similar to those and is intended to be an abstraction over Apache Kafka, Amazon SQS, and Google PubSub to begin with. It can then be expanded to other messaging systems based on community adoption. The initial goal with this is to provide a simple abstraction for integrating Event Driven Scheduling coming with Airflow 3 to message notification systems such as Kafka, currently being used to publish data availability. At this stage, this is very much a WIP draft intended to solicit input from the community. --- providers/common/msgq/README.rst | 75 ++++++++++++ .../providers/common/msgq/hooks/msg_queue.py | 81 +++++++++++++ .../common/msgq/operators/msg_queue.py | 109 ++++++++++++++++++ .../common/msgq/sensors/msg_queue.py | 0 4 files changed, 265 insertions(+) create mode 100644 providers/common/msgq/README.rst create mode 100644 providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py create mode 100644 providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py create mode 100644 providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py diff --git a/providers/common/msgq/README.rst b/providers/common/msgq/README.rst new file mode 100644 index 0000000000000..02e764c2bef88 --- /dev/null +++ b/providers/common/msgq/README.rst @@ -0,0 +1,75 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + .. NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN! + + .. IF YOU WANT TO MODIFY TEMPLATE FOR THIS FILE, YOU SHOULD MODIFY THE TEMPLATE + `PROVIDER_README_TEMPLATE.rst.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY + + +Package ``apache-airflow-providers-common-msgq`` + +Release: ``0.1.0`` + + + +Provider package +---------------- + +This is a provider package for ``common.msgq`` provider. All classes for this provider package +are in ``airflow.providers.common.msgq`` python package. + +This provider package is intended to serve as a common abstraction on top of popular message queue +providers such as Apache Kafka, Amazon SQS, and Google PubSub. + +The expectation is that a common provider abstraction for message queues is especially useful for +Event Driven Scheduling, which is being introduced as part of Airflow 3.0. Based on conversations +with users of Apache Airflow, these Events for publishing of Data Assets are very often broadcast +over a publish and subscribe mechanism. The underlying technology used for this publish and subscribe +mechanism varies by environment, but Apache Kafka, Amazon SQS, and Google PubSub are commonly used. + + +.. You can find package information and changelog for the provider +.. in the `documentation `_. 
+ +Installation +------------ + +You can install this package on top of an existing Airflow 3 installation (see ``Requirements`` below +for the minimum Airflow version supported) via +``pip install apache-airflow-providers-common-msgq`` + +The package supports the following python versions: 3.10,3.11,3.12 + +Requirements +------------ + +================== ================== +PIP package Version required +================== ================== +``apache-airflow`` ``>=3.0.0`` +================== ================== + +Cross provider package dependencies +----------------------------------- + +None at this time + + +The changelog for the provider package can be found in the +`changelog `_. diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py b/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py new file mode 100644 index 0000000000000..76a57e3aaeeb6 --- /dev/null +++ b/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# +# Centralized handling of the connection mechanism for all Message Queues +# + +from airflow.configuration import conf +from airflow.exceptions import ( + AirflowException, + AirflowOptionalProviderFeatureException, +) +from airflow.hooks.base import BaseHook + + +class MsgQueueHook(BaseHook): + """ + Abstract base class for all Message Queue Hooks. + """ + + # Typical parameters below + + # Override to provide connection name + conn_name_attr: str + # Override to have a default connection id for a particular msg queue + default_conn_name = "default_conn_id" + # Connection type - for types of message queues + conn_type = "kafka" + # Hook name + hook_name = "Apache Kafka" + + def __init__(self, *args, **kwargs): + super().__init__() + if not self.conn_name_attr: + raise AirflowException("conn_name_attr is not defined") + elif len(args) == 1: + setattr(self, self.conn_name_attr, args[0]) + elif self.conn_name_attr not in kwargs: + setattr(self, self.conn_name_attr, self.default_conn_name) + else: + setattr(self, self.conn_name_attr, kwargs[self.conn_name_attr]) + + def get_conn_id(self) -> str: + return getattr(self, self.conn_name_attr) + + + def get_conn(self) -> Any: + """Return a connection object.""" + queue = self.connection + if self.connector is None: + raise RuntimeError(f"{type(self).__name__} didn't have `self.connector` set!") + return self.connector.connect(host=queue.host, port=queue.port, username=queue.login) + + +class MsgQueueConsumerHook(MsgQueueHook): + """ + Abstract base class hook for creating a message queue consumer. 
+ """ + + :param connection configuration information, default to BaseHook configuration + :param topics: A list of topics to subscribe to on the message queue + + def __init__(self, topics: Sequence[str], config_id=MsgQueueHook.default_conn_name) -> None: + super().__init__(config_id=config_id) + self.topics = topics + + diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py b/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py new file mode 100644 index 0000000000000..a5481c215c340 --- /dev/null +++ b/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py @@ -0,0 +1,109 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.exceptions import AirflowException, AirflowFailException +from airflow.hooks.base import BaseHook +from airflow.models import BaseOperator + +_PROVIDERS_MATCHER = re.compile(r"airflow\.providers\.(.*?)\.hooks.*") + +_MIN_SUPPORTED_PROVIDERS_VERSION = { + "amazon": "4.1.0", + "apache.kafka": "2.1.0", + "google": "8.2.0", +} + +class BaseMsgQueueOperator(BaseOperator): + """ + This is a base class for the generic Message Queue Operator to get a Queue Hook. + + The provided method is .get_queue_hook(). 
The default behavior will try to + retrieve the Queue hook based on connection type. + You can customize the behavior by overriding the .get_queue_hook() method. + + :param conn_id: reference to a specific message queue providers + """ + + conn_id_field = "conn_id" + + template_fields: Sequence[str] = ("conn_id", "message_queue", "hook_params") + + def __init__( + self, + *, + conn_id: str | None = None, + message_queue: str | None = None, + hook_params: dict | None = None, + retry_on_failure: bool = True, + **kwargs, + ): + super().__init__(**kwargs) + self.conn_id = conn_id + self.message_queue = message_queue + self.hook_params = hook_params or {} + self.retry_on_failure = retry_on_failure + + @classmethod + # TODO: can be removed once Airflow min version for this provider is 3.0.0 or higher + def get_hook(cls, conn_id: str, hook_params: dict | None = None) -> BaseHook: + """ + Return default hook for this connection id. + + :param conn_id: connection id + :param hook_params: hook parameters + :return: default hook for this connection + """ + connection = BaseHook.get_connection(conn_id) + return connection.get_hook(hook_params=hook_params) + + @cached_property + def _hook(self): + """Get MsgQueue Hook based on connection type.""" + conn_id = getattr(self, self.conn_id_field) + self.log.debug("Get connection for %s", conn_id) + hook = self.get_hook(conn_id=conn_id, hook_params=self.hook_params) + if not isinstance(hook, DbApiHook): + raise AirflowException( + f"You are trying to use `common-msgQ` with {hook.__class__.__name__}," + " but its provider does not support it. Please upgrade the provider to a version that" + " supports `common-msgQ`. The hook class should be a subclass of" + " `airflow.providers.common.msgq.hooks.msq_queue.DbApiHook`." 
+ f" Got {hook.__class__.__name__} Hook with class hierarchy: {hook.__class__.mro()}" + ) + + if self.message_queue: + if hook.conn_type == "kafka": + hook.message_queue = self.message_queue + else: + hook.schema = self.message_queue + + return hook + + def get_db_hook(self) -> DbApiHook: + """ + Get the message_queue hook for the connection. + + :return: the message_queue hook object. + """ + return self._hook + + def _raise_exception(self, exception_string: str) -> NoReturn: + if self.retry_on_failure: + raise AirflowException(exception_string) + raise AirflowFailException(exception_string) + diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py b/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py new file mode 100644 index 0000000000000..e69de29bb2d1d From ad137fa8d819f492994732c1c3d7cec5c2643fc8 Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Wed, 12 Feb 2025 10:54:31 -0800 Subject: [PATCH 02/29] Updated the Readme for CommonMsgQueue Updated the Common Message Queue Readme with an example of an Event Driven Dag --- providers/common/msgq/README.rst | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/providers/common/msgq/README.rst b/providers/common/msgq/README.rst index 02e764c2bef88..09c126b6cbe20 100644 --- a/providers/common/msgq/README.rst +++ b/providers/common/msgq/README.rst @@ -44,6 +44,32 @@ over a publish and subscribe mechanism. The underlying technology used for this mechanism varies by environment, but Apache Kafka, Amazon SQS, and Google PubSub are commonly used. 
+Expected usage would be something on the lines below:: + + trigger = = MsgQueueSensorTrigger(msg_queue="https://sqs.us-east-1.amazonaws.com/722404908466/Test") + + data_asset = Asset("incoming_asset", watchers=[ + AssetWatcher(name="asset_watcher", trigger=trigger) + ) + + with DAG( + dag_id="example_message_queue_asset", + schedule=[data_asset], + catchup=False, + ): + + # Transform task which gets the data from sensor above and manipulates it with other data + # + transform_task = TransformOperator(task_id="transform_task") + + # Publish task which takes the analyzed data and makes it available + publish_task = PublishOperator(task_id="publish") + + chain(tranform_task, publish_task) + + + + .. You can find package information and changelog for the provider .. in the `documentation `_. From 653027ddcacad308c3b51ed51566e1d21577b992 Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Wed, 12 Feb 2025 11:00:05 -0800 Subject: [PATCH 03/29] Updated the message operator and sensor Updated the message queue Operator and Sensor to fix an issue in my sync --- .../common/msgq/operators/msg_queue.py | 46 ++++++++--------- .../common/msgq/sensors/msg_queue.py | 50 +++++++++++++++++++ 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py b/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py index a5481c215c340..db4c58c140b7b 100644 --- a/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py +++ b/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py @@ -16,9 +16,15 @@ # specific language governing permissions and limitations # under the License. 
+import re +from collections.abc import Sequence +from functools import cached_property +from typing import Any, NoReturn + from airflow.exceptions import AirflowException, AirflowFailException from airflow.hooks.base import BaseHook from airflow.models import BaseOperator +from airflow.providers.common.msgq.hooks.msg_queue import MsgQueueHook _PROVIDERS_MATCHER = re.compile(r"airflow\.providers\.(.*?)\.hooks.*") @@ -58,18 +64,6 @@ def __init__( self.hook_params = hook_params or {} self.retry_on_failure = retry_on_failure - @classmethod - # TODO: can be removed once Airflow min version for this provider is 3.0.0 or higher - def get_hook(cls, conn_id: str, hook_params: dict | None = None) -> BaseHook: - """ - Return default hook for this connection id. - - :param conn_id: connection id - :param hook_params: hook parameters - :return: default hook for this connection - """ - connection = BaseHook.get_connection(conn_id) - return connection.get_hook(hook_params=hook_params) @cached_property def _hook(self): @@ -77,12 +71,12 @@ def _hook(self): conn_id = getattr(self, self.conn_id_field) self.log.debug("Get connection for %s", conn_id) hook = self.get_hook(conn_id=conn_id, hook_params=self.hook_params) - if not isinstance(hook, DbApiHook): + if not isinstance(hook, MsgQueueHook): raise AirflowException( - f"You are trying to use `common-msgQ` with {hook.__class__.__name__}," + f"You are trying to use `common-msgq` with {hook.__class__.__name__}," " but its provider does not support it. Please upgrade the provider to a version that" - " supports `common-msgQ`. The hook class should be a subclass of" - " `airflow.providers.common.msgq.hooks.msq_queue.DbApiHook`." + " supports `common-msgq`. The hook class should be a subclass of" + " `airflow.providers.common.msgq.hooks.msq_queue.MsqQueueHook`." 
f" Got {hook.__class__.__name__} Hook with class hierarchy: {hook.__class__.mro()}" ) @@ -94,16 +88,22 @@ def _hook(self): return hook - def get_db_hook(self) -> DbApiHook: - """ - Get the message_queue hook for the connection. - - :return: the message_queue hook object. - """ - return self._hook + def _raise_exception(self, exception_string: str) -> NoReturn: if self.retry_on_failure: raise AirflowException(exception_string) raise AirflowFailException(exception_string) +class MsqQueuePublishOperator(BaseMsgQueueOperator): + """ + Publish something onto a message queue. + + :param topic + :param message + """ + def publish(self, message, topic) -> None: + # Publish the specified message, with the topic on the message queue + + return + diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py b/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py index e69de29bb2d1d..24d3846039192 100644 --- a/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py +++ b/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import re +from collections.abc import Sequence +from functools import cached_property +from typing import Any, NoReturn + +from airflow.exceptions import AirflowException, AirflowFailException +from airflow.hooks.base import BaseHook +from airflow.models import BaseOperator +from airflow.providers.common.msgq.hooks.msg_queue import MsgQueueHook +from airflow.providers.common.msgq.operators.msg_queue import BaseMsgQueueOperator + +class WaitForMessageTriggerFunction(BaseMsgQueueOperator): + """ + Defer until a message for a particular topic is published on the message queue + + When the message arrives, + - get the data for the particular message and post it to XCom + - Alternatively, trigger a registered function, need more input here + + Resume waiting for the next message + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init(**kwargs) + + def execute(self, context, event=None) -> Any: + """ + Need to insert the processing here based on the decision above + """ + + return event + From 45a4e5cf6118395d7ee80b278293049704ff0c9d Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Wed, 12 Feb 2025 13:24:59 -0800 Subject: [PATCH 04/29] Changed Sensor to Trigger Changed the Message Queue Sensor Operator to be a Deferrable Trigger --- .../common/msgq/{sensors => triggers}/msg_queue.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) rename providers/common/msgq/src/airflow/providers/common/msgq/{sensors => triggers}/msg_queue.py (86%) diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py b/providers/common/msgq/src/airflow/providers/common/msgq/triggers/msg_queue.py similarity index 86% rename from providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py rename to providers/common/msgq/src/airflow/providers/common/msgq/triggers/msg_queue.py index 24d3846039192..5b62fdf26d95e 100644 --- a/providers/common/msgq/src/airflow/providers/common/msgq/sensors/msg_queue.py +++ 
b/providers/common/msgq/src/airflow/providers/common/msgq/triggers/msg_queue.py @@ -17,17 +17,17 @@ # under the License. import re -from collections.abc import Sequence +from collections.abc import Sequence, AsyncIterator from functools import cached_property from typing import Any, NoReturn from airflow.exceptions import AirflowException, AirflowFailException from airflow.hooks.base import BaseHook from airflow.models import BaseOperator +from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.providers.common.msgq.hooks.msg_queue import MsgQueueHook -from airflow.providers.common.msgq.operators.msg_queue import BaseMsgQueueOperator -class WaitForMessageTriggerFunction(BaseMsgQueueOperator): +class MessageQueueTrigger(BaseTrigger): """ Defer until a message for a particular topic is published on the message queue @@ -41,10 +41,9 @@ class WaitForMessageTriggerFunction(BaseMsgQueueOperator): def __init__(self, **kwargs: Any) -> None: super().__init(**kwargs) - def execute(self, context, event=None) -> Any: + async def run(self) -> AsyncIterator[TriggerEvent]: """ Need to insert the processing here based on the decision above """ - return event From 514248f3749e1ceb0d6f8f8f75c32ee3aaae82f7 Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Wed, 12 Feb 2025 16:20:47 -0800 Subject: [PATCH 05/29] Fixed typos and imports Fixed typos and import errors in the MsgQueueHook --- .../src/airflow/providers/common/msgq/hooks/msg_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py b/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py index 76a57e3aaeeb6..d50ea42b7ad47 100644 --- a/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py +++ b/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py @@ -19,6 +19,9 @@ # Centralized handling of the connection mechanism for all Message Queues # +from 
collections.abc import Sequence +from typing import Any, NoReturn + from airflow.configuration import conf from airflow.exceptions import ( AirflowException, @@ -69,10 +72,10 @@ def get_conn(self) -> Any: class MsgQueueConsumerHook(MsgQueueHook): """ Abstract base class hook for creating a message queue consumer. - """ :param connection configuration information, default to BaseHook configuration :param topics: A list of topics to subscribe to on the message queue + """ def __init__(self, topics: Sequence[str], config_id=MsgQueueHook.default_conn_name) -> None: super().__init__(config_id=config_id) From 9a2b66c378a61fa1b68ac2d5302a036f641531c6 Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Thu, 13 Feb 2025 14:55:48 -0800 Subject: [PATCH 06/29] Updated README to fix typo Updated invocation of MsqQueueSensorTrigger to MsgQueueTrigger in example invocation --- providers/common/msgq/README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/common/msgq/README.rst b/providers/common/msgq/README.rst index 09c126b6cbe20..076600f0732e2 100644 --- a/providers/common/msgq/README.rst +++ b/providers/common/msgq/README.rst @@ -46,7 +46,7 @@ mechanism varies by environment, but Apache Kafka, Amazon SQS, and Google PubSub Expected usage would be something on the lines below:: - trigger = = MsgQueueSensorTrigger(msg_queue="https://sqs.us-east-1.amazonaws.com/722404908466/Test") + trigger = = MsgQueueTrigger(msg_queue="https://sqs.us-east-1.amazonaws.com/722404908466/Test") data_asset = Asset("incoming_asset", watchers=[ AssetWatcher(name="asset_watcher", trigger=trigger) From 536cc79a77686ba29dc0a663a39f6b7c9bb7d3d0 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Fri, 21 Feb 2025 16:44:05 -0500 Subject: [PATCH 07/29] Rename `msgq` to `messaging` --- providers/common/{msgq => messaging}/README.rst | 0 .../src/airflow/providers/common/messaging}/hooks/msg_queue.py | 0 .../airflow/providers/common/messaging}/operators/msg_queue.py | 0 
.../src/airflow/providers/common/messaging}/triggers/msg_queue.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename providers/common/{msgq => messaging}/README.rst (100%) rename providers/common/{msgq/src/airflow/providers/common/msgq => messaging/src/airflow/providers/common/messaging}/hooks/msg_queue.py (100%) rename providers/common/{msgq/src/airflow/providers/common/msgq => messaging/src/airflow/providers/common/messaging}/operators/msg_queue.py (100%) rename providers/common/{msgq/src/airflow/providers/common/msgq => messaging/src/airflow/providers/common/messaging}/triggers/msg_queue.py (100%) diff --git a/providers/common/msgq/README.rst b/providers/common/messaging/README.rst similarity index 100% rename from providers/common/msgq/README.rst rename to providers/common/messaging/README.rst diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py b/providers/common/messaging/src/airflow/providers/common/messaging/hooks/msg_queue.py similarity index 100% rename from providers/common/msgq/src/airflow/providers/common/msgq/hooks/msg_queue.py rename to providers/common/messaging/src/airflow/providers/common/messaging/hooks/msg_queue.py diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py b/providers/common/messaging/src/airflow/providers/common/messaging/operators/msg_queue.py similarity index 100% rename from providers/common/msgq/src/airflow/providers/common/msgq/operators/msg_queue.py rename to providers/common/messaging/src/airflow/providers/common/messaging/operators/msg_queue.py diff --git a/providers/common/msgq/src/airflow/providers/common/msgq/triggers/msg_queue.py b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py similarity index 100% rename from providers/common/msgq/src/airflow/providers/common/msgq/triggers/msg_queue.py rename to providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py From 
ff6f5c857af6f6bf9cf6e8e659789e8dda859998 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Fri, 21 Feb 2025 18:02:39 -0500 Subject: [PATCH 08/29] Implement `MessageQueueTrigger` --- .../providers/amazon/aws/triggers/sqs.py | 4 +- .../providers/common/messaging/__init__.py | 39 ++++++++++ .../common/messaging/hooks/msg_queue.py | 18 ++--- .../common/messaging/operators/msg_queue.py | 13 ++-- .../common/messaging/triggers/__init__.py | 16 +++++ .../common/messaging/triggers/msg_queue.py | 72 +++++++++++++------ 6 files changed, 121 insertions(+), 41 deletions(-) create mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/__init__.py create mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/triggers/__init__.py diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/sqs.py b/providers/amazon/src/airflow/providers/amazon/aws/triggers/sqs.py index 28c0b509d28c0..032ff984fd451 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/sqs.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/sqs.py @@ -23,14 +23,14 @@ from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.hooks.sqs import SqsHook from airflow.providers.amazon.aws.utils.sqs import process_response -from airflow.triggers.base import BaseTrigger, TriggerEvent +from airflow.triggers.base import BaseEventTrigger, TriggerEvent if TYPE_CHECKING: from airflow.providers.amazon.aws.hooks.base_aws import BaseAwsConnection from airflow.providers.amazon.aws.utils.sqs import MessageFilteringType -class SqsSensorTrigger(BaseTrigger): +class SqsSensorTrigger(BaseEventTrigger): """ Asynchronously get messages from an Amazon SQS queue and then delete the messages from the queue. 
diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py b/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py new file mode 100644 index 0000000000000..a64fb6cb2450c --- /dev/null +++ b/providers/common/messaging/src/airflow/providers/common/messaging/__init__.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE +# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES. 
+# +# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE +# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY +# +from __future__ import annotations + +import packaging.version + +from airflow import __version__ as airflow_version + +__all__ = ["__version__"] + +__version__ = "0.0.1" + +if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( + "2.10.3" +): + raise RuntimeError( + f"The package `apache-airflow-providers-common-messaging:{__version__}` needs Apache Airflow 2.10.3+" + ) diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/hooks/msg_queue.py b/providers/common/messaging/src/airflow/providers/common/messaging/hooks/msg_queue.py index d50ea42b7ad47..2ec56c428a17b 100644 --- a/providers/common/messaging/src/airflow/providers/common/messaging/hooks/msg_queue.py +++ b/providers/common/messaging/src/airflow/providers/common/messaging/hooks/msg_queue.py @@ -16,23 +16,22 @@ # under the License. # -# Centralized handling of the connection mechanism for all Message Queues -# +# Centralized handling of the connection mechanism for all Message Queues +# +from __future__ import annotations from collections.abc import Sequence -from typing import Any, NoReturn +from typing import Any -from airflow.configuration import conf from airflow.exceptions import ( AirflowException, - AirflowOptionalProviderFeatureException, ) from airflow.hooks.base import BaseHook -class MsgQueueHook(BaseHook): +class MsgQueueHook(BaseHook): """ - Abstract base class for all Message Queue Hooks. + Abstract base class for all Message Queue Hooks. 
""" # Typical parameters below @@ -60,7 +59,6 @@ def __init__(self, *args, **kwargs): def get_conn_id(self) -> str: return getattr(self, self.conn_name_attr) - def get_conn(self) -> Any: """Return a connection object.""" queue = self.connection @@ -71,7 +69,7 @@ def get_conn(self) -> Any: class MsgQueueConsumerHook(MsgQueueHook): """ - Abstract base class hook for creating a message queue consumer. + Abstract base class hook for creating a message queue consumer. :param connection configuration information, default to BaseHook configuration :param topics: A list of topics to subscribe to on the message queue @@ -80,5 +78,3 @@ class MsgQueueConsumerHook(MsgQueueHook): def __init__(self, topics: Sequence[str], config_id=MsgQueueHook.default_conn_name) -> None: super().__init__(config_id=config_id) self.topics = topics - - diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/operators/msg_queue.py b/providers/common/messaging/src/airflow/providers/common/messaging/operators/msg_queue.py index db4c58c140b7b..c11b0f61b95d4 100644 --- a/providers/common/messaging/src/airflow/providers/common/messaging/operators/msg_queue.py +++ b/providers/common/messaging/src/airflow/providers/common/messaging/operators/msg_queue.py @@ -15,14 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations import re from collections.abc import Sequence from functools import cached_property -from typing import Any, NoReturn +from typing import NoReturn from airflow.exceptions import AirflowException, AirflowFailException -from airflow.hooks.base import BaseHook from airflow.models import BaseOperator from airflow.providers.common.msgq.hooks.msg_queue import MsgQueueHook @@ -34,6 +34,7 @@ "google": "8.2.0", } + class BaseMsgQueueOperator(BaseOperator): """ This is a base class for the generic Message Queue Operator to get a Queue Hook. 
@@ -64,7 +65,6 @@ def __init__( self.hook_params = hook_params or {} self.retry_on_failure = retry_on_failure - @cached_property def _hook(self): """Get MsgQueue Hook based on connection type.""" @@ -88,22 +88,21 @@ def _hook(self): return hook - - def _raise_exception(self, exception_string: str) -> NoReturn: if self.retry_on_failure: raise AirflowException(exception_string) raise AirflowFailException(exception_string) + class MsqQueuePublishOperator(BaseMsgQueueOperator): """ - Publish something onto a message queue. + Publish something onto a message queue. :param topic :param message """ + def publish(self, message, topic) -> None: # Publish the specified message, with the topic on the message queue return - diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/__init__.py b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
index 5b62fdf26d95e..1aeb1eef14471 100644
--- a/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
+++ b/providers/common/messaging/src/airflow/providers/common/messaging/triggers/msg_queue.py
@@ -15,35 +15,76 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
 
 import re
-from collections.abc import Sequence, AsyncIterator
+from collections.abc import AsyncIterator
 from functools import cached_property
-from typing import Any, NoReturn
+from typing import Any
 
-from airflow.exceptions import AirflowException, AirflowFailException
-from airflow.hooks.base import BaseHook
-from airflow.models import BaseOperator
-from airflow.triggers.base import BaseTrigger, TriggerEvent
-from airflow.providers.common.msgq.hooks.msg_queue import MsgQueueHook
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
+from airflow.triggers.base import BaseEventTrigger, TriggerEvent
 
-class MessageQueueTrigger(BaseTrigger):
+
+def is_sqs_queue(queue: str) -> bool:
+    """Return True if ``queue`` looks like an Amazon SQS queue URL."""
+    return bool(re.match(r"^https://sqs\.[^.]+\.amazonaws\.com/[0-9]+/.+", queue))
+
+
+# This list defines the supported providers. Each item in the list is a tuple containing:
+# 1. A function (Callable) that checks if a given queue (string) matches a specific provider's pattern.
+# 2. The corresponding trigger to use when the function returns True.
+#
+# The function that checks whether a queue matches a provider's pattern must be as specific as possible to
+# avoid collision. Functions in this list should NOT overlap with each other in their matching criteria.
+# To add support for a new provider in `MessageQueueTrigger`, add a new entry to this list.
+MESSAGE_QUEUE_PROVIDERS = [(is_sqs_queue, SqsSensorTrigger)]
+
+
+class MessageQueueTrigger(BaseEventTrigger):
     """
-    Defer until a message for a particular topic is published on the message queue
-
-    - When the message arrives,
-    - get the data for the particular message and post it to XCom
-    - Alternatively, trigger a registered function, need more input here
+    ``MessageQueueTrigger`` serves as a unified trigger for monitoring message queues from different providers.
+
+    It abstracts away provider-specific details, allowing users to monitor a queue with a single trigger,
+    regardless of the underlying provider.
+
+    This makes it easy to switch providers without modifying the trigger.
 
-    Resume waiting for the next message
+    :param queue: The queue identifier
     """
 
-    def __init__(self, **kwargs: Any) -> None:
-        super().__init(**kwargs)
+    def __init__(self, *, queue: str, **kwargs: Any) -> None:
+        self.queue = queue
+        self.kwargs = kwargs
+
+    @cached_property
+    def trigger(self) -> BaseEventTrigger:
+        """
+        Resolve and instantiate the provider-specific trigger for ``self.queue``.
+
+        :raises ValueError: if no entry in ``MESSAGE_QUEUE_PROVIDERS`` recognizes the queue
+        :raises AirflowException: if more than one entry recognizes the queue
+        """
+        triggers = [trigger[1] for trigger in MESSAGE_QUEUE_PROVIDERS if trigger[0](self.queue)]
+        if not triggers:
+            raise ValueError(f"The queue '{self.queue}' is not recognized by ``MessageQueueTrigger``.")
+        if len(triggers) > 1:
+            self.log.error(
+                "The queue '%s' is recognized by more than one trigger. "
+                "At least two functions in ``MESSAGE_QUEUE_PROVIDERS`` are colliding with each "
+                "other.",
+                self.queue,
+            )
+            raise AirflowException(f"The queue '{self.queue}' is recognized by more than one trigger.")
+        # Exactly one provider matched, so index 0 is the only valid element.
+        return triggers[0](**self.kwargs)
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize as the resolved provider trigger."""
+        return self.trigger.serialize()
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        """
-        Need to insert the processing here based on the decision above
-        """
-
-
+        """Yield every event produced by the resolved provider trigger."""
+        async for event in self.trigger.run():
+            yield event

From 76317bc7835f87c983f198ab7f2f199d2d136259 Mon Sep 17 00:00:00 2001
From: vincbeck
Date: Tue, 25 Feb 2025 14:58:53 -0500
Subject: [PATCH 09/29] Create provider files + create base class for providers

---
 INSTALL | 15 +-
 .../12_airflow_dependencies_and_extras.rst | 15 +-
 dev/breeze/doc/images/output_build-docs.svg | 90 ++++----
 dev/breeze/doc/images/output_build-docs.txt | 2 +-
 ...release-management_add-back-references.svg | 48 +++--
 ...release-management_add-back-references.txt | 2 +-
 ...ement_generate-issue-content-providers.svg | 52 ++---
 ...ement_generate-issue-content-providers.txt | 2 +-
 ...agement_prepare-provider-documentation.svg | 76 +++----
 ...agement_prepare-provider-documentation.txt | 2 +-
 ...e-management_prepare-provider-packages.svg | 88 ++++----
 ...e-management_prepare-provider-packages.txt | 2 +-
 ...output_release-management_publish-docs.svg | 86 ++++----
 ...output_release-management_publish-docs.txt | 2 +-
 ...t_sbom_generate-providers-requirements.svg | 16 +-
 ...t_sbom_generate-providers-requirements.txt | 2 +-
 generated/provider_dependencies.json | 12 ++
 providers/common/messaging/README.rst | 108 ++++------
 providers/common/messaging/docs/changelog.rst | 35 +++
 providers/common/messaging/docs/commits.rst | 18 ++
 providers/common/messaging/docs/index.rst | 117 ++++++++++
 .../installing-providers-from-sources.rst | 18 ++
 providers/common/messaging/docs/security.rst | 18 ++
 providers/common/messaging/provider.yaml | 28 +++
providers/common/messaging/pyproject.toml | 84 ++++++++ .../providers/common/messaging/LICENSE | 201 ++++++++++++++++++ .../providers/common/messaging/__init__.py | 6 +- .../common/messaging/get_provider_info.py | 34 +++ .../common/messaging/hooks/msg_queue.py | 80 ------- .../common/messaging/operators/msg_queue.py | 108 ---------- .../common/messaging/providers/__init__.py | 21 ++ .../messaging/providers/base_provider.py | 55 +++++ .../common/messaging/providers/sqs.py | 41 ++++ .../common/messaging/triggers/msg_queue.py | 31 +-- pyproject.toml | 18 +- scripts/ci/docker-compose/remove-sources.yml | 1 + scripts/ci/docker-compose/tests-sources.yml | 1 + 37 files changed, 1014 insertions(+), 521 deletions(-) create mode 100644 providers/common/messaging/docs/changelog.rst create mode 100644 providers/common/messaging/docs/commits.rst create mode 100644 providers/common/messaging/docs/index.rst create mode 100644 providers/common/messaging/docs/installing-providers-from-sources.rst create mode 100644 providers/common/messaging/docs/security.rst create mode 100644 providers/common/messaging/provider.yaml create mode 100644 providers/common/messaging/pyproject.toml create mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/LICENSE create mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/get_provider_info.py delete mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/hooks/msg_queue.py delete mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/operators/msg_queue.py create mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/providers/__init__.py create mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/providers/base_provider.py create mode 100644 providers/common/messaging/src/airflow/providers/common/messaging/providers/sqs.py diff --git a/INSTALL b/INSTALL index 2a83248ad68b6..55c0aa5a6435b 100644 --- a/INSTALL 
+++ b/INSTALL @@ -232,13 +232,14 @@ or dependencies that are necessary to enable the feature in an editable build. airbyte, alibaba, amazon, apache.beam, apache.cassandra, apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.iceberg, apache.impala, apache.kafka, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apprise, arangodb, asana, atlassian.jira, celery, cloudant, -cncf.kubernetes, cohere, common.compat, common.io, common.sql, databricks, datadog, dbt.cloud, -dingding, discord, docker, edge, elasticsearch, exasol, fab, facebook, ftp, github, google, grpc, -hashicorp, http, imap, influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.psrp, -microsoft.winrm, mongo, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, -oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, -samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, standard, tableau, -telegram, teradata, trino, vertica, weaviate, yandex, ydb, zendesk +cncf.kubernetes, cohere, common.compat, common.io, common.messaging, common.sql, databricks, +datadog, dbt.cloud, dingding, discord, docker, edge, elasticsearch, exasol, fab, facebook, ftp, +github, google, grpc, hashicorp, http, imap, influxdb, jdbc, jenkins, microsoft.azure, +microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mysql, neo4j, odbc, openai, openfaas, +openlineage, opensearch, opsgenie, oracle, pagerduty, papermill, pgvector, pinecone, postgres, +presto, qdrant, redis, salesforce, samba, segment, sendgrid, sftp, singularity, slack, smtp, +snowflake, sqlite, ssh, standard, tableau, telegram, teradata, trino, vertica, weaviate, yandex, +ydb, zendesk # END PROVIDER EXTRAS HERE diff --git a/contributing-docs/12_airflow_dependencies_and_extras.rst b/contributing-docs/12_airflow_dependencies_and_extras.rst index b21358db017f4..aedfeff388354 100644 --- 
a/contributing-docs/12_airflow_dependencies_and_extras.rst +++ b/contributing-docs/12_airflow_dependencies_and_extras.rst @@ -180,13 +180,14 @@ or dependencies that are necessary to enable the feature in editable build. airbyte, alibaba, amazon, apache.beam, apache.cassandra, apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.iceberg, apache.impala, apache.kafka, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apprise, arangodb, asana, atlassian.jira, celery, cloudant, -cncf.kubernetes, cohere, common.compat, common.io, common.sql, databricks, datadog, dbt.cloud, -dingding, discord, docker, edge, elasticsearch, exasol, fab, facebook, ftp, github, google, grpc, -hashicorp, http, imap, influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.psrp, -microsoft.winrm, mongo, mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, -oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, -samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, standard, tableau, -telegram, teradata, trino, vertica, weaviate, yandex, ydb, zendesk +cncf.kubernetes, cohere, common.compat, common.io, common.messaging, common.sql, databricks, +datadog, dbt.cloud, dingding, discord, docker, edge, elasticsearch, exasol, fab, facebook, ftp, +github, google, grpc, hashicorp, http, imap, influxdb, jdbc, jenkins, microsoft.azure, +microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mysql, neo4j, odbc, openai, openfaas, +openlineage, opensearch, opsgenie, oracle, pagerduty, papermill, pgvector, pinecone, postgres, +presto, qdrant, redis, salesforce, samba, segment, sendgrid, sftp, singularity, slack, smtp, +snowflake, sqlite, ssh, standard, tableau, telegram, teradata, trino, vertica, weaviate, yandex, +ydb, zendesk .. 
END PROVIDER EXTRAS HERE diff --git a/dev/breeze/doc/images/output_build-docs.svg b/dev/breeze/doc/images/output_build-docs.svg index 253298953d49c..115920257c9d6 100644 --- a/dev/breeze/doc/images/output_build-docs.svg +++ b/dev/breeze/doc/images/output_build-docs.svg @@ -1,4 +1,4 @@ - +