diff --git a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py index dd494e855aef2..596765c4a5c53 100644 --- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py +++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py @@ -25,9 +25,14 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.models import BaseOperator from airflow.providers.airbyte.hooks.airbyte import AirbyteHook from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger +from airflow.providers.airbyte.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/links/maxcompute.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/links/maxcompute.py index 5147b5fc0f6d4..0c28c538ef6ce 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/links/maxcompute.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/links/maxcompute.py @@ -54,18 +54,15 @@ def get_link( @staticmethod def persist( context: Context, - task_instance: BaseOperator, log_view_url: str, ): """ Persist the log view URL to XCom for later retrieval. :param context: The context of the task instance. - :param task_instance: The task instance. :param log_view_url: The log view URL to persist. """ - task_instance.xcom_push( - context, + context["task_instance"].xcom_push( key=MaxComputeLogViewLink.key, value=log_view_url, ) diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py index 3cf414230f519..bf8228bdc84be 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py @@ -23,8 +23,13 @@ from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException -from airflow.models import BaseOperator from airflow.providers.alibaba.cloud.hooks.analyticdb_spark import AnalyticDBSparkHook, AppState +from airflow.providers.alibaba.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py index d0b16d1eb6eeb..32c3113d3965b 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/maxcompute.py @@ -22,9 +22,14 @@ from collections.abc import Sequence from typing import TYPE_CHECKING -from airflow.models import BaseOperator from airflow.providers.alibaba.cloud.hooks.maxcompute import MaxComputeHook from airflow.providers.alibaba.cloud.links.maxcompute import MaxComputeLogViewLink +from airflow.providers.alibaba.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator if TYPE_CHECKING: from odps.models import Instance @@ -124,9 +129,7 @@ def execute(self, context: Context) -> str: quota_name=self.quota_name, ) - MaxComputeLogViewLink.persist( - context=context, task_instance=self, log_view_url=self.instance.get_logview_address() - ) + MaxComputeLogViewLink.persist(context=context, log_view_url=self.instance.get_logview_address()) self.instance.wait_for_success() diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/oss.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/oss.py index 1e07a4cce85a2..e967c5f2a73d0 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/oss.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/operators/oss.py @@ -21,8 +21,13 @@ from typing import TYPE_CHECKING -from airflow.models import BaseOperator from airflow.providers.alibaba.cloud.hooks.oss import OSSHook +from airflow.providers.alibaba.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/alibaba/tests/unit/alibaba/cloud/links/test_maxcompute.py b/providers/alibaba/tests/unit/alibaba/cloud/links/test_maxcompute.py index 2820cf7c1223a..db0e9b6104579 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/links/test_maxcompute.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/links/test_maxcompute.py @@ -49,18 +49,16 @@ def test_get_link(self, mock_xcom, xcom_value, expected_link): assert link == expected_link def test_persist(self): - mock_context = mock.MagicMock() mock_task_instance = mock.MagicMock() + mock_context = {"task_instance": mock_task_instance} mock_url = "mock_url" MaxComputeLogViewLink.persist( context=mock_context, - task_instance=mock_task_instance, log_view_url=mock_url, ) mock_task_instance.xcom_push.assert_called_once_with( - mock_context, key=MaxComputeLogViewLink.key, value=mock_url, ) diff --git a/providers/alibaba/tests/unit/alibaba/cloud/operators/test_maxcompute.py b/providers/alibaba/tests/unit/alibaba/cloud/operators/test_maxcompute.py index 4bbd6f1cd66f7..af49b24810941 100644 --- a/providers/alibaba/tests/unit/alibaba/cloud/operators/test_maxcompute.py +++ b/providers/alibaba/tests/unit/alibaba/cloud/operators/test_maxcompute.py @@ -60,7 +60,6 @@ def test_execute(self, mock_hook, mock_log_view_link): mock_log_view_link.persist.assert_called_once_with( context=mock.ANY, - task_instance=op, log_view_url=instance_mock.get_logview_address.return_value, )