diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 939039b0827fb..f30c05e80f2c6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -425,6 +425,7 @@ repos: ^airflow-ctl.*\.py$| ^airflow-core/src/airflow/models/.*\.py$| ^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$| + ^airflow-core/tests/unit/models/test_xcom.py$| ^airflow-core/tests/unit/utils/test_db_cleanup.py$| ^dev/airflow_perf/scheduler_dag_execution_timing.py$| ^providers/openlineage/.*\.py$| diff --git a/airflow-core/tests/unit/models/test_xcom.py b/airflow-core/tests/unit/models/test_xcom.py index 522e736f4efac..f1da8e48d27a3 100644 --- a/airflow-core/tests/unit/models/test_xcom.py +++ b/airflow-core/tests/unit/models/test_xcom.py @@ -23,6 +23,7 @@ from unittest.mock import MagicMock import pytest +from sqlalchemy import delete, func, select from airflow._shared.timezones import timezone from airflow.configuration import conf @@ -88,7 +89,7 @@ def func(*, dag_id, task_id, logical_date, run_after=None): def cleanup_database(): # This should also clear task instances by cascading. - session.query(DagRun).filter_by(id=run.id).delete() + session.execute(delete(DagRun).where(DagRun.id == run.id)) session.commit() request.addfinalizer(cleanup_database) @@ -384,7 +385,7 @@ def test_xcom_set(self, session, task_instance, key, value, expected_value): run_id=task_instance.run_id, session=session, ) - stored_xcoms = session.query(XComModel).all() + stored_xcoms = session.scalars(select(XComModel)).all() assert stored_xcoms[0].key == key assert isinstance(stored_xcoms[0].value, type(json.dumps(expected_value))) assert stored_xcoms[0].value == json.dumps(expected_value) @@ -398,7 +399,7 @@ def setup_for_xcom_set_again_replace(self, task_instance, push_simple_json_xcom) @pytest.mark.usefixtures("setup_for_xcom_set_again_replace") def test_xcom_set_again_replace(self, session, task_instance): - assert session.query(XComModel).one().value == json.dumps({"key1": "value1"}) + assert session.scalar(select(XComModel)).value == json.dumps({"key1": "value1"}) XComModel.set( key="xcom_1", value={"key2": "value2"}, @@ -407,7 +408,7 @@ def test_xcom_set_again_replace(self, session, task_instance): run_id=task_instance.run_id, session=session, ) - assert session.query(XComModel).one().value == json.dumps({"key2": "value2"}) + assert session.scalar(select(XComModel)).value == json.dumps({"key2": "value2"}) def test_xcom_set_invalid_key(self, session, task_instance): """Test that setting an XCom with an invalid key raises a ValueError.""" @@ -440,14 +441,14 @@ def setup_for_xcom_clear(self, task_instance, push_simple_json_xcom): @pytest.mark.usefixtures("setup_for_xcom_clear") @mock.patch("airflow.models.xcom.XCom.purge") def test_xcom_clear(self, mock_purge, session, task_instance): - assert session.query(XComModel).count() == 1 + assert session.scalar(select(func.count()).select_from(XComModel)) == 1 XComModel.clear( dag_id=task_instance.dag_id, task_id=task_instance.task_id, run_id=task_instance.run_id, session=session, ) - assert session.query(XComModel).count() == 0 + assert session.scalar(select(func.count()).select_from(XComModel)) == 0 # purge will not be done when we clear, will be handled in task sdk assert mock_purge.call_count == 0 @@ -459,7 +460,7 @@ def test_xcom_clear_different_run(self, session, task_instance): run_id="different_run", session=session, ) - assert session.query(XComModel).count() == 1 + assert session.scalar(select(func.count()).select_from(XComModel)) == 1 class TestXComRoundTrip: