Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
731f11d
Add version_compat for HTTP provider
karenbraganz May 28, 2025
44a473c
Create HTTPEventTrigger class with constructor
karenbraganz May 28, 2025
a64ce8e
First draft of HttpEventTrigger
karenbraganz May 30, 2025
6b39291
Remove trailing whitespace
karenbraganz May 30, 2025
b61926e
Merge branch 'apache:main' into httpeventtrigger
karenbraganz Jun 19, 2025
5957568
Pass response_check path instead of callable for serialization
karenbraganz Jun 19, 2025
292de2c
Create unit tests for HttpEventTrigger
karenbraganz Jul 1, 2025
3007611
Merge branch 'main' into httpeventtrigger
karenbraganz Jul 2, 2025
8dd1199
First draft of HttpEventTrigger docs
karenbraganz Jul 7, 2025
4fc7121
Merge branch 'main' into httpeventtrigger
karenbraganz Jul 17, 2025
5ea52e1
Add version_compat for HTTP provider
karenbraganz May 28, 2025
5ea140e
Create HTTPEventTrigger class with constructor
karenbraganz May 28, 2025
4f6dec5
Change log formatting
karenbraganz Aug 4, 2025
478c35a
Add parameters to docs
karenbraganz Aug 4, 2025
88dc4c6
Ruff formatting
karenbraganz Aug 4, 2025
b9977d4
Ruff formatting
karenbraganz Aug 4, 2025
5a08c93
Add check to ensure that response_check is async
karenbraganz Aug 5, 2025
2ff0794
Correct failing tests
karenbraganz Aug 5, 2025
7d84ae8
Merge branch 'main' into httpeventtrigger
karenbraganz Aug 5, 2025
2aa6bb7
Change imports for Airflow 2 compatibility tests
karenbraganz Aug 5, 2025
22bf232
Merge branch 'main' into httpeventtrigger
karenbraganz Aug 5, 2025
388a448
Add usage example in docs
karenbraganz Aug 6, 2025
88bbfae
Add usage example to docs
karenbraganz Aug 6, 2025
09a6771
Add comment about token to docs
karenbraganz Aug 6, 2025
400beda
Modify parameters desc
karenbraganz Aug 6, 2025
1c04cac
Merge branch 'main' into httpeventtrigger
karenbraganz Aug 6, 2025
0f1dc1d
Make response_check_path mandatory
karenbraganz Aug 6, 2025
991739f
Add comments to docs
karenbraganz Aug 6, 2025
bd63c9c
Change notes order
karenbraganz Aug 6, 2025
038be0a
Make _import_from_response_check_path async
karenbraganz Aug 6, 2025
e504dd9
Modify imports
karenbraganz Aug 7, 2025
c9d30c0
Correction in docs
karenbraganz Aug 7, 2025
e1fe5bd
Remove annotations import
karenbraganz Aug 7, 2025
c327a9e
Merge branch 'main' into httpeventtrigger
karenbraganz Aug 7, 2025
3b3f399
Merge branch 'main' into httpeventtrigger
karenbraganz Sep 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions providers/http/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

Connection types <connections/http>
Operators <operators>
Triggers <triggers>

.. toctree::
:hidden:
Expand Down
143 changes: 143 additions & 0 deletions providers/http/docs/triggers.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@

.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

HTTP Event Trigger
==================

.. _howto/trigger:HttpEventTrigger:

The ``HttpEventTrigger`` is an event-based trigger that monitors whether responses
from an API meet the conditions set by the user in the callable referenced by ``response_check_path``.

It is designed for **Airflow 3.0+** to be used in combination with the ``AssetWatcher`` system,
enabling event-driven DAGs based on API responses.

How It Works
------------

1. Sends requests to an API.
2. Uses the callable at ``response_check_path`` to evaluate the API response.
3. If the callable returns ``True``, a ``TriggerEvent`` is emitted. This will trigger DAGs using this ``AssetWatcher`` for scheduling.

.. note::
This trigger requires **Airflow >= 3.0** due to dependencies on ``AssetWatcher`` and event-driven scheduling infrastructure.

Usage Example with AssetWatcher
-------------------------------

Here's an example of using the HttpEventTrigger in an AssetWatcher to monitor the GitHub API for new Airflow releases.

.. code-block:: python


import datetime
import os

from asgiref.sync import sync_to_async

from airflow.providers.http.triggers.http import HttpEventTrigger
from airflow.sdk import Asset, AssetWatcher, Variable, dag, task

# This token must be generated through GitHub and added as an environment variable
# NOTE: os.getenv returns None when GITHUB_TOKEN is unset, in which case the
# Authorization header below would read "Bearer None".
token = os.getenv("GITHUB_TOKEN")

# Request headers handed to the trigger through its ``headers`` argument.
headers = {
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": "2022-11-28",
}


