Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/authoring-and-scheduling/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ Airflow allows to define custom connection types. This is what is described in d
:doc:`apache-airflow-providers:index` - providers give you the capability of defining your own connections.
The connection customization can be done by any provider, but also
many of the providers managed by the community define custom connection types.
The full list of all providers delivered by ``Apache Airflow community managed providers`` can be found in
The full list of all connections delivered by ``Apache Airflow community managed providers`` can be found in
:doc:`apache-airflow-providers:core-extensions/connections`.
1 change: 1 addition & 0 deletions airflow-core/docs/core-concepts/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Here you can find detailed documentation about each one of the core concepts of
auth-manager/index
objectstorage
backfill
message-queues

**Communication**

Expand Down
41 changes: 41 additions & 0 deletions airflow-core/docs/core-concepts/message-queues.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

.. _concepts:message-queues:

Message Queues
==============

The Message Queues are a way to expose capability of external event-driven scheduling of Dags.

Apache Airflow is primarily designed for time-based and dependency-based scheduling of workflows. However,
modern data architectures often require near real-time processing and the ability to react to
events from various sources, such as message queues.

Airflow has native event-driven capability, allowing users to create workflows that can be
triggered by external events, thus enabling more responsive data pipelines.

Airflow supports poll-based event-driven scheduling, where the Triggerer can poll
external message queues using built-in :class:`airflow.triggers.base.BaseTrigger` classes. This allows users
to create workflows that can be triggered by external events, such as messages arriving
in a queue or changes in a database efficiently.

Airflow constantly monitors the state of an external resource and updates the asset whenever the external
resource reaches a given state (if it does reach it). To achieve this, we leverage Airflow Triggers.
Triggers are small, asynchronous pieces of Python code whose job is to poll an external resource state.

The list of supported message queues is available in :doc:`apache-airflow-providers:core-extensions/message-queues`.
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,12 @@ class GroupCommand(NamedTuple):
func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="queues",
help="Get information about queues provided",
func=lazy_load_command("airflow.cli.commands.provider_command.queues_list"),
args=(ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="notifications",
help="Get information about notifications provided",
Expand Down
13 changes: 13 additions & 0 deletions airflow-core/src/airflow/cli/commands/provider_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,19 @@ def executors_list(args):
)


