Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
979a473
AIP-86 - Add async support for Notifiers
ramitkataria Jul 28, 2025
39e11d2
Docs and naming fixes
ramitkataria Jul 28, 2025
c2900fd
CI fixes
ramitkataria Jul 29, 2025
3c94260
Make context a kwarg
ramitkataria Aug 1, 2025
5c4a869
PR changes
ramitkataria Aug 5, 2025
dc03815
Merge branch 'main' of https://github.com/apache/airflow into ramitka…
ramitkataria Aug 5, 2025
77bf740
Exception handling in BaseNotifier
ramitkataria Aug 5, 2025
ed9bbec
Update docstrings and typechecks
ramitkataria Aug 5, 2025
1c76f75
Update Notifiers docs pages
ferruzzi Aug 5, 2025
d4251f2
Add unit tests
ferruzzi Aug 6, 2025
b760512
Add Slack notifier unit tests
ferruzzi Aug 6, 2025
f63a7f5
Add task-sdk unit tests
ferruzzi Aug 6, 2025
5fecfb7
Merge branch 'main' into ramitkataria/deadlines/async-notifiers
ferruzzi Aug 6, 2025
3b5ea70
fix unit tests broken by the rebase
ferruzzi Aug 7, 2025
9563235
roll back changes to one unit test
ferruzzi Aug 7, 2025
f1cf491
un-re-revert to remove pointless check in unit test
ferruzzi Aug 7, 2025
fb15eea
Refactor Deadline model unit tests to not make assertions about speci…
ramitkataria Aug 12, 2025
b5cab60
Merge branch 'main' of github.com:aws-mwaa/upstream-to-airflow into r…
ramitkataria Aug 12, 2025
45dcb43
Merge branch 'main' into ramitkataria/deadlines/async-notifiers
ramitkataria Aug 13, 2025
c998a71
Merge branch 'main' into ramitkataria/deadlines/async-notifiers
ramitkataria Aug 13, 2025
3600129
Re-add accidentally removed code comment about TaskSDK context
ramitkataria Aug 18, 2025
bd5f89f
Merge branch 'main' into ramitkataria/deadlines/async-notifiers
ramitkataria Aug 18, 2025
65e3655
Pass context in async notifier constructor
ramitkataria Aug 18, 2025
7bfd77b
Revisions for Ash
ferruzzi Aug 18, 2025
d6eaff6
Remove redundant unit test
ferruzzi Aug 19, 2025
7c27539
Merge branch 'main' into ramitkataria/deadlines/async-notifiers
ramitkataria Aug 19, 2025
89667a5
Merge branch 'main' into ramitkataria/deadlines/async-notifiers
ramitkataria Aug 21, 2025
90901aa
Add missing kwargs to parent class constructor
ramitkataria Aug 22, 2025
1c69759
Merge branch 'main' into ramitkataria/deadlines/async-notifiers
ramitkataria Aug 25, 2025
b88859c
Merge branch 'main' into ramitkataria/deadlines/async-notifiers
ramitkataria Aug 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@
Callbacks
=========

A valuable component of logging and monitoring is the use of task callbacks to act upon changes in state of a given DAG or task, or across all tasks in a given DAG.
For example, you may wish to alert when certain tasks have failed, or invoke a callback when your DAG succeeds.
A valuable component of logging and monitoring is the use of task callbacks to act upon changes in state of a given Dag or task, or across all tasks in a given Dag.
For example, you may wish to alert when certain tasks have failed, or invoke a callback when your Dag succeeds.

There are three different places where callbacks can be defined.

- Callbacks set in the DAG definition will be applied at the DAG level.
- Using ``default_args``, callbacks can be set for each task in a DAG.
- Callbacks set in the Dag definition will be applied at the Dag level.
- Using ``default_args``, callbacks can be set for each task in a Dag.
- Individual callbacks can be set for a task by setting that callback within the task definition itself.

.. note::

Callback functions are only invoked when the DAG or task state changes due to execution by a worker.
As such, DAG and task changes set by the command line interface (:doc:`CLI <../../howto/usage-cli>`) or user interface (:doc:`UI <../../ui>`) do not
Callback functions are only invoked when the Dag or task state changes due to execution by a worker.
As such, Dag and task changes set by the command line interface (:doc:`CLI <../../howto/usage-cli>`) or user interface (:doc:`UI <../../ui>`) do not
execute callback functions.

.. warning::
Expand All @@ -42,6 +42,12 @@ There are three different places where callbacks can be defined.
By default, scheduler logs do not show up in the UI and instead can be found in
``$AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log``

.. note::
As of Airflow 2.6.0, callbacks now supports a list of callback functions, allowing users to specify multiple functions
to be executed in the desired event. Simply pass a list of callback functions to the callback args when defining your Dag/task
callbacks: e.g ``on_failure_callback=[callback_func_1, callback_func_2]``


Callback Types
--------------

Expand All @@ -50,33 +56,33 @@ There are six types of events that can trigger a callback:
=========================================== ================================================================
Name Description
=========================================== ================================================================
``on_success_callback`` Invoked when the :ref:`DAG succeeds <dag-run:dag-run-status>` or :ref:`task succeeds <concepts:task-instances>`.
Available at the DAG or task level.
``on_success_callback`` Invoked when the :ref:`Dag succeeds <dag-run:dag-run-status>` or :ref:`task succeeds <concepts:task-instances>`.
Available at the Dag or task level.
``on_failure_callback`` Invoked when the task :ref:`fails <concepts:task-instances>`.
Available at the DAG or task level.
Available at the Dag or task level.
``on_retry_callback`` Invoked when the task is :ref:`up for retry <concepts:task-instances>`.
Available only at the task level.
``on_execute_callback`` Invoked right before the task begins executing.
Available only at the task level.
``on_skipped_callback`` Invoked when the task is :ref:`running <concepts:task-instances>` and AirflowSkipException raised.
Explicitly it is NOT called if a task is not started to be executed because of a preceding branching
decision in the DAG or a trigger rule which causes execution to skip so that the task execution
decision in the Dag or a trigger rule which causes execution to skip so that the task execution
is never scheduled.
Available only at the task level.
=========================================== ================================================================


