diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 46d505f55ab68..f2221ed27e15e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -413,6 +413,7 @@ repos: ^airflow-ctl.*\.py$| ^airflow-core/src/airflow/models/.*\.py$| ^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$| + ^dev/airflow_perf/scheduler_dag_execution_timing.py$| ^providers/openlineage/.*\.py$| ^task_sdk.*\.py$ pass_filenames: true diff --git a/dev/airflow_perf/scheduler_dag_execution_timing.py b/dev/airflow_perf/scheduler_dag_execution_timing.py index 47eaf92d7c85c..48427f35ce06e 100755 --- a/dev/airflow_perf/scheduler_dag_execution_timing.py +++ b/dev/airflow_perf/scheduler_dag_execution_timing.py @@ -27,6 +27,7 @@ from operator import attrgetter import rich_click as click +from sqlalchemy import delete, update from airflow.jobs.job import run_job from airflow.utils.types import DagRunTriggeredByType @@ -135,9 +136,9 @@ def reset_dag(dag, session): TI = airflow.models.TaskInstance dag_id = dag.dag_id - session.query(DM).filter(DM.dag_id == dag_id).update({"is_paused": False}) - session.query(DR).filter(DR.dag_id == dag_id).delete() - session.query(TI).filter(TI.dag_id == dag_id).delete() + session.execute(update(DM).where(DM.dag_id == dag_id).values(is_paused=False)) + session.execute(delete(DR).where(DR.dag_id == dag_id)) + session.execute(delete(TI).where(TI.dag_id == dag_id)) def pause_all_dags(session): @@ -146,7 +147,7 @@ def pause_all_dags(session): """ from airflow.models.dag import DagModel - session.query(DagModel).update({"is_paused": True}) + session.execute(update(DagModel).values(is_paused=True)) def create_dag_runs(dag, num_runs, session):