diff --git a/providers/databricks/pyproject.toml b/providers/databricks/pyproject.toml index da312a725b5e4..0bf95edcd7b7e 100644 --- a/providers/databricks/pyproject.toml +++ b/providers/databricks/pyproject.toml @@ -58,7 +58,7 @@ requires-python = ">=3.10" # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` dependencies = [ "apache-airflow>=2.10.0", - "apache-airflow-providers-common-compat>=1.6.0", + "apache-airflow-providers-common-compat>=1.7.4", # TODO: bump to next version "apache-airflow-providers-common-sql>=1.27.0", "requests>=2.32.0,<3", "databricks-sql-connector>=4.0.0", diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 2c1607bb53df0..3e16daf80cda3 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -28,6 +28,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException +from airflow.providers.common.compat.sdk import BaseOperator, BaseOperatorLink, XCom from airflow.providers.databricks.hooks.databricks import ( DatabricksHook, RunLifeCycleState, @@ -51,7 +52,7 @@ validate_trigger_event, ) from airflow.providers.databricks.utils.mixins import DatabricksSQLStatementsMixin -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey @@ -62,13 +63,6 @@ from airflow.sdk import TaskGroup from airflow.sdk.types import Context, Logger -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseOperatorLink - from airflow.sdk.execution_time.xcom import XCom -else: - from airflow.models import XCom - from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] - DEFER_METHOD_NAME = "execute_complete" XCOM_RUN_ID_KEY = "run_id" XCOM_JOB_ID_KEY = "job_id" diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py index 2d5f6db840480..55a29d0e41e92 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_repos.py @@ -26,15 +26,11 @@ from urllib.parse import urlsplit from airflow.exceptions import AirflowException +from airflow.providers.common.compat.sdk import BaseOperator from airflow.providers.databricks.hooks.databricks import DatabricksHook -from airflow.providers.databricks.version_compat import BaseOperator if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.providers.common.compat.sdk import Context class DatabricksReposCreateOperator(BaseOperator): diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py index 5d9b08a662652..4c7e113e9d9c6 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_sql.py @@ -28,12 +28,12 @@ from databricks.sql.utils import ParamEscaper from airflow.exceptions import AirflowException +from airflow.providers.common.compat.sdk import BaseOperator from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook -from airflow.providers.databricks.version_compat import BaseOperator if TYPE_CHECKING: - from airflow.utils.context import Context + from airflow.providers.common.compat.sdk import Context class DatabricksSqlOperator(SQLExecuteQueryOperator): diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py index 836858bb969ed..a7fc847199b98 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py @@ -26,24 +26,20 @@ from mergedeep import merge from airflow.exceptions import AirflowException +from airflow.providers.common.compat.sdk import BaseOperator, TaskGroup from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState from airflow.providers.databricks.plugins.databricks_workflow import ( WorkflowJobRepairAllFailedLink, WorkflowJobRunLink, store_databricks_job_run_link, ) -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator - -try: - from airflow.sdk import TaskGroup -except ImportError: - from airflow.utils.task_group import TaskGroup # type: ignore[no-redef] +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from types import TracebackType from airflow.models.taskmixin import DAGNode - from airflow.utils.context import Context + from airflow.providers.common.compat.sdk import Context @dataclass diff --git a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py index c0fc3360e1931..651eaf18fdca2 100644 --- a/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py @@ -25,8 +25,9 @@ from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance, TaskInstanceKey, clear_task_instances from airflow.plugins_manager import AirflowPlugin +from airflow.providers.common.compat.sdk import BaseOperatorLink, TaskGroup, XCom from airflow.providers.databricks.hooks.databricks import DatabricksHook -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperatorLink, TaskGroup, XCom +from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState @@ -34,9 +35,9 @@ from sqlalchemy.orm.session import Session from airflow.models import BaseOperator + from airflow.providers.common.compat.sdk import Context from airflow.providers.databricks.operators.databricks import DatabricksTaskBaseOperator from airflow.sdk.types import Logger - from airflow.utils.context import Context REPAIR_WAIT_ATTEMPTS = os.getenv("DATABRICKS_REPAIR_WAIT_ATTEMPTS", 20) diff --git a/providers/databricks/src/airflow/providers/databricks/sensors/databricks.py b/providers/databricks/src/airflow/providers/databricks/sensors/databricks.py index a10c5bdad4b3d..671aa9ec33779 100644 --- a/providers/databricks/src/airflow/providers/databricks/sensors/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/sensors/databricks.py @@ -24,18 +24,13 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException +from airflow.providers.common.compat.sdk import BaseSensorOperator from airflow.providers.databricks.hooks.databricks import DatabricksHook, SQLStatementState from airflow.providers.databricks.operators.databricks import DEFER_METHOD_NAME from airflow.providers.databricks.utils.mixins import DatabricksSQLStatementsMixin -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.bases.sensor import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] if TYPE_CHECKING: - from airflow.utils.context import Context + from airflow.providers.common.compat.sdk import Context XCOM_STATEMENT_ID_KEY = "statement_id" diff --git a/providers/databricks/src/airflow/providers/databricks/sensors/databricks_partition.py b/providers/databricks/src/airflow/providers/databricks/sensors/databricks_partition.py index 49f9ed874e6bc..579cc500bcd7d 100644 --- a/providers/databricks/src/airflow/providers/databricks/sensors/databricks_partition.py +++ b/providers/databricks/src/airflow/providers/databricks/sensors/databricks_partition.py @@ -28,21 +28,12 @@ from databricks.sql.utils import ParamEscaper from airflow.exceptions import AirflowException +from airflow.providers.common.compat.sdk import BaseSensorOperator from airflow.providers.common.sql.hooks.handlers import fetch_all_handler from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.providers.common.compat.sdk import Context class DatabricksPartitionSensor(BaseSensorOperator): diff --git a/providers/databricks/src/airflow/providers/databricks/sensors/databricks_sql.py b/providers/databricks/src/airflow/providers/databricks/sensors/databricks_sql.py index 1eca4c7adae4b..574d9ad1cf2e2 100644 --- a/providers/databricks/src/airflow/providers/databricks/sensors/databricks_sql.py +++ b/providers/databricks/src/airflow/providers/databricks/sensors/databricks_sql.py @@ -25,21 +25,12 @@ from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException +from airflow.providers.common.compat.sdk import BaseSensorOperator from airflow.providers.common.sql.hooks.handlers import fetch_all_handler from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook -from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseSensorOperator -else: - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.providers.common.compat.sdk import Context class DatabricksSqlSensor(BaseSensorOperator): diff --git a/providers/databricks/src/airflow/providers/databricks/version_compat.py b/providers/databricks/src/airflow/providers/databricks/version_compat.py index 3d5385a1eb814..0956edd21112f 100644 --- a/providers/databricks/src/airflow/providers/databricks/version_compat.py +++ b/providers/databricks/src/airflow/providers/databricks/version_compat.py @@ -34,18 +34,6 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseOperator, BaseOperatorLink, TaskGroup - from airflow.sdk.execution_time.xcom import XCom -else: - from airflow.models import BaseOperator, XCom - from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] - from airflow.utils.task_group import TaskGroup # type: ignore[no-redef] - __all__ = [ "AIRFLOW_V_3_0_PLUS", - "BaseOperator", - "BaseOperatorLink", - "TaskGroup", - "XCom", ]