async def check_github_api_response(response):
    """
    Decide whether the latest GitHub release is new and, if so, remember it.

    Compares the release id found in ``response`` with the one stored in the
    ``release_id_var`` Variable. Returns ``False`` when they match; otherwise
    stores the new id, name and URL in Variables and returns ``True``.
    """
    payload = response.json()
    latest_id = str(payload["id"])

    # Variable.get / Variable.set are synchronous, so they are wrapped before being awaited.
    read_variable = sync_to_async(Variable.get)
    stored_id = await read_variable(key="release_id_var", default=None)
    if latest_id == stored_id:
        return False

    # All values are looked up before the first write, so a missing key fails early.
    updates = (
        ("release_id_var", latest_id),
        ("release_name_var", payload["name"]),
        ("release_html_url_var", payload["html_url"]),
    )
    write_variable = sync_to_async(Variable.set)
    for variable_key, variable_value in updates:
        await write_variable(key=variable_key, value=variable_value)
    return True


# Event trigger that requests the latest Airflow release from the GitHub API and
# evaluates each response with the callable named in ``response_check_path``.
trigger = HttpEventTrigger(
endpoint="repos/apache/airflow/releases/latest",
method="GET",
http_conn_id="http_default", # HTTP connection with https://api.github.com/ as the Host
headers=headers,
response_check_path="dags.check_airflow_releases.check_github_api_response", # Path to the check_github_api_response callable
)

# Asset whose single watcher wraps the trigger; the DAG below is scheduled on this asset.
asset = Asset(
"airflow_releases_asset", watchers=[AssetWatcher(name="airflow_releases_watcher", trigger=trigger)]
)


# DAG scheduled on ``asset`` (schedule=asset) rather than on a time interval.
@dag(start_date=datetime.datetime(2024, 10, 1), schedule=asset, catchup=False)
def check_airflow_releases():
# Task that reads the release name and URL from Variables and prints them.
# These are the Variables written by check_github_api_response.
@task()
def print_airflow_release_info():
release_name = Variable.get("release_name_var")
release_html_url = Variable.get("release_html_url_var")
print(f"{release_name} has been released. Check it out at {release_html_url}")

print_airflow_release_info()


# Calling the decorated function instantiates the DAG at module import time.
check_airflow_releases()

Parameters
----------

``http_conn_id``
HTTP connection ID that has the base API URL, e.g. ``https://www.google.com/``, and optional authentication credentials.
Default headers can also be specified in the Extra field in json format.

``auth_type``
The auth type for the service

``method``
the API method to be called

``endpoint``
Endpoint to be called, e.g. ``resource/v1/query?``

``headers``
Additional headers to be passed through as a dict

``data``
Payload to be uploaded or request parameters

``extra_options``
Additional kwargs to pass when creating a request.

``response_check_path``
Path to callable that evaluates whether the API response passes the conditions set by the user to trigger DAGs


Important Notes
---------------

1. A ``response_check_path`` value is required.
2. The ``response_check_path`` must contain the path to an asynchronous callable. Synchronous callables will raise an exception.
3. This trigger does not automatically record the previous API response.
4. The previous response may have to be persisted manually through ``Variable.set()`` in the ``response_check_path`` callable to prevent the trigger from emitting events repeatedly for the same API response.
126 changes: 112 additions & 14 deletions providers/http/src/airflow/providers/http/triggers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,30 @@

import asyncio
import base64
import importlib
import inspect
import pickle
import sys
from collections.abc import AsyncIterator
from importlib import import_module
from typing import TYPE_CHECKING, Any

import aiohttp
import requests
from asgiref.sync import sync_to_async
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict

from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpAsyncHook
from airflow.providers.http.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.triggers.base import BaseTrigger, TriggerEvent

if AIRFLOW_V_3_0_PLUS:
from airflow.triggers.base import BaseEventTrigger
else:
from airflow.triggers.base import BaseTrigger as BaseEventTrigger # type: ignore

if TYPE_CHECKING:
from aiohttp.client_reqrep import ClientResponse

Expand Down Expand Up @@ -105,21 +115,9 @@ def serialize(self) -> tuple[str, dict[str, Any]]:

async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make a series of asynchronous http calls via a http hook."""
hook = HttpAsyncHook(
method=self.method,
http_conn_id=self.http_conn_id,
auth_type=self.auth_type,
)
hook = self._get_async_hook()
try:
async with aiohttp.ClientSession() as session:
client_response = await hook.run(
session=session,
endpoint=self.endpoint,
data=self.data,
headers=self.headers,
extra_options=self.extra_options,
)
response = await self._convert_response(client_response)
response = await self._get_response(hook)
yield TriggerEvent(
{
"status": "success",
Expand All @@ -129,6 +127,25 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})

def _get_async_hook(self) -> HttpAsyncHook:
    """Build the asynchronous HTTP hook from this trigger's method, connection id and auth type."""
    async_hook = HttpAsyncHook(
        method=self.method,
        http_conn_id=self.http_conn_id,
        auth_type=self.auth_type,
    )
    return async_hook