@suppress_logs_and_warning
@providers_configuration_loaded
def queues_list(args):
"""List all queues at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().queue_class_names),
output=args.output,
mapper=lambda x: {
"queue_class_names": x,
},
)


@suppress_logs_and_warning
@providers_configuration_loaded
def config_list(args):
Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/provider.yaml.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,18 @@
}
}
},
"queues": {
"type": "array",
"description": "Message Queues exposed by the provider",
"items": {
"name": {
"type": "string"
},
"message-queue-class": {
"type": "string"
}
}
},
"source-date-epoch": {
"type": "integer",
"description": "Source date epoch - seconds since epoch (gmtime) when the release documentation was prepared. Used to generate reproducible package builds with flint.",
Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/provider_info.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,18 @@
"description": "Class to instantiate the plugin"
}
}
},
"queues": {
"type": "array",
"description": "Message Queues exposed by the provider",
"items": {
"name": {
"type": "string"
},
"message-queue-class": {
"type": "string"
}
}
}
},
"definitions": {
Expand Down
22 changes: 22 additions & 0 deletions airflow-core/src/airflow/providers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ def __init__(self):
self._auth_manager_class_name_set: set[str] = set()
self._secrets_backend_class_name_set: set[str] = set()
self._executor_class_name_set: set[str] = set()
self._queue_class_name_set: set[str] = set()
self._provider_configs: dict[str, dict[str, Any]] = {}
self._trigger_info_set: set[TriggerInfo] = set()
self._notification_info_set: set[NotificationInfo] = set()
Expand Down Expand Up @@ -533,6 +534,12 @@ def initialize_providers_executors(self):
self.initialize_providers_list()
self._discover_executors()

@provider_info_cache("queues")
def initialize_providers_queues(self):
"""Lazy initialization of providers queue information."""
self.initialize_providers_list()
self._discover_queues()

@provider_info_cache("notifications")
def initialize_providers_notifications(self):
"""Lazy initialization of providers notifications information."""
Expand Down Expand Up @@ -1091,6 +1098,14 @@ def _discover_executors(self) -> None:
if _correctness_check(provider_package, executors_class_name, provider):
self._executor_class_name_set.add(executors_class_name)

def _discover_queues(self) -> None:
"""Retrieve all queues defined in the providers."""
for provider_package, provider in self._provider_dict.items():
if provider.data.get("queues"):
for queue_class_name in provider.data["queues"]:
if _correctness_check(provider_package, queue_class_name, provider):
self._queue_class_name_set.add(queue_class_name)

def _discover_config(self) -> None:
"""Retrieve all configs defined in the providers."""
for provider_package, provider in self._provider_dict.items():
Expand Down Expand Up @@ -1221,6 +1236,11 @@ def executor_class_names(self) -> list[str]:
self.initialize_providers_executors()
return sorted(self._executor_class_name_set)

@property
def queue_class_names(self) -> list[str]:
self.initialize_providers_queues()
return sorted(self._queue_class_name_set)

@property
def filesystem_module_names(self) -> list[str]:
self.initialize_providers_filesystems()
Expand Down Expand Up @@ -1268,9 +1288,11 @@ def _cleanup(self):
self._auth_manager_class_name_set.clear()
self._secrets_backend_class_name_set.clear()
self._executor_class_name_set.clear()
self._queue_class_name_set.clear()
self._provider_configs.clear()
self._trigger_info_set.clear()
self._notification_info_set.clear()
self._plugins_set.clear()

self._initialized = False
self._initialization_stack_trace = None
3 changes: 2 additions & 1 deletion dev/breeze/src/airflow_breeze/global_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,8 @@ def generate_provider_dependencies_if_needed():
{
"python-version": "3.9",
"airflow-version": "3.0.0",
"remove-providers": "cloudant",
# TODO: bring back common-messaging when we bump airflow to 3.0.1
"remove-providers": "cloudant common.messaging",
"run-tests": "true",
},
]
Expand Down
12 changes: 12 additions & 0 deletions devel-common/src/sphinx_exts/operators_and_hooks_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,17 @@ def render_content(
)


class QueuesDirective(BaseJinjaReferenceDirective):
"""Generate list of queues"""

def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _common_render_list_content(
header_separator=header_separator, resource_type="queues", template="queues.rst.jinja2"
)


class DeferrableOperatorDirective(BaseJinjaReferenceDirective):
"""Generate list of deferrable operators"""

Expand Down Expand Up @@ -521,6 +532,7 @@ def setup(app):
app.add_directive("airflow-extra-links", ExtraLinksDirective)
app.add_directive("airflow-notifications", NotificationsDirective)
app.add_directive("airflow-executors", ExecutorsDirective)
app.add_directive("airflow-queues", QueuesDirective)
app.add_directive("airflow-deferrable-operators", DeferrableOperatorDirective)
app.add_directive("airflow-deprecations", DeprecationsDirective)
app.add_directive("airflow-dataset-schemes", AssetSchemeDirective)
Expand Down
27 changes: 27 additions & 0 deletions devel-common/src/sphinx_exts/templates/queues.rst.jinja2
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{#
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
#}
{%for provider, provider_dict in items.items() %}
{{ provider_dict['name'] }}
{{ header_separator * (provider_dict['name']|length) }}

{% for queue in provider_dict['queues'] -%}
- :class:`~{{ queue }}`
{% endfor -%}

{% endfor %}
33 changes: 33 additions & 0 deletions providers-summary-docs/core-extensions/message-queues.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Message Queues
--------------

This is a summary of all Apache Airflow Community provided implementations of Queues
exposed via community-managed providers.

Airflow can be extended by providers with Queues. Each provider can define their own Queues,
that can be configured to handle executing tasks

The queues are explained in
:doc:`apache-airflow:core-concepts/message-queues` and you can also see those
provided by the community-managed providers:

.. airflow-queues::
:tags: None
:header-separator: "
1 change: 1 addition & 0 deletions providers/amazon/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
Logging for Tasks <logging/index>
Configuration <configurations-ref>
Executors <executors/index>
Message Queues <message-queues/index>
AWS Auth manager <auth-manager/index>
CLI <cli-ref>

Expand Down
43 changes: 43 additions & 0 deletions providers/amazon/docs/message-queues/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Amazon Messaging Queues
=======================

Amazon SQS Queue Provider
-------------------------

Implemented by :class:`~airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider`

The Amazon SQS Queue Provider is a message queue provider that uses
Amazon Simple Queue Service (SQS) as the underlying message queue system.
It allows you to send and receive messages using SQS queues in your Airflow workflows.
The provider supports both standard and FIFO queues, and it provides features
such as message visibility timeout, message retention period, and dead-letter queues.

The queue must be matching this regex:

.. exampleinclude:: /../src/airflow/providers/amazon/aws/queues/sqs.py
:language: python
:dedent: 0
:start-after: [START queue_regexp]
:end-before: [END queue_regexp]


The queue parameter is passed directly to ``sqs_queue`` parameter of the underlying
:class:`~airflow.providers.amazon.aws.triggers.sqs.SqsSensorTrigger` class, and passes
all the kwargs directly to the trigger constructor if added.
4 changes: 4 additions & 0 deletions providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ state: ready
source-date-epoch: 1744788746
# note that those versions are maintained by release manager - do not update them manually
versions:
- 9.7.0
- 9.6.1
- 9.6.0
- 9.5.0
Expand Down Expand Up @@ -1178,3 +1179,6 @@ executors:

auth-managers:
- airflow.providers.amazon.aws.auth_manager.aws_auth_manager.AwsAuthManager

queues:
- airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider
10 changes: 7 additions & 3 deletions providers/amazon/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"

[project]
name = "apache-airflow-providers-amazon"
version = "9.6.1"
version = "9.7.0"
description = "Provider package apache-airflow-providers-amazon for Apache Airflow"
readme = "README.rst"
authors = [
Expand Down Expand Up @@ -149,6 +149,9 @@ dependencies = [
"standard" = [
"apache-airflow-providers-standard"
]
"common.messaging" = [
"apache-airflow-providers-common-messaging>=1.0.1"
]

[dependency-groups]
dev = [
Expand All @@ -158,6 +161,7 @@ dev = [
"apache-airflow-providers-apache-hive",
"apache-airflow-providers-cncf-kubernetes",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-common-messaging",
"apache-airflow-providers-common-sql",
"apache-airflow-providers-exasol",
"apache-airflow-providers-ftp",
Expand Down Expand Up @@ -211,8 +215,8 @@ apache-airflow-providers-common-sql = {workspace = true}
apache-airflow-providers-standard = {workspace = true}

[project.urls]
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.1"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.1/changelog.html"
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0"
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "9.6.1"
__version__ = "9.7.0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
Loading