Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
66db7b5
feat(hitl): add utility function generate_link_to_ui
Lee-W Aug 22, 2025
9a15ff6
refactor(hitl): rewrite generate_link_to_ui signature to take only th…
Lee-W Aug 25, 2025
8b2dde5
feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
faf3400
feat(HITL): add static method generate_link_to_ui_from_context for ea…
Lee-W Aug 26, 2025
2fab8a4
docs(hitl): extend example dag to get hitl Url
Lee-W Aug 26, 2025
da95079
feat(hitl): allow user to genreate url with single option
Lee-W Aug 26, 2025
d2e23b3
docs(hitl): add usage of HITLOperator.generate_link_to_ui_from_contex…
Lee-W Aug 26, 2025
a2bb367
fix(hitl): rename argument as params_input
Lee-W Aug 26, 2025
07bb147
docs(hitl): improve docstring
Lee-W Aug 26, 2025
8ef41b4
fixup! feat(hitl): add utility function generate_link_to_ui
Lee-W Aug 26, 2025
90f9bd3
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
05772d9
fixup! feat(hitl): add utility function generate_link_to_ui
Lee-W Aug 26, 2025
514a77a
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
ce62b3a
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
7104390
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
25f26d3
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
97e1acc
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
2b65fdd
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
d2a89a4
fixup! refactor(hitl): rewrite generate_link_to_ui signature to take …
Lee-W Aug 26, 2025
20f1d06
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
67a7282
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
5edfafa
fixup! feat(hitl): simplify generate_link_to_ui funciton signature
Lee-W Aug 26, 2025
6731408
test(hitl): refactor test cases
Lee-W Aug 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions airflow-core/docs/tutorial/hitl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,18 @@ After the branch is chosen, the workflow will proceed along the selected path.
Notifiers
---------

A notifier is a callback mechanism that allows you to handle HITL events, such as when a task is waiting for human input, succeeds, or fails.
The example uses a notifier ``LocalLogNotifier`` that logs messages for demonstration.
You can implement your own notifier for different functionalities.
A notifier is a callback mechanism for handling HITL events, such as when a task is waiting for human input, succeeds, or fails.
The example uses the ``LocalLogNotifier``, which logs messages for demonstration purposes.

The method ``HITLOperator.generate_link_to_ui_from_context`` can be used to generate a direct link to the UI page where the user should respond. It accepts four arguments:

- ``context`` – automatically passed to ``notify`` by the notifier
- ``base_url`` – (optional) the base URL of the Airflow UI; if not provided, ``api.base_url`` in the configuration will be used
- ``options`` – (optional) pre-selected options for the UI page
- ``params_inputs`` – (optional) pre-loaded inputs for the UI page

This makes it easy to include actionable links in notifications or logs.
You can also implement your own notifier to provide different functionalities.
For more details, please refer to `Creating a notifier <https://airflow.apache.org/docs/apache-airflow/stable/howto/notifications.html>`_ and `Notifications <https://airflow.apache.org/docs/apache-airflow-providers/core-extensions/notifications.html>`_.

In the example Dag, the notifier is defined as follows:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ def __init__(self, message: str) -> None:
self.message = message

def notify(self, context: Context) -> None:
url = HITLOperator.generate_link_to_ui_from_context(
context=context,
base_url="http://localhost:28080",
)
self.log.info(self.message)
self.log.info("Url to respond %s", url)


