Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,8 @@
# under the License.
from __future__ import annotations

import importlib

from airflow.providers_manager import ProvidersManager
from airflow.utils.deprecation_tools import add_deprecated_classes

providers_manager = ProvidersManager()
providers_manager.initialize_providers_queues()


def create_class_by_name(name: str):
module_name, class_name = name.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name)


MESSAGE_QUEUE_PROVIDERS = [create_class_by_name(name)() for name in providers_manager.queue_class_names]

__deprecated_classes = {
"sqs": {
"SqsMessageQueueProvider": "airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,26 @@
# under the License.
from __future__ import annotations

import importlib
from collections.abc import AsyncIterator
from functools import cached_property
from typing import Any

from airflow.providers.common.messaging.providers import MESSAGE_QUEUE_PROVIDERS
from airflow.providers_manager import ProvidersManager
from airflow.triggers.base import BaseEventTrigger, TriggerEvent

providers_manager = ProvidersManager()
providers_manager.initialize_providers_queues()


def create_class_by_name(name: str):
module_name, class_name = name.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name)


MESSAGE_QUEUE_PROVIDERS = [create_class_by_name(name)() for name in providers_manager.queue_class_names]


class MessageQueueTrigger(BaseEventTrigger):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@


@mock.patch(
"airflow.providers.common.messaging.providers.MESSAGE_QUEUE_PROVIDERS", new_callable=mock.PropertyMock
"airflow.providers.common.messaging.triggers.msg_queue.MESSAGE_QUEUE_PROVIDERS",
new_callable=mock.PropertyMock,
)
def test_provider_integrations(_):
trigger = MessageQueueTrigger(queue="any queue")
Expand Down