Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7df4416
refactor: Make set_xcom route more DRY be removing duplicated logic a…
dabla Sep 5, 2025
76f9ee6
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 5, 2025
66f5305
refactor: set method on XComModel should not serialize the value when…
dabla Sep 5, 2025
772894a
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 5, 2025
7a08c75
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 13, 2025
58e1e0e
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 13, 2025
56c5567
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 15, 2025
2fb7791
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 15, 2025
38d182a
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 15, 2025
dd7130b
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 16, 2025
6bf284b
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 17, 2025
5a51cb4
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 17, 2025
f4a106f
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 19, 2025
5c43a1f
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 24, 2025
59f2dc9
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Sep 28, 2025
de855b2
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 4, 2025
7229cde
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 5, 2025
fc8b569
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 5, 2025
b150d52
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 7, 2025
d6f329d
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 7, 2025
1bfeef0
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 8, 2025
dd9186d
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 8, 2025
a9e016f
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 11, 2025
2a58dad
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 29, 2025
3466f94
Update airflow-core/src/airflow/api_fastapi/execution_api/routes/xcom…
dabla Oct 29, 2025
9f88089
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Oct 30, 2025
ff64ea5
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Nov 5, 2025
79f2c25
Merge branch 'main' into refactor/make-fast-api-route-set-xcoms-dry
dabla Nov 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -385,39 +385,20 @@ def set_xcom(
# TODO: Can/should we check if a client _hasn't_ provided this for an upstream of a mapped task? That
# means loading the serialized dag and that seems like a relatively costly operation for minimal benefit
# (the mapped task would fail in a moment as it can't be expanded anyway.)
from airflow.models.dagrun import DagRun

if not run_id:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Run with ID: `{run_id}` was not found")

dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
if dag_run_id is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG run not found on DAG {dag_id} with ID {run_id}")

# Remove duplicate XComs and insert a new one.
session.execute(
delete(XComModel).where(
XComModel.key == key,
XComModel.run_id == run_id,
XComModel.task_id == task_id,
XComModel.dag_id == dag_id,
XComModel.map_index == map_index,
)
)

try:
# We expect serialised value from the caller - sdk, do not serialise in here
new = XComModel(
dag_run_id=dag_run_id,
XComModel.set(
key=key,
value=value,
run_id=run_id,
task_id=task_id,
dag_id=dag_id,
map_index=map_index,
serialize=False,
session=session,
)
session.add(new)
session.flush()
except ValueError as e:
raise HTTPException(status.HTTP_404_NOT_FOUND, str(e))
except TypeError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
Expand Down
21 changes: 12 additions & 9 deletions airflow-core/src/airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def set(
task_id: str,
run_id: str,
map_index: int = -1,
serialize: bool = True,
session: Session = NEW_SESSION,
) -> None:
"""
Expand All @@ -178,7 +179,8 @@ def set(
:param task_id: Task ID.
:param run_id: DAG run ID for the task.
:param map_index: Optional map index to assign XCom for a mapped task.
The default is ``-1`` (set for a non-mapped task).
:param serialize: Optional parameter to specify if value should be serialized or not.
The default is ``True``.
:param session: Database session. If not given, a new session will be
created for this function.
"""
Expand Down Expand Up @@ -215,14 +217,15 @@ def set(
)
value = list(value)

value = cls.serialize_value(
value=value,
key=key,
task_id=task_id,
dag_id=dag_id,
run_id=run_id,
map_index=map_index,
)
if serialize:
value = cls.serialize_value(
value=value,
key=key,
task_id=task_id,
dag_id=dag_id,
run_id=run_id,
map_index=map_index,
)

# Remove duplicate XComs and insert a new one.
session.execute(
Expand Down
Loading