hitl_request_callback = LocalLogNotifier(
Expand Down
116 changes: 114 additions & 2 deletions providers/standard/src/airflow/providers/standard/operators/hitl.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,28 @@

from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_PLUS
from airflow.sdk.bases.notifier import BaseNotifier

if not AIRFLOW_V_3_1_PLUS:
raise AirflowOptionalProviderFeatureException("Human in the loop functionality needs Airflow 3.1+.")


from collections.abc import Collection, Mapping, Sequence
from typing import TYPE_CHECKING, Any
from urllib.parse import ParseResult, urlencode, urlparse, urlunparse

from airflow.configuration import conf
from airflow.providers.standard.exceptions import HITLTimeoutError, HITLTriggerEventError
from airflow.providers.standard.operators.branch import BranchMixIn
from airflow.providers.standard.triggers.hitl import HITLTrigger, HITLTriggerEventSuccessPayload
from airflow.providers.standard.utils.skipmixin import SkipMixin
from airflow.providers.standard.version_compat import BaseOperator
from airflow.sdk.bases.notifier import BaseNotifier
from airflow.sdk.definitions.param import ParamsDict
from airflow.sdk.execution_time.hitl import upsert_hitl_detail
from airflow.sdk.timezone import utcnow

if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context
from airflow.sdk.types import RuntimeTaskInstanceProtocol


class HITLOperator(BaseOperator):
Expand Down Expand Up @@ -87,12 +89,33 @@ def __init__(
self.respondents = [respondents] if isinstance(respondents, str) else respondents

self.validate_options()
self.validate_params()
self.validate_defaults()

def validate_options(self) -> None:
"""
Validate the `options` attribute of the instance.

Raises:
ValueError: If `options` is empty.
ValueError: If any option contains a comma (`,`), which is not allowed.
"""
if not self.options:
raise ValueError('"options" cannot be empty.')

if any("," in option for option in self.options):
raise ValueError('"," is not allowed in option')

def validate_params(self) -> None:
"""
Validate the `params` attribute of the instance.

Raises:
ValueError: If `"_options"` key is present in `params`, which is not allowed.
"""
if "_options" in self.params:
raise ValueError('"_options" is not allowed in params')

def validate_defaults(self) -> None:
"""
Validate whether the given defaults pass the following criteria.
Expand Down Expand Up @@ -181,6 +204,95 @@ def validate_params_input(self, params_input: Mapping) -> None:
):
raise ValueError(f"params_input {params_input} does not match params {self.params}")

def generate_link_to_ui(
self,
*,
task_instance: RuntimeTaskInstanceProtocol,
base_url: str | None = None,
options: str | list[str] | None = None,
params_input: dict[str, Any] | None = None,
) -> str:
"""
Generate a URL link to the "required actions" page for a specific task instance.

This URL includes query parameters based on allowed options and parameters.

Args:
task_instance: The task instance to generate the link for.
base_url: Optional base URL to use. Defaults to ``api.base_url`` from config.
options: Optional subset of allowed options to include in the URL.
params_input: Optional subset of allowed params to include in the URL.

Raises:
ValueError: If any provided option or parameter is invalid.
ValueError: If no base_url can be determined.

Returns:
The full URL pointing to the required actions page with query parameters.
"""
query_param: dict[str, Any] = {}
options = [options] if isinstance(options, str) else options
if options:
if diff := set(options) - set(self.options):
raise ValueError(f"options {diff} are not valid options")
query_param["_options"] = ",".join(options)

if params_input:
if diff := set(params_input.keys()) - set(self.params.keys()):
raise ValueError(f"params {diff} are not valid params")
query_param.update(params_input)

if not (base_url := base_url or conf.get("api", "base_url", fallback=None)):
raise ValueError("Not able to retrieve base_url")

query_param["map_index"] = task_instance.map_index

parsed_base_url: ParseResult = urlparse(base_url)
return urlunparse(
(
parsed_base_url.scheme,
parsed_base_url.netloc,
f"/dags/{task_instance.dag_id}/runs/{task_instance.run_id}/tasks/{task_instance.task_id}/required_actions",
"",
urlencode(query_param) if query_param else "",
"",
)
)

@staticmethod
def generate_link_to_ui_from_context(
*,
context: Context,
base_url: str | None = None,
options: list[str] | None = None,
params_input: dict[str, Any] | None = None,
) -> str:
"""
Generate a "required actions" page URL from a task context.

Delegates to ``generate_link_to_ui`` using the task and task_instance extracted from
the provided context.

Args:
context: The Airflow task context containing 'task' and 'task_instance'.
base_url: Optional base URL to use.
options: Optional list of allowed options to include.
params_input: Optional dictionary of allowed parameters to include.

Returns:
The full URL pointing to the required actions page with query parameters.
"""
hitl_op = context["task"]
if not isinstance(hitl_op, HITLOperator):
raise ValueError("This method only supports HITLOperator")

return hitl_op.generate_link_to_ui(
task_instance=context["task_instance"],
base_url=base_url,
options=options,
params_input=params_input,
)


class ApprovalOperator(HITLOperator, SkipMixin):
"""Human-in-the-loop Operator that has only 'Approval' and 'Reject' options."""
Expand Down
Loading