diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 81e24f2e8a3b3..46d505f55ab68 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -413,6 +413,7 @@ repos: ^airflow-ctl.*\.py$| ^airflow-core/src/airflow/models/.*\.py$| ^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$| + ^providers/openlineage/.*\.py$| ^task_sdk.*\.py$ pass_filenames: true - id: update-supported-versions diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py index 6f70672516cc5..7368bc8e8a16d 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py @@ -531,7 +531,7 @@ def is_selective_lineage_enabled(obj: DAG | SerializedDAG | AnyOperator) -> bool @provide_session def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION): - from sqlalchemy import exists + from sqlalchemy import exists, select if not isinstance(ti.task, BaseSensorOperator): return False @@ -540,21 +540,27 @@ def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION): return False if AIRFLOW_V_3_0_PLUS: return ( - session.query( - exists().where(TaskReschedule.ti_id == ti.id, TaskReschedule.try_number == ti.try_number) - ).scalar() + session.scalar( + select( + exists().where( + TaskReschedule.ti_id == ti.id, TaskReschedule.try_number == ti.try_number + ) + ) + ) is True ) return ( - session.query( - exists().where( - TaskReschedule.dag_id == ti.dag_id, - TaskReschedule.task_id == ti.task_id, - TaskReschedule.run_id == ti.run_id, - TaskReschedule.map_index == ti.map_index, - TaskReschedule.try_number == ti.try_number, + session.scalar( + select( + exists().where( + TaskReschedule.dag_id == ti.dag_id, + TaskReschedule.task_id == ti.task_id, + TaskReschedule.run_id == ti.run_id, + TaskReschedule.map_index == ti.map_index, + TaskReschedule.try_number == ti.try_number, + ) ) - ).scalar() + ) is True )