Example
-------
Examples
--------

Using Custom Callback Methods
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In the following example, failures in ``task1`` call the ``task_failure_alert`` function, and success at DAG level calls the ``dag_success_alert`` function.
In the following example, failures in ``task1`` call the ``task_failure_alert`` function, and success at Dag level calls the ``dag_success_alert`` function.
Before each task begins to execute, the ``task_execute_callback`` function will be called:

.. code-block:: python

import datetime
import pendulum

from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator

Expand All @@ -90,27 +96,48 @@ Before each task begins to execute, the ``task_execute_callback`` function will


def dag_success_alert(context):
print(f"DAG has succeeded, run_id: {context['run_id']}")
print(f"Dag has succeeded, run_id: {context['run_id']}")


with DAG(
dag_id="example_callback",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
dagrun_timeout=datetime.timedelta(minutes=60),
catchup=False,
on_success_callback=dag_success_alert,
default_args={"on_execute_callback": task_execute_callback},
tags=["example"],
):
task1 = EmptyOperator(task_id="task1", on_failure_callback=[task_failure_alert])
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task1 >> task2 >> task3

.. note::
As of Airflow 2.6.0, callbacks now supports a list of callback functions, allowing users to specify multiple functions
to be executed in the desired event. Simply pass a list of callback functions to the callback args when defining your DAG/task
callbacks: e.g ``on_failure_callback=[callback_func_1, callback_func_2]``

Full list of variables available in ``context`` in :doc:`docs <../../templates-ref>` and `code <https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/context.py>`_.


Using Notifiers
^^^^^^^^^^^^^^^

You can use Notifiers in your Dag definition by passing it as an argument to the ``on_*_callbacks``.
For example, you can use it with ``on_success_callback`` or ``on_failure_callback`` to send notifications based
on the status of a task or a Dag run.

Here's an example of using a custom notifier:

.. code-block:: python

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

from myprovider.notifier import MyNotifier

with DAG(
dag_id="example_notifier",
on_success_callback=MyNotifier(message="Success!"),
on_failure_callback=MyNotifier(message="Failure!"),
):
task = BashOperator(
task_id="example_task",
bash_command="exit 1",
on_success_callback=MyNotifier(message="Task Succeeded!"),
)

For a list of community-managed Notifiers, see :doc:`apache-airflow-providers:core-extensions/notifications`.
For more information on writing a custom Notifier, see the :doc:`Notifiers <../../howto/notifications>` how-to page.
47 changes: 14 additions & 33 deletions airflow-core/docs/howto/notifications.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
specific language governing permissions and limitations
under the License.

Creating a notifier
Creating a Notifier
===================

The :class:`~airflow.sdk.definitions.notifier.BaseNotifier` is an abstract class that provides a basic
structure for sending notifications in Airflow using the various ``on_*__callback``.
It is intended for providers to extend and customize for their specific needs.
Expand All @@ -32,49 +33,29 @@ Here's an example of how you can create a Notifier class:
.. code-block:: python

from airflow.sdk import BaseNotifier
from my_provider import send_message
from my_provider import async_send_message, send_message


class MyNotifier(BaseNotifier):
template_fields = ("message",)

def __init__(self, message):
def __init__(self, message: str):
self.message = message

def notify(self, context):
# Send notification here, below is an example
def notify(self, context: Context) -> None:
# Send notification here. For example:
title = f"Task {context['task_instance'].task_id} failed"
send_message(title, self.message)

Using a notifier
----------------
Once you have a notifier implementation, you can use it in your ``DAG`` definition by passing it as an argument to
the ``on_*_callbacks``. For example, you can use it with ``on_success_callback`` or ``on_failure_callback`` to send
notifications based on the status of a task or a DAG run.

Here's an example of using the above notifier:

.. code-block:: python

from datetime import datetime
async def async_notify(self, context: Context) -> None:
# Only required if your Notifier is going to support asynchronous code. For example:
title = f"Task {context['task_instance'].task_id} failed"
await async_send_message(title, self.message)

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

from myprovider.notifier import MyNotifier
For a list of community-managed notifiers, see :doc:`apache-airflow-providers:core-extensions/notifications`.

with DAG(
dag_id="example_notifier",
start_date=datetime(2022, 1, 1),
schedule=None,
on_success_callback=MyNotifier(message="Success!"),
on_failure_callback=MyNotifier(message="Failure!"),
):
task = BashOperator(
task_id="example_task",
bash_command="exit 1",
on_success_callback=MyNotifier(message="Task Succeeded!"),
)
Using Notifiers
===============

For a list of community-managed notifiers, see
:doc:`apache-airflow-providers:core-extensions/notifications`.
For using Notifiers in event-based DAG callbacks, see :doc:`../administration-and-deployment/logging-monitoring/callbacks`.
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/triggers/deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
try:
callback = import_string(self.callback_path)
yield TriggerEvent({PAYLOAD_STATUS_KEY: DeadlineCallbackState.RUNNING})
result = await callback(**self.callback_kwargs)

# TODO: get airflow context
context: dict = {}

result = await callback(**self.callback_kwargs, context=context)
log.info("Deadline callback completed with return value: %s", result)
yield TriggerEvent({PAYLOAD_STATUS_KEY: DeadlineCallbackState.SUCCESS, PAYLOAD_BODY_KEY: result})
except Exception as e:
Expand Down
Loading