diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f2221ed27e15e..c24a71e6f9449 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -413,6 +413,7 @@ repos: ^airflow-ctl.*\.py$| ^airflow-core/src/airflow/models/.*\.py$| ^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$| + ^providers/cncf/kubernetes/.*\.py$| ^dev/airflow_perf/scheduler_dag_execution_timing.py$| ^providers/openlineage/.*\.py$| ^task_sdk.*\.py$ diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py index 2013e1c05813f..3646ff8b75c31 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py @@ -27,6 +27,7 @@ from kubernetes.client.rest import ApiException as SyncApiException from kubernetes_asyncio.client.exceptions import ApiException as AsyncApiException from slugify import slugify +from sqlalchemy import select from urllib3.exceptions import HTTPError from airflow.configuration import conf @@ -175,15 +176,14 @@ def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey: raise RuntimeError("Session not configured. Call configure_orm() first.") session = Session() - task_instance_run_id = ( - session.query(TaskInstance.run_id) + task_instance_run_id = session.scalar( + select(TaskInstance.run_id) .join(TaskInstance.dag_run) - .filter( + .where( TaskInstance.dag_id == dag_id, TaskInstance.task_id == task_id, getattr(DagRun, logical_date_key) == logical_date, ) - .scalar() ) else: task_instance_run_id = annotation_run_id