Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ repos:
^airflow-ctl.*\.py$|
^airflow-core/src/airflow/models/.*\.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py$|
^airflow-core/tests/unit/cli/commands/test_task_command.py$|
^airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py$|
^airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py$|
^airflow-core/tests/unit/models/test_renderedtifields.py$|
^airflow-core/tests/unit/models/test_timestamp.py$|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from unittest.mock import ANY

import pytest
from sqlalchemy import select

from airflow.models.team import Team
from airflow.models.variable import Variable
Expand Down Expand Up @@ -65,7 +66,7 @@ def create_file_upload(content: dict) -> BytesIO:

@provide_session
def _create_variables(session) -> None:
team = session.query(Team).where(Team.name == "test").one()
team = session.scalars(select(Team).where(Team.name == "test")).one()

Variable.set(
key=TEST_VARIABLE_KEY,
Expand Down Expand Up @@ -130,13 +131,13 @@ def create_variables(self):
class TestDeleteVariable(TestVariableEndpoint):
def test_delete_should_respond_204(self, test_client, session):
self.create_variables()
variables = session.query(Variable).all()
variables = session.scalars(select(Variable)).all()
assert len(variables) == 5
response = test_client.delete(f"/variables/{TEST_VARIABLE_KEY}")
assert response.status_code == 204
response = test_client.delete(f"/variables/{TEST_VARIABLE_KEY4}")
assert response.status_code == 204
variables = session.query(Variable).all()
variables = session.scalars(select(Variable)).all()
assert len(variables) == 3
check_last_log(session, dag_id=None, event="delete_variable", logical_date=None)

Expand Down
11 changes: 5 additions & 6 deletions airflow-core/tests/unit/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from unittest import mock

import pytest
from sqlalchemy import delete

from airflow._shared.timezones import timezone
from airflow.cli import cli_parser
Expand Down Expand Up @@ -63,12 +64,10 @@

def reset(dag_id):
with create_session() as session:
tis = session.query(TaskInstance).filter_by(dag_id=dag_id)
tis.delete()
runs = session.query(DagRun).filter_by(dag_id=dag_id)
runs.delete()
session.query(DagModel).filter_by(dag_id=dag_id).delete()
session.query(SerializedDagModel).filter_by(dag_id=dag_id).delete()
session.execute(delete(TaskInstance).where(TaskInstance.dag_id == dag_id))
session.execute(delete(DagRun).where(DagRun.dag_id == dag_id))
session.execute(delete(DagModel).where(DagModel.dag_id == dag_id))
session.execute(delete(SerializedDagModel).where(SerializedDagModel.dag_id == dag_id))


@contextmanager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from unittest.mock import patch

import pytest
from sqlalchemy import func, select

from airflow.dag_processing.bundles.base import BaseDagBundle
from airflow.dag_processing.bundles.manager import DagBundlesManager
Expand Down Expand Up @@ -155,7 +156,9 @@ def clear_db():
@conf_vars({("core", "LOAD_EXAMPLES"): "False"})
def test_sync_bundles_to_db(clear_db, session):
def _get_bundle_names_and_active():
return session.query(DagBundleModel.name, DagBundleModel.active).order_by(DagBundleModel.name).all()
return session.execute(
select(DagBundleModel.name, DagBundleModel.active).order_by(DagBundleModel.name)
).all()

# Initial add
with patch.dict(
Expand All @@ -182,7 +185,7 @@ def _get_bundle_names_and_active():
("my-test-bundle", False),
]
# Since my-test-bundle is inactive, the associated import errors should be deleted
assert session.query(ParseImportError).count() == 0
assert session.scalar(select(func.count(ParseImportError.id))) == 0

# Re-enable one that reappears in config
with patch.dict(
Expand Down Expand Up @@ -250,7 +253,7 @@ def test_sync_bundles_to_db_with_template(clear_db, session):
manager.sync_bundles_to_db()

# Check that the template and parameters were stored
bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first()
bundle_model = session.scalars(select(DagBundleModel).filter_by(name="template-bundle").limit(1)).first()

session.merge(bundle_model)

Expand All @@ -269,7 +272,9 @@ def test_bundle_model_render_url(clear_db, session):
):
manager = DagBundlesManager()
manager.sync_bundles_to_db()
bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first()
bundle_model = session.scalars(
select(DagBundleModel).filter_by(name="template-bundle").limit(1)
).first()

session.merge(bundle_model)
assert bundle_model is not None
Expand All @@ -291,7 +296,7 @@ def test_template_params_update_on_sync(clear_db, session):
manager.sync_bundles_to_db()

# Verify initial template and parameters
bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first()
bundle_model = session.scalars(select(DagBundleModel).filter_by(name="template-bundle").limit(1)).first()
url = bundle_model._unsign_url()
assert url == "https://github.com/example/repo/tree/{version}/{subdir}"
assert bundle_model.template_params == {"subdir": "dags"}
Expand All @@ -316,7 +321,7 @@ def test_template_params_update_on_sync(clear_db, session):
manager.sync_bundles_to_db()

# Verify the template and parameters were updated
bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first()
bundle_model = session.scalars(select(DagBundleModel).filter_by(name="template-bundle").limit(1)).first()
url = bundle_model._unsign_url()
assert url == "https://gitlab.com/example/repo/-/tree/{version}/{subdir}"
assert bundle_model.template_params == {"subdir": "workflows"}
Expand All @@ -335,7 +340,7 @@ def test_template_update_on_sync(clear_db, session):
manager.sync_bundles_to_db()

# Verify initial template
bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first()
bundle_model = session.scalars(select(DagBundleModel).filter_by(name="template-bundle").limit(1)).first()
url = bundle_model._unsign_url()
assert url == "https://github.com/example/repo/tree/{version}/{subdir}"
assert bundle_model.render_url(version="v1") == "https://github.com/example/repo/tree/v1/dags"
Expand All @@ -360,7 +365,7 @@ def test_template_update_on_sync(clear_db, session):
manager.sync_bundles_to_db()

# Verify the template was updated
bundle_model = session.query(DagBundleModel).filter_by(name="template-bundle").first()
bundle_model = session.scalars(select(DagBundleModel).filter_by(name="template-bundle").limit(1)).first()
url = bundle_model._unsign_url()
assert url == "https://gitlab.com/example/repo/-/tree/{version}/{subdir}"
assert bundle_model.render_url("v1") == "https://gitlab.com/example/repo/-/tree/v1/dags"
Expand Down Expand Up @@ -438,7 +443,7 @@ def test_multiple_bundles_one_fails(clear_db, session):
assert isinstance(bundles[0], BasicBundle)

manager.sync_bundles_to_db()
bundle_names = {b.name for b in session.query(DagBundleModel).all()}
bundle_names = {b.name for b in session.scalars(select(DagBundleModel)).all()}
assert bundle_names == {"my-test-bundle"}


Expand Down
Loading