diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index abab6658d339b..234960a5ef8ff 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -371,7 +371,7 @@ def dag_versions(self) -> list[DagVersion]: """Return the DAG versions associated with the TIs of this DagRun.""" # when the dag is in a versioned bundle, we keep the dag version fixed if self.bundle_version: - return [self.created_dag_version] + return [self.created_dag_version] if self.created_dag_version is not None else [] dag_versions = [ dv for dv in dict.fromkeys(list(self._tih_dag_versions) + list(self._ti_dag_versions)) diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 227c14cfd3822..ced147148d70c 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -1260,6 +1260,27 @@ def test_dag_run_version_number(self, dag_maker, session): # the latest task instance dag_version assert dag_run.version_number == dag_v.version_number + def test_dag_run_dag_versions_with_null_created_dag_version(self, dag_maker, session): + """Test that dag_versions returns empty list when created_dag_version is None and bundle_version is populated.""" + with dag_maker( + "test_dag_run_null_created_dag_version", + schedule=datetime.timedelta(days=1), + start_date=DEFAULT_DATE, + ): + EmptyOperator(task_id="empty") + dag_run = dag_maker.create_dagrun() + + dag_run.bundle_version = "some_bundle_version" + dag_run.created_dag_version_id = None + dag_run.created_dag_version = None + session.merge(dag_run) + session.flush() + + # This should return empty list, not [None] + assert dag_run.dag_versions == [] + assert isinstance(dag_run.dag_versions, list) + assert len(dag_run.dag_versions) == 0 + def test_dagrun_success_deadline(self, dag_maker, session): def on_success_callable(context): assert context["dag_run"].dag_id == "test_dagrun_success_callback"