Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

from airflow import settings
from airflow._shared.timezones import timezone
from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as DRDataModel, TIRunContext
from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext, TaskCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
Expand Down Expand Up @@ -888,7 +888,7 @@ def process_executor_events(
ti=ti,
msg=msg,
context_from_server=TIRunContext(
dag_run=ti.dag_run,
dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True),
max_tries=ti.max_tries,
variables=[],
connections=[],
Expand Down Expand Up @@ -2266,7 +2266,7 @@ def _purge_task_instances_without_heartbeats(
ti=ti,
msg=str(task_instance_heartbeat_timeout_message_details),
context_from_server=TIRunContext(
dag_run=ti.dag_run,
dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True),
max_tries=ti.max_tries,
variables=[],
connections=[],
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ class AssetEvent(Base):
created_dagruns = relationship(
"DagRun",
secondary=association_table,
backref="consumed_asset_events",
back_populates="consumed_asset_events",
)

source_aliases = relationship(
Expand Down
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ class DagRun(Base, LoggingMixin):
back_populates="dag_run",
cascade="save-update, merge, delete, delete-orphan",
)

consumed_asset_events = relationship(
"AssetEvent",
secondary="dagrun_asset_event",
back_populates="created_dagruns",
lazy="selectin",
)
task_instances_histories = relationship(
TIH,
primaryjoin="and_(DagRun.dag_id == TaskInstanceHistory.dag_id, DagRun.run_id == TaskInstanceHistory.run_id)",
Expand Down
38 changes: 19 additions & 19 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6876,33 +6876,33 @@ def test_execute_queries_count_with_harvested_dags(
# One DAG with one task per DAG file.
([10, 10, 10, 10], 1, 1, "1d", "None", "no_structure"),
([10, 10, 10, 10], 1, 1, "1d", "None", "linear"),
([24, 14, 14, 14], 1, 1, "1d", "@once", "no_structure"),
([24, 14, 14, 14], 1, 1, "1d", "@once", "linear"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "no_structure"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "linear"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "binary_tree"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "star"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "grid"),
([40, 14, 14, 14], 1, 1, "1d", "@once", "no_structure"),
([40, 14, 14, 14], 1, 1, "1d", "@once", "linear"),
([40, 26, 29, 32], 1, 1, "1d", "30m", "no_structure"),
([40, 26, 29, 32], 1, 1, "1d", "30m", "linear"),
([40, 26, 29, 32], 1, 1, "1d", "30m", "binary_tree"),
([40, 26, 29, 32], 1, 1, "1d", "30m", "star"),
([40, 26, 29, 32], 1, 1, "1d", "30m", "grid"),
# One DAG with five tasks per DAG file.
([10, 10, 10, 10], 1, 5, "1d", "None", "no_structure"),
([10, 10, 10, 10], 1, 5, "1d", "None", "linear"),
([24, 14, 14, 14], 1, 5, "1d", "@once", "no_structure"),
([25, 15, 15, 15], 1, 5, "1d", "@once", "linear"),
([24, 26, 29, 32], 1, 5, "1d", "30m", "no_structure"),
([25, 28, 32, 36], 1, 5, "1d", "30m", "linear"),
([25, 28, 32, 36], 1, 5, "1d", "30m", "binary_tree"),
([25, 28, 32, 36], 1, 5, "1d", "30m", "star"),
([25, 28, 32, 36], 1, 5, "1d", "30m", "grid"),
([40, 14, 14, 14], 1, 5, "1d", "@once", "no_structure"),
([42, 15, 15, 15], 1, 5, "1d", "@once", "linear"),
([40, 26, 29, 32], 1, 5, "1d", "30m", "no_structure"),
([42, 28, 32, 36], 1, 5, "1d", "30m", "linear"),
([42, 28, 32, 36], 1, 5, "1d", "30m", "binary_tree"),
([42, 28, 32, 36], 1, 5, "1d", "30m", "star"),
([42, 28, 32, 36], 1, 5, "1d", "30m", "grid"),
# 10 DAGs with 10 tasks per DAG file.
([10, 10, 10, 10], 10, 10, "1d", "None", "no_structure"),
([10, 10, 10, 10], 10, 10, "1d", "None", "linear"),
([218, 69, 69, 69], 10, 10, "1d", "@once", "no_structure"),
([228, 84, 84, 84], 10, 10, "1d", "@once", "linear"),
([218, 87, 87, 87], 10, 10, "1d", "@once", "no_structure"),
([249, 104, 104, 104], 10, 10, "1d", "@once", "linear"),
([217, 119, 119, 119], 10, 10, "1d", "30m", "no_structure"),
([2227, 145, 145, 145], 10, 10, "1d", "30m", "linear"),
([227, 139, 139, 139], 10, 10, "1d", "30m", "binary_tree"),
([227, 139, 139, 139], 10, 10, "1d", "30m", "star"),
([227, 259, 259, 259], 10, 10, "1d", "30m", "grid"),
([249, 139, 139, 139], 10, 10, "1d", "30m", "binary_tree"),
([249, 139, 139, 139], 10, 10, "1d", "30m", "star"),
([249, 259, 259, 259], 10, 10, "1d", "30m", "grid"),
],
)
def test_process_dags_queries_count(
Expand Down
Loading