diff --git a/providers/redis/any/dag_id=dag_for_testing_redis_task_handler/run_id=test/task_id=task_for_testing_redis_log_handler/attempt=1.log b/providers/redis/any/dag_id=dag_for_testing_redis_task_handler/run_id=test/task_id=task_for_testing_redis_log_handler/attempt=1.log new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/providers/redis/docs/index.rst b/providers/redis/docs/index.rst index b0e042ddc4823..78bb169b96271 100644 --- a/providers/redis/docs/index.rst +++ b/providers/redis/docs/index.rst @@ -36,6 +36,7 @@ Connection types Logging + Message Queues Triggers .. toctree:: diff --git a/providers/redis/docs/message-queues.rst b/providers/redis/docs/message-queues.rst new file mode 100644 index 0000000000000..84614e9a1f483 --- /dev/null +++ b/providers/redis/docs/message-queues.rst @@ -0,0 +1,72 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Redis Message Queue +=================== + + +Redis Queue Provider +-------------------- + +Implemented by :class:`~airflow.providers.redis.queues.redis.RedisPubSubMessageQueueProvider` + + +The Redis Queue Provider is a message queue provider that uses +Redis as the underlying message queue system. +It allows you to send and receive messages using Redis in your Airflow workflows. +The provider supports Redis channels. + +The queue must be matching this regex: + +.. exampleinclude::/../src/airflow/providers/redis/queues/redis.py + :language: python + :dedent: 0 + :start-after: [START queue_regexp] + :end-before: [END queue_regexp] + +Queue URI Format: + +.. code-block:: text + + redis://:/ + +Where: + +- ``host``: Redis server hostname +- ``port``: Redis server port +- ``channel_list``: Comma-separated list of Redis channels to subscribe to + +The ``queue`` parameter is used to configure the underlying +:class:`~airflow.providers.redis.triggers.redis_await_message.AwaitMessageTrigger` class and +passes all kwargs directly to the trigger constructor, if provided. + +Channels can also be specified via the Queue URI instead of the ``channels`` kwarg. The provider will extract channels from the URI as follows: + +.. exampleinclude:: /../src/airflow/providers/redis/queues/redis.py + :language: python + :dedent: 0 + :start-after: [START extract_channels] + :end-before: [END extract_channels] + + +Below is an example of how you can configure an Airflow DAG to be triggered by a message in Redis. + +.. exampleinclude:: /../tests/system/redis/example_dag_message_queue_trigger.py + :language: python + :dedent: 0 + :start-after: [START howto_trigger_message_queue] + :end-before: [END howto_trigger_message_queue] diff --git a/providers/redis/provider.yaml b/providers/redis/provider.yaml index ffa3d102cba3a..b5315fad36f23 100644 --- a/providers/redis/provider.yaml +++ b/providers/redis/provider.yaml @@ -68,6 +68,9 @@ operators: python-modules: - airflow.providers.redis.operators.redis_publish +queues: + - airflow.providers.redis.queues.redis.RedisPubSubMessageQueueProvider + sensors: - integration-name: Redis python-modules: diff --git a/providers/redis/pyproject.toml b/providers/redis/pyproject.toml index 8a91065b9c078..579ec414abd48 100644 --- a/providers/redis/pyproject.toml +++ b/providers/redis/pyproject.toml @@ -62,11 +62,19 @@ dependencies = [ "redis>=4.5.2,!=4.5.5,!=5.0.2", ] +# The optional dependencies should be modified in place in the generated file +# Any change in the dependencies is preserved when the file is regenerated +[project.optional-dependencies] +"common.messaging" = [ + "apache-airflow-providers-common-messaging>=1.0.3" +] + [dependency-groups] dev = [ "apache-airflow", "apache-airflow-task-sdk", "apache-airflow-devel-common", + "apache-airflow-providers-common-messaging", # Additional devel dependencies (do not remove this line and add extra development dependencies) ] diff --git a/providers/redis/src/airflow/providers/redis/get_provider_info.py b/providers/redis/src/airflow/providers/redis/get_provider_info.py index 8ee4dd23637bc..aeca9e7119bae 100644 --- a/providers/redis/src/airflow/providers/redis/get_provider_info.py +++ b/providers/redis/src/airflow/providers/redis/get_provider_info.py @@ -40,6 +40,7 @@ def get_provider_info(): "python-modules": ["airflow.providers.redis.operators.redis_publish"], } ], + "queues": ["airflow.providers.redis.queues.redis.RedisPubSubMessageQueueProvider"], "sensors": [ { "integration-name": "Redis", diff --git a/providers/redis/src/airflow/providers/redis/queues/__init__.py b/providers/redis/src/airflow/providers/redis/queues/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/redis/src/airflow/providers/redis/queues/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/redis/src/airflow/providers/redis/queues/redis.py b/providers/redis/src/airflow/providers/redis/queues/redis.py new file mode 100644 index 0000000000000..b36cdc1ad5b49 --- /dev/null +++ b/providers/redis/src/airflow/providers/redis/queues/redis.py @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import re +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider +from airflow.providers.redis.triggers.redis_await_message import AwaitMessageTrigger + +if TYPE_CHECKING: + from airflow.triggers.base import BaseEventTrigger + +# [START queue_regexp] +QUEUE_REGEXP = r"^redis\+pubsub://" +# [END queue_regexp] + + +class RedisPubSubMessageQueueProvider(BaseMessageQueueProvider): + """ + Configuration for Redis integration with common-messaging. + + It uses the ``redis+pubsub://`` URI scheme for identifying Redis queues. + + **URI Format**: + + .. code-block:: text + + redis+pubsub://:/ + + Where: + + * ``host``: Redis server hostname + * ``port``: Redis server port + * ``channel_list``: Comma-separated list of Redis channels to subscribe to + + **Examples**: + + .. code-block:: text + + redis+pubsub://localhost:6379/my_channel + + You can also provide ``channels`` directly in kwargs instead of in the URI. + + .. code-block:: python + + from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger + + trigger = MessageQueueTrigger(queue="redis+pubsub://localhost:6379/test") + + For a complete example, see: + :mod:`tests.system.redis.example_dag_message_queue_trigger` + """ + + def queue_matches(self, queue: str) -> bool: + return bool(re.match(QUEUE_REGEXP, queue)) + + def trigger_class(self) -> type[BaseEventTrigger]: + return AwaitMessageTrigger # type: ignore[return-value] + + def trigger_kwargs(self, queue: str, **kwargs) -> dict: + # [START extract_channels] + # Parse the queue URI + parsed = urlparse(queue) + # Extract channels (after host and port) + # parsed.path starts with a '/', so strip it + raw_channels = parsed.path.lstrip("/") + channels = raw_channels.split(",") if raw_channels else [] + # [END extract_channels] + + if not channels and "channels" not in kwargs: + raise ValueError( + "channels is required in RedisPubSubMessageQueueProvider kwargs or provide them in the queue URI" + ) + + return {} if "channels" in kwargs else {"channels": channels} diff --git a/providers/redis/tests/integration/redis/queues/__init__.py b/providers/redis/tests/integration/redis/queues/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/redis/tests/integration/redis/queues/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/redis/tests/integration/redis/queues/test_redis_pubsub_message_queue.py b/providers/redis/tests/integration/redis/queues/test_redis_pubsub_message_queue.py new file mode 100644 index 0000000000000..ae5902b50f656 --- /dev/null +++ b/providers/redis/tests/integration/redis/queues/test_redis_pubsub_message_queue.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import time + +import pytest + +from airflow.providers.redis.hooks.redis import RedisHook +from airflow.providers.redis.queues.redis import RedisPubSubMessageQueueProvider + + +@pytest.mark.integration("redis") +class TestRedisPubSubMessageQueueProviderIntegration: + def setup_method(self): + self.redis_hook = RedisHook(redis_conn_id="redis_default") + self.redis = self.redis_hook.get_conn() + self.provider = RedisPubSubMessageQueueProvider() + self.channel = "test_pubsub_channel" + + def test_pubsub_send_and_receive(self): + pubsub = self.redis.pubsub() + pubsub.subscribe(self.channel) + + test_message = "airflow-pubsub-integration-message" + self.redis.publish(self.channel, test_message) + + received = None + for _ in range(10): + message = pubsub.get_message() + if message and message["type"] == "message": + received = message["data"] + break + time.sleep(0.1) + + assert received == test_message.encode(), f"Expected {test_message!r}, got {received!r}" + + pubsub.unsubscribe(self.channel) + + def test_queue_matches(self): + assert self.provider.queue_matches(f"redis+pubsub://localhost:6379/{self.channel}") + + def test_trigger_kwargs(self): + uri = f"redis+pubsub://localhost:6379/{self.channel}" + kwargs = self.provider.trigger_kwargs(uri) + assert kwargs == {"channels": [self.channel]} diff --git a/providers/redis/tests/system/redis/example_dag_message_queue_trigger.py b/providers/redis/tests/system/redis/example_dag_message_queue_trigger.py new file mode 100644 index 0000000000000..b7089e0d4dca9 --- /dev/null +++ b/providers/redis/tests/system/redis/example_dag_message_queue_trigger.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +# [START howto_trigger_message_queue] +from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger +from airflow.providers.standard.operators.empty import EmptyOperator +from airflow.sdk import DAG, Asset, AssetWatcher + +# Define a trigger that listens to an external message queue (Redis in this case) +trigger = MessageQueueTrigger(queue="redis+pubsub://localhost:6379/test") + +# Define an asset that watches for messages on the queue +asset = Asset("redis_queue_asset_1", watchers=[AssetWatcher(name="redis_watcher_1", trigger=trigger)]) + +with DAG(dag_id="example_redis_watcher_1", schedule=[asset]) as dag: + EmptyOperator(task_id="task_1") +# [END howto_trigger_message_queue] + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/providers/redis/tests/unit/redis/queues/__init__.py b/providers/redis/tests/unit/redis/queues/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/redis/tests/unit/redis/queues/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/redis/tests/unit/redis/queues/test_redis.py b/providers/redis/tests/unit/redis/queues/test_redis.py new file mode 100644 index 0000000000000..bf91fc35c4cdf --- /dev/null +++ b/providers/redis/tests/unit/redis/queues/test_redis.py @@ -0,0 +1,95 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.providers.redis.triggers.redis_await_message import AwaitMessageTrigger + +pytest.importorskip("airflow.providers.common.messaging.providers.base_provider") + + +class TestRedisPubSubMessageQueueProvider: + """Tests for RedisPubSubMessageQueueProvider.""" + + def setup_method(self): + """Set up the test environment.""" + from airflow.providers.redis.queues.redis import RedisPubSubMessageQueueProvider + + self.provider = RedisPubSubMessageQueueProvider() + + def test_queue_create(self): + """Test the creation of the RedisPubSubMessageQueueProvider.""" + from airflow.providers.common.messaging.providers.base_provider import BaseMessageQueueProvider + + assert isinstance(self.provider, BaseMessageQueueProvider) + + @pytest.mark.parametrize( + "queue_uri, expected_result", + [ + pytest.param("redis+pubsub://localhost:6379/channel1", True, id="single_channel"), + pytest.param("redis+pubsub://localhost:6379/channel1,channel2", True, id="multiple_channels"), + pytest.param("http://example.com", False, id="http_url"), + pytest.param("not-a-url", False, id="invalid_url"), + ], + ) + def test_queue_matches(self, queue_uri, expected_result): + """Test the queue_matches method with various URLs.""" + assert self.provider.queue_matches(queue_uri) == expected_result + + def test_trigger_class(self): + """Test the trigger_class method.""" + assert self.provider.trigger_class() == AwaitMessageTrigger + + @pytest.mark.parametrize( + "queue_uri, extra_kwargs, expected_result", + [ + pytest.param( + "redis+pubsub://localhost:6379/channel1,channel2", + {"channels": ["channel1", "channel2"]}, + {}, + id="channels_from_uri", + ), + pytest.param( + "redis+pubsub://localhost:6379/", + {"channels": ["channel1", "channel2"]}, + {}, + id="channels_from_kwargs", + ), + ], + ) + def test_trigger_kwargs_valid_cases(self, queue_uri, extra_kwargs, expected_result): + """Test the trigger_kwargs method with valid parameters.""" + kwargs = self.provider.trigger_kwargs(queue_uri, **extra_kwargs) + assert kwargs == expected_result + + @pytest.mark.parametrize( + "queue_uri, extra_kwargs, expected_error, error_match", + [ + pytest.param( + "redis+pubsub://localhost:6379/", + {}, + ValueError, + "channels is required in RedisPubSubMessageQueueProvider kwargs or provide them in the queue URI", + id="missing_channels", + ), + ], + ) + def test_trigger_kwargs_error_cases(self, queue_uri, extra_kwargs, expected_error, error_match): + """Test that trigger_kwargs raises appropriate errors with invalid parameters.""" + with pytest.raises(expected_error, match=error_match): + self.provider.trigger_kwargs(queue_uri, **extra_kwargs)