Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class DagRun(StrictBaseModel):
clear_number: int = 0
run_type: DagRunType
state: DagRunState
conf: Annotated[dict[str, Any], Field(default_factory=dict)]
conf: dict[str, Any] | None = None
triggering_user_name: str | None = None
consumed_asset_events: list[AssetEventDagRunReference]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
AddIncludePriorDatesToGetXComSlice,
)
from airflow.api_fastapi.execution_api.versions.v2025_09_23 import AddDagVersionIdField
from airflow.api_fastapi.execution_api.versions.v2025_10_10 import AddTriggeringUserNameField
from airflow.api_fastapi.execution_api.versions.v2025_10_10 import (
AddTriggeringUserNameField,
MakeDagRunConfNullable,
)

bundle = VersionBundle(
HeadVersion(),
Version("2025-10-10", AddTriggeringUserNameField),
Version("2025-10-10", AddTriggeringUserNameField, MakeDagRunConfNullable),
Version("2025-09-23", AddDagVersionIdField),
Version(
"2025-08-10",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,18 @@ def remove_triggering_user_name_from_dag_run(response: ResponseInfo) -> None: #
"""Remove the `triggering_user_name` field from the dag_run object when converting to the previous version."""
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
response.body["dag_run"].pop("triggering_user_name", None)


class MakeDagRunConfNullable(VersionChange):
"""Make DagRun.conf field nullable to match database schema."""

description = __doc__

instructions_to_migrate_to_previous_version = ()

@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
def ensure_conf_is_dict_in_dag_run(response: ResponseInfo) -> None: # type: ignore[misc]
"""Ensure conf is always a dict (never None) in previous versions."""
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
if response.body["dag_run"].get("conf") is None:
response.body["dag_run"]["conf"] = {}
Original file line number Diff line number Diff line change
Expand Up @@ -2408,3 +2408,30 @@ def test_ti_inactive_inlets_and_outlets_without_inactive_assets(self, logical_da
response = client.get(f"/execution/task-instances/{task1_ti.id}/validate-inlets-and-outlets")
assert response.status_code == 200
assert response.json() == {"inactive_assets": []}

def test_ti_run_with_null_conf(self, client, session, create_task_instance):
"""Test that task instances can start when dag_run.conf is NULL."""
ti = create_task_instance(
task_id="test_ti_run_with_null_conf",
state=State.QUEUED,
dagrun_state=DagRunState.RUNNING,
session=session,
)
# Set conf to NULL to simulate Airflow 2.x upgrade or offline migration
ti.dag_run.conf = None
session.commit()

response = client.patch(
f"/execution/task-instances/{ti.id}/run",
json={
"state": "running",
"pid": 100,
"hostname": "test-hostname",
"unixname": "test-user",
"start_date": timezone.utcnow().isoformat(),
},
)

assert response.status_code == 200, f"Response: {response.text}"
context = response.json()
assert context["dag_run"]["conf"] is None
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,60 @@ def test_ti_run_excludes_triggering_user_name(
assert dag_run["run_id"] == "test"
assert dag_run["state"] == "running"
assert dag_run["conf"] == {}


class TestTIRunConfV20250923:
"""Test that API version 2025-09-23 converts NULL conf to empty dict."""

def setup_method(self):
clear_db_runs()

def teardown_method(self):
clear_db_runs()

def test_ti_run_null_conf_converted_to_dict(
self,
ver_client,
session,
create_task_instance,
):
"""
Test that NULL conf is converted to empty dict in API version 2025-09-23.

In version 2025-10-10, the conf field became nullable to match database schema.
Older API clients (2025-09-23 and earlier) should receive an empty dict instead
of None for backward compatibility.
"""
ti = create_task_instance(
task_id="test_ti_run_null_conf_v2",
state=State.QUEUED,
dagrun_state=DagRunState.RUNNING,
session=session,
)
# Set conf to NULL to simulate Airflow 2.x upgrade or offline migration
ti.dag_run.conf = None
session.commit()

response = ver_client.patch(
f"/execution/task-instances/{ti.id}/run",
json={
"state": "running",
"pid": 100,
"hostname": "test-hostname",
"unixname": "test-user",
"start_date": timezone.utcnow().isoformat(),
},
)

assert response.status_code == 200
json_response = response.json()

assert "dag_run" in json_response
dag_run = json_response["dag_run"]

# In older API versions, None should be converted to empty dict
assert dag_run["conf"] == {}, "NULL conf should be converted to empty dict in API version 2025-09-23"

# Verify other expected fields
assert dag_run["dag_id"] == ti.dag_id
assert dag_run["run_id"] == "test"
Loading