async def _get_response(self, hook):
    """
    Send the configured request through ``hook`` and return the converted response.

    :param hook: Hook whose ``run`` coroutine performs the request.
    :return: The result of ``_convert_response`` applied to the hook's response.
    """
    request_kwargs = {
        "endpoint": self.endpoint,
        "data": self.data,
        "headers": self.headers,
        "extra_options": self.extra_options,
    }
    async with aiohttp.ClientSession() as http_session:
        raw_response = await hook.run(session=http_session, **request_kwargs)
        # Convert before leaving the ``async with`` block, i.e. while the session is still open.
        return await self._convert_response(raw_response)

@staticmethod
async def _convert_response(client_response: ClientResponse) -> requests.Response:
"""Convert aiohttp.client_reqrep.ClientResponse to requests.Response."""
Expand Down Expand Up @@ -219,3 +236,84 @@ def _get_async_hook(self) -> HttpAsyncHook:
method=self.method,
http_conn_id=self.http_conn_id,
)


class HttpEventTrigger(HttpTrigger, BaseEventTrigger):
    """
    HttpEventTrigger for event-based DAG scheduling when the API response satisfies the response check.

    The trigger calls the configured endpoint in a loop and hands every response to the
    asynchronous callable located at ``response_check_path``. As soon as that callable
    returns a truthy value, a ``TriggerEvent`` carrying the pickled response is emitted.

    :param response_check_path: Path to the function that evaluates whether the API response
        passes the conditions set by the user to fire the trigger. The method must be asynchronous.
    :param http_conn_id: http connection id that has the base
        API url i.e https://www.google.com/ and optional authentication credentials. Default
        headers can also be specified in the Extra field in json format.
    :param auth_type: The auth type for the service
    :param method: The API method to be called
    :param endpoint: Endpoint to be called, i.e. ``resource/v1/query?``.
    :param headers: Additional headers to be passed through as a dict.
    :param data: Payload to be uploaded or request parameters.
    :param extra_options: Additional kwargs to pass when creating a request.
    """

    def __init__(
        self,
        response_check_path: str,
        http_conn_id: str = "http_default",
        auth_type: Any = None,
        method: str = "GET",
        endpoint: str | None = None,
        headers: dict[str, str] | None = None,
        data: dict[str, Any] | str | None = None,
        extra_options: dict[str, Any] | None = None,
    ):
        super().__init__(http_conn_id, auth_type, method, endpoint, headers, data, extra_options)
        self.response_check_path = response_check_path

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize HttpEventTrigger arguments and classpath."""
        return (
            self.__class__.__module__ + "." + self.__class__.__qualname__,
            {
                "http_conn_id": self.http_conn_id,
                "method": self.method,
                "auth_type": self.auth_type,
                "endpoint": self.endpoint,
                "headers": self.headers,
                "data": self.data,
                "extra_options": self.extra_options,
                # Only the dotted path is stored, not the callable itself.
                "response_check_path": self.response_check_path,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Make a series of asynchronous http calls via a http hook until the response passes the response check."""
        hook = super()._get_async_hook()
        try:
            # Requests are issued back to back; there is no delay between iterations.
            while True:
                response = await super()._get_response(hook)
                if await self._run_response_check(response):
                    break
            yield TriggerEvent(
                {
                    "status": "success",
                    "response": base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
                }
            )
        except Exception as e:
            # Errors are logged, not propagated: the generator then ends without emitting an event.
            self.log.error("status: error, message: %s", str(e))

    async def _import_from_response_check_path(self):
        """
        Import the response check callable from the path provided by the user.

        A module that is already loaded is reloaded; otherwise it is imported for the
        first time.

        :return: The attribute named by the last component of ``response_check_path``.
        :raises ValueError: If ``response_check_path`` has no module component before the
            final dot (e.g. ``"check"`` or ``".check"``).
        """
        module_path, _, func_name = self.response_check_path.rpartition(".")
        if not module_path:
            raise ValueError(
                f"response_check_path must be a dotted path of the form 'module.callable', "
                f"got {self.response_check_path!r}"
            )
        # reload/import_module are blocking calls, hence the sync_to_async wrappers.
        if module_path in sys.modules:
            module = await sync_to_async(importlib.reload)(sys.modules[module_path])
        else:
            # Importing again after a reload would only repeat the sys.modules lookup.
            module = await sync_to_async(importlib.import_module)(module_path)
        return getattr(module, func_name)

    async def _run_response_check(self, response) -> bool:
        """
        Run the response_check callable provided by the user.

        :param response: The response object passed on to the user's callable.
        :return: Whatever the user's callable returns; ``run`` treats it as a boolean.
        :raises AirflowException: If the imported callable is not a coroutine function.
        """
        response_check = await self._import_from_response_check_path()
        if not inspect.iscoroutinefunction(response_check):
            raise AirflowException("The response_check callable is not asynchronous.")
        return await response_check(response)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:


AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)

AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)

if AIRFLOW_V_3_1_PLUS:
Expand Down
Loading