def get_log_url_from_ti(ti: RuntimeTaskInstance) -> str:
    """
    Build the log URL for the given task instance.

    The result is the configured ``[api] base_url`` followed by
    ``dags/<dag_id>/runs/<run_id>/tasks/<task_id>``. A ``/mapped/<map_index>``
    path segment is appended when ``map_index`` is zero or greater, and a
    ``?try_number=<n>`` query string is appended when ``try_number`` is
    greater than zero.

    :param ti: Task instance providing ``dag_id``, ``run_id`` and ``task_id``;
        ``map_index`` and ``try_number`` are read if present.
    :return: The assembled log URL.
    """
    from urllib.parse import quote

    from airflow.configuration import conf

    base_url = conf.get("api", "base_url", fallback="http://localhost:8080/")
    # The path below is appended without a leading slash, so a configured value
    # such as "http://host:8080" would otherwise yield "http://host:8080dags/...".
    if not base_url.endswith("/"):
        base_url += "/"

    # Only run_id is percent-encoded, as in the original implementation.
    run_id = quote(ti.run_id)

    map_index_value = getattr(ti, "map_index", -1)
    map_index = f"/mapped/{map_index_value}" if map_index_value is not None and map_index_value >= 0 else ""

    try_number_value = getattr(ti, "try_number", 0)
    try_number = (
        f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else ""
    )

    return f"{base_url}dags/{ti.dag_id}/runs/{run_id}/tasks/{ti.task_id}{map_index}{try_number}"