diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py index e682083ca96c1..632a8394f3698 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py @@ -19,8 +19,8 @@ from collections.abc import Iterable, Sequence from typing import IO, TYPE_CHECKING, Any, AnyStr -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook, AzureDataLakeStorageV2Hook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adx.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adx.py index 700d4a963e0fe..289db346fc70a 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adx.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adx.py @@ -24,8 +24,8 @@ from typing import TYPE_CHECKING from airflow.configuration import conf -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from azure.kusto.data._models import KustoResultTable diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/asb.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/asb.py index d74006df4dc8e..6d94d07623d69 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/asb.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/asb.py @@ -20,8 +20,8 @@ from typing import TYPE_CHECKING, Any from 
uuid import UUID -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: import datetime diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py index 5b366b5b6bdc0..9b6b3d212ec22 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/batch.py @@ -24,8 +24,8 @@ from azure.batch import models as batch_models from airflow.exceptions import AirflowException -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.batch import AzureBatchHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py index ea1cfc4eb2cd5..1f125daab2287 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py @@ -40,10 +40,10 @@ from msrestazure.azure_exceptions import CloudError from airflow.exceptions import AirflowException, AirflowTaskTimeout -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.container_instance import AzureContainerInstanceHook from airflow.providers.microsoft.azure.hooks.container_registry import AzureContainerRegistryHook from airflow.providers.microsoft.azure.hooks.container_volume import AzureContainerVolumeHook +from airflow.providers.microsoft.azure.version_compat 
import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/cosmos.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/cosmos.py index 99cee311f4102..a708aa4e20da3 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/cosmos.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/cosmos.py @@ -20,8 +20,8 @@ from collections.abc import Sequence from typing import TYPE_CHECKING -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py index 71ca318b63652..20c5006d08004 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/data_factory.py @@ -25,7 +25,6 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.data_factory import ( AzureDataFactoryHook, AzureDataFactoryPipelineRunException, @@ -33,6 +32,7 @@ get_field, ) from airflow.providers.microsoft.azure.triggers.data_factory import AzureDataFactoryTrigger +from airflow.providers.microsoft.azure.version_compat import BaseOperator from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py index 4f4b3b6dc00cb..f4f6e91fea4de 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py @@ -27,12 +27,12 @@ ) from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, TaskDeferred -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook from airflow.providers.microsoft.azure.triggers.msgraph import ( MSGraphTrigger, ResponseSerializer, ) +from airflow.providers.microsoft.azure.version_compat import BaseOperator from airflow.utils.xcom import XCOM_RETURN_KEY if TYPE_CHECKING: @@ -307,7 +307,7 @@ def push_xcom(self, context: Any, value) -> None: self.key, value, ) - self.xcom_push(context=context, key=self.key, value=value) + context["ti"].xcom_push(key=self.key, value=value) @staticmethod def paginate( diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py index 444100dc667f7..24d32eab18d3a 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py @@ -21,13 +21,13 @@ from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIHook from airflow.providers.microsoft.azure.triggers.powerbi import ( PowerBIDatasetListTrigger, PowerBITrigger, PowerBIWorkspaceListTrigger, ) +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from msgraph_core import APIVersion @@ -141,8 +141,7 @@ def get_refresh_status(self, context: Context, event: 
dict[str, str] | None = No dataset_refresh_id = event["dataset_refresh_id"] if dataset_refresh_id: - self.xcom_push( - context=context, + context["ti"].xcom_push( key=f"{self.task_id}.powerbi_dataset_refresh_Id", value=dataset_refresh_id, ) @@ -168,8 +167,7 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any: Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ if event: - self.xcom_push( - context=context, + context["ti"].xcom_push( key=f"{self.task_id}.powerbi_dataset_refresh_status", value=event["dataset_refresh_status"], ) @@ -235,8 +233,7 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any: Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ if event: - self.xcom_push( - context=context, + context["ti"].xcom_push( key=f"{self.task_id}.powerbi_workspace_ids", value=event["workspace_ids"], ) @@ -306,8 +303,7 @@ def execute_complete(self, context: Context, event: dict[str, str]) -> Any: Relies on trigger to throw an exception, otherwise it assumes execution was successful. 
""" if event: - self.xcom_push( - context=context, + context["ti"].xcom_push( key=f"{self.task_id}.powerbi_dataset_ids", value=event["dataset_ids"], ) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py index 1a096baece470..f83bc00054c3f 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/synapse.py @@ -23,7 +23,6 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.synapse import ( AzureSynapseHook, AzureSynapsePipelineHook, @@ -31,6 +30,7 @@ AzureSynapsePipelineRunStatus, AzureSynapseSparkBatchRunStatus, ) +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from azure.synapse.spark.models import SparkBatchJobOptions diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py index 7b9c3dfd61ee9..7831aa9eb82f1 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/wasb_delete_blob.py @@ -20,8 +20,8 @@ from collections.abc import Sequence from typing import TYPE_CHECKING, Any -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_adls.py 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_adls.py index 016a6e2b0bda4..8f007cf93643a 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_adls.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_adls.py @@ -20,8 +20,8 @@ from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_wasb.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_wasb.py index 53ac418c8ba6e..4f024294646d4 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_wasb.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/local_to_wasb.py @@ -20,8 +20,8 @@ from collections.abc import Sequence from typing import TYPE_CHECKING -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py index db2e22de7c158..3fb34705554c9 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py @@ -23,8 +23,8 @@ from tempfile import TemporaryDirectory from 
typing import TYPE_CHECKING, Any -from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator from airflow.providers.oracle.hooks.oracle import OracleHook if TYPE_CHECKING: diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py index 46c99381e7955..97926b1c6c7db 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/s3_to_wasb.py @@ -22,9 +22,9 @@ from functools import cached_property from typing import TYPE_CHECKING -from airflow.models import BaseOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +from airflow.providers.microsoft.azure.version_compat import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 009c53935065c..9b82e64876ea3 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -32,6 +32,7 @@ from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS from airflow.providers.sftp.hooks.sftp import SFTPHook WILDCARD = "*" @@ -98,6 +99,8 @@ def __init__( self.create_container = create_container def dry_run(self) -> None: + if AIRFLOW_V_3_0_PLUS: + 
raise NotImplementedError("Not implemented for Airflow 3.") super().dry_run() sftp_files: list[SftpFile] = self.get_sftp_files_map() for file in sftp_files: diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py index 48d122b669696..8a5eeb817bfea 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py @@ -33,3 +33,20 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import ( + BaseOperator, + BaseOperatorLink, + BaseSensorOperator, + ) +else: + from airflow.models import BaseOperator, BaseOperatorLink # type: ignore[no-redef] + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] + +__all__ = [ + "AIRFLOW_V_3_0_PLUS", + "BaseOperator", + "BaseOperatorLink", + "BaseSensorOperator", +]