Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/databricks/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.10.0",
"apache-airflow-providers-common-compat>=1.6.0",
"apache-airflow-providers-common-compat>=1.7.4", # TODO: bump to next version
"apache-airflow-providers-common-sql>=1.27.0",
"requests>=2.32.0,<3",
"databricks-sql-connector>=4.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.common.compat.sdk import BaseOperator, BaseOperatorLink, XCom
from airflow.providers.databricks.hooks.databricks import (
DatabricksHook,
RunLifeCycleState,
Expand All @@ -51,7 +52,7 @@
validate_trigger_event,
)
from airflow.providers.databricks.utils.mixins import DatabricksSQLStatementsMixin
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
Expand All @@ -62,13 +63,6 @@
from airflow.sdk import TaskGroup
from airflow.sdk.types import Context, Logger

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
from airflow.sdk.execution_time.xcom import XCom
else:
from airflow.models import XCom
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]

DEFER_METHOD_NAME = "execute_complete"
XCOM_RUN_ID_KEY = "run_id"
XCOM_JOB_ID_KEY = "job_id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@
from urllib.parse import urlsplit

from airflow.exceptions import AirflowException
from airflow.providers.common.compat.sdk import BaseOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.version_compat import BaseOperator

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
except ImportError:
# TODO: Remove once provider drops support for Airflow 2
from airflow.utils.context import Context
from airflow.providers.common.compat.sdk import Context


class DatabricksReposCreateOperator(BaseOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
from databricks.sql.utils import ParamEscaper

from airflow.exceptions import AirflowException
from airflow.providers.common.compat.sdk import BaseOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.providers.databricks.version_compat import BaseOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
from airflow.providers.common.compat.sdk import Context


class DatabricksSqlOperator(SQLExecuteQueryOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,20 @@
from mergedeep import merge

from airflow.exceptions import AirflowException
from airflow.providers.common.compat.sdk import BaseOperator, TaskGroup
from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState
from airflow.providers.databricks.plugins.databricks_workflow import (
WorkflowJobRepairAllFailedLink,
WorkflowJobRunLink,
store_databricks_job_run_link,
)
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator

try:
from airflow.sdk import TaskGroup
except ImportError:
from airflow.utils.task_group import TaskGroup # type: ignore[no-redef]
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from types import TracebackType

from airflow.models.taskmixin import DAGNode
from airflow.utils.context import Context
from airflow.providers.common.compat.sdk import Context


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey, clear_task_instances
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.common.compat.sdk import BaseOperatorLink, TaskGroup, XCom
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperatorLink, TaskGroup, XCom
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.models import BaseOperator
from airflow.providers.common.compat.sdk import Context
from airflow.providers.databricks.operators.databricks import DatabricksTaskBaseOperator
from airflow.sdk.types import Logger
from airflow.utils.context import Context


REPAIR_WAIT_ATTEMPTS = os.getenv("DATABRICKS_REPAIR_WAIT_ATTEMPTS", 20)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,13 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.common.compat.sdk import BaseSensorOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook, SQLStatementState
from airflow.providers.databricks.operators.databricks import DEFER_METHOD_NAME
from airflow.providers.databricks.utils.mixins import DatabricksSQLStatementsMixin
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.bases.sensor import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
from airflow.utils.context import Context
from airflow.providers.common.compat.sdk import Context

XCOM_STATEMENT_ID_KEY = "statement_id"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,12 @@
from databricks.sql.utils import ParamEscaper

from airflow.exceptions import AirflowException
from airflow.providers.common.compat.sdk import BaseSensorOperator
from airflow.providers.common.sql.hooks.handlers import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
except ImportError:
# TODO: Remove once provider drops support for Airflow 2
from airflow.utils.context import Context
from airflow.providers.common.compat.sdk import Context


class DatabricksPartitionSensor(BaseSensorOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,12 @@
from typing import TYPE_CHECKING, Any

from airflow.exceptions import AirflowException
from airflow.providers.common.compat.sdk import BaseSensorOperator
from airflow.providers.common.sql.hooks.handlers import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
except ImportError:
# TODO: Remove once provider drops support for Airflow 2
from airflow.utils.context import Context
from airflow.providers.common.compat.sdk import Context


class DatabricksSqlSensor(BaseSensorOperator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,6 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:

AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperator, BaseOperatorLink, TaskGroup
from airflow.sdk.execution_time.xcom import XCom
else:
from airflow.models import BaseOperator, XCom
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]
from airflow.utils.task_group import TaskGroup # type: ignore[no-redef]

__all__ = [
"AIRFLOW_V_3_0_PLUS",
"BaseOperator",
"BaseOperatorLink",
"TaskGroup",
"XCom",
]