From bf087596ed69f8f7a9f0aee2efc1b73b88df9d63 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 17 Oct 2025 09:17:09 +0100 Subject: [PATCH 1/5] Ensure dag.test uses serialized dag for testing While `dag test` command uses serialized dag, dag.test was using in-memory serialized dag making direct usage of dag.test method resulting in error. This PR fixes this and ensures dag.test parses dag if the dag is not parsed --- airflow-core/tests/unit/models/test_dag.py | 24 ++++++++++++++ task-sdk/src/airflow/sdk/definitions/dag.py | 36 ++++++++++++++++----- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index ee0445bc86916..5746ee7fd410c 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -80,6 +80,7 @@ from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.asserts import assert_queries_count +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.dag import create_scheduler_dag, sync_dag_to_db from tests_common.test_utils.db import ( clear_db_assets, @@ -179,6 +180,29 @@ def setup_method(self) -> None: clear_db_dags() clear_db_assets() + @conf_vars({("core", "load_examples"): "false"}) + def test_dag_test_auto_parses_when_not_serialized(self, testing_dag_bundle, session): + """ + DAG.test() should auto-parse and sync the DAG if it's not serialized yet. + """ + from airflow.models.dagbag import DBDagBag + + dag_id = "test_example_bash_operator" + + # Ensure not serialized yet + assert DBDagBag().get_latest_version_of_dag(dag_id, session=session) is None + assert session.query(DagRun).filter(DagRun.dag_id == dag_id).scalar() is None + + dag = DAG(dag_id=dag_id, schedule=None) + + dr = dag.test() + assert dr is not None + + # Serialized DAG should now exist and DagRun would be created + ser = DBDagBag().get_latest_version_of_dag(dag_id, session=session) + assert ser is not None + assert session.query(DagRun).filter(DagRun.dag_id == dag_id).scalar() is not None + def teardown_method(self) -> None: clear_db_runs() clear_db_dags() diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 4a5f9bd92f11b..eea694dcbc658 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -1197,17 +1197,37 @@ def test( data_interval = ( self.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None ) - scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self)) # type: ignore[arg-type] - # Preserve callback functions from original Dag since they're lost during serialization - # and yes it is a hack for now! It is a tradeoff for code simplicity. - # Without it, we need "Scheduler Dag" (Serialized dag) for the scheduler bits - # -- dep check, scheduling tis - # and need real dag to get and run callbacks without having to load the dag model + from airflow.models.dagbag import DBDagBag + + scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session) + if not scheduler_dag: + from airflow.dag_processing.bundles.manager import DagBundlesManager + from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db + from airflow.sdk.definitions._internal.dag_parsing_context import ( + _airflow_parsing_context_manager, + ) + manager = DagBundlesManager() + manager.sync_bundles_to_db(session=session) + # sync all bundles? or use the dags-folder bundle? + # What if the test dag is in a different bundle? + for bundle in manager.get_all_dag_bundles(): + if not bundle.is_initialized: + bundle.initialize() + with _airflow_parsing_context_manager(dag_id=self.dag_id): + dagbag = DagBag( + dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False + ) + sync_bag_to_db(dagbag, bundle.name, bundle.version) + scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session) + if scheduler_dag: + break + if not scheduler_dag: + raise RuntimeError("Dag not found after syncing to DB") # Scheduler DAG shouldn't have these attributes, but assigning them # here is an easy hack to get this test() thing working. - scheduler_dag.on_success_callback = self.on_success_callback # type: ignore[attr-defined] - scheduler_dag.on_failure_callback = self.on_failure_callback # type: ignore[attr-defined] + scheduler_dag.on_success_callback = self.on_success_callback # type: ignore[attr-defined, union-attr] + scheduler_dag.on_failure_callback = self.on_failure_callback # type: ignore[attr-defined, union-attr] dr: DagRun = get_or_create_dagrun( dag=scheduler_dag, From c4c490786f9869f08dbc95c1856e3404c664c408 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 17 Oct 2025 13:27:13 +0100 Subject: [PATCH 2/5] fixup! Ensure dag.test uses serialized dag for testing --- airflow-core/tests/unit/models/test_dag.py | 7 ++++--- task-sdk/src/airflow/sdk/definitions/dag.py | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 5746ee7fd410c..22e882fa006b0 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -181,7 +181,7 @@ def setup_method(self) -> None: clear_db_assets() @conf_vars({("core", "load_examples"): "false"}) - def test_dag_test_auto_parses_when_not_serialized(self, testing_dag_bundle, session): + def test_dag_test_auto_parses_when_not_serialized(self, test_dags_bundle, session): """ DAG.test() should auto-parse and sync the DAG if it's not serialized yet. """ @@ -189,12 +189,13 @@ def test_dag_test_auto_parses_when_not_serialized(self, testing_dag_bundle, sess dag_id = "test_example_bash_operator" + dagbag = DagBag(dag_folder=os.fspath(TEST_DAGS_FOLDER), include_examples=False) + dag = dagbag.dags.get(dag_id) + # Ensure not serialized yet assert DBDagBag().get_latest_version_of_dag(dag_id, session=session) is None assert session.query(DagRun).filter(DagRun.dag_id == dag_id).scalar() is None - dag = DAG(dag_id=dag_id, schedule=None) - dr = dag.test() assert dr is not None diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index eea694dcbc658..38f57c3bedf03 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -1209,6 +1209,7 @@ def test( manager = DagBundlesManager() manager.sync_bundles_to_db(session=session) + session.commit() # sync all bundles? or use the dags-folder bundle? # What if the test dag is in a different bundle? for bundle in manager.get_all_dag_bundles(): From 4ee51d444618ab4909fd928211303062cb7ac708 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 17 Oct 2025 13:52:07 +0100 Subject: [PATCH 3/5] Update query to v2 and do backcompat for 3.1 --- airflow-core/tests/unit/models/test_dag.py | 6 +++--- task-sdk/src/airflow/sdk/definitions/dag.py | 10 ++++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 22e882fa006b0..09835f58e2ccd 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -53,6 +53,7 @@ get_asset_triggered_next_run_info, get_next_data_interval, ) +from airflow.models.dagbag import DBDagBag from airflow.models.dagbundle import DagBundleModel from airflow.models.dagrun import DagRun from airflow.models.serialized_dag import SerializedDagModel @@ -185,7 +186,6 @@ def test_dag_test_auto_parses_when_not_serialized(self, test_dags_bundle, sessio """ DAG.test() should auto-parse and sync the DAG if it's not serialized yet. """ - from airflow.models.dagbag import DBDagBag dag_id = "test_example_bash_operator" @@ -194,7 +194,7 @@ def test_dag_test_auto_parses_when_not_serialized(self, test_dags_bundle, sessio # Ensure not serialized yet assert DBDagBag().get_latest_version_of_dag(dag_id, session=session) is None - assert session.query(DagRun).filter(DagRun.dag_id == dag_id).scalar() is None + assert session.scalar(select(DagRun).where(DagRun.dag_id == dag_id)) is None dr = dag.test() assert dr is not None @@ -202,7 +202,7 @@ def test_dag_test_auto_parses_when_not_serialized(self, test_dags_bundle, sessio # Serialized DAG should now exist and DagRun would be created ser = DBDagBag().get_latest_version_of_dag(dag_id, session=session) assert ser is not None - assert session.query(DagRun).filter(DagRun.dag_id == dag_id).scalar() is not None + assert session.scalar(select(DagRun).where(DagRun.dag_id == dag_id)) is not None def teardown_method(self) -> None: clear_db_runs() diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 38f57c3bedf03..3a9cade6b9ef8 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -1202,7 +1202,7 @@ def test( scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session) if not scheduler_dag: from airflow.dag_processing.bundles.manager import DagBundlesManager - from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db + from airflow.dag_processing.dagbag import DagBag from airflow.sdk.definitions._internal.dag_parsing_context import ( _airflow_parsing_context_manager, ) @@ -1219,7 +1219,13 @@ def test( dagbag = DagBag( dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False ) - sync_bag_to_db(dagbag, bundle.name, bundle.version) + try: + from airflow.dag_processing.dagbag import sync_bag_to_db + + sync_bag_to_db(dagbag, bundle.name, bundle.version) + except ImportError: + # backwards compatibility for 3.1 + dagbag.sync_bag_to_db(bundle.name, bundle.version) # type: ignore[attr-defined] scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session) if scheduler_dag: break From 670fe1254f496e673626f36b9b10307dda29dd29 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 17 Oct 2025 14:14:06 +0100 Subject: [PATCH 4/5] Remove backcompat --- task-sdk/src/airflow/sdk/definitions/dag.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 3a9cade6b9ef8..2b7ef9f9e464d 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -1202,7 +1202,7 @@ def test( scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session) if not scheduler_dag: from airflow.dag_processing.bundles.manager import DagBundlesManager - from airflow.dag_processing.dagbag import DagBag + from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db from airflow.sdk.definitions._internal.dag_parsing_context import ( _airflow_parsing_context_manager, ) @@ -1219,13 +1219,8 @@ def test( dagbag = DagBag( dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False ) - try: - from airflow.dag_processing.dagbag import sync_bag_to_db - - sync_bag_to_db(dagbag, bundle.name, bundle.version) - except ImportError: - # backwards compatibility for 3.1 - dagbag.sync_bag_to_db(bundle.name, bundle.version) # type: ignore[attr-defined] + sync_bag_to_db(dagbag, bundle.name, bundle.version) + # type: ignore[attr-defined] scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session) if scheduler_dag: break From 7f5e18c917c41a76e53a644875fcfdb9e3490003 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 17 Oct 2025 16:38:57 +0100 Subject: [PATCH 5/5] Return previous behaviour with SerializedDag --- task-sdk/src/airflow/sdk/definitions/dag.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 2b7ef9f9e464d..305e5bcbdd342 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -1197,10 +1197,10 @@ def test( data_interval = ( self.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None ) - from airflow.models.dagbag import DBDagBag + from airflow.models.dag_version import DagVersion - scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session) - if not scheduler_dag: + version = DagVersion.get_version(self.dag_id) + if not version: from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db from airflow.sdk.definitions._internal.dag_parsing_context import ( @@ -1220,12 +1220,16 @@ def test( dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False ) sync_bag_to_db(dagbag, bundle.name, bundle.version) - # type: ignore[attr-defined] - scheduler_dag = DBDagBag().get_latest_version_of_dag(dag_id=self.dag_id, session=session) - if scheduler_dag: + version = DagVersion.get_version(self.dag_id) + if version: break - if not scheduler_dag: - raise RuntimeError("Dag not found after syncing to DB") + scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self)) + # Preserve callback functions from original Dag since they're lost during serialization + # and yes it is a hack for now! It is a tradeoff for code simplicity. + # Without it, we need "Scheduler Dag" (Serialized dag) for the scheduler bits + # -- dep check, scheduling tis + # and need real dag to get and run callbacks without having to load the dag model + # Scheduler DAG shouldn't have these attributes, but assigning them # here is an easy hack to get this test() thing working. scheduler_dag.on_success_callback = self.on_success_callback # type: ignore[attr-defined, union-attr]