diff --git a/providers/slack/src/airflow/providers/slack/operators/slack.py b/providers/slack/src/airflow/providers/slack/operators/slack.py index e4035d057a19e..93e2be87450f7 100644 --- a/providers/slack/src/airflow/providers/slack/operators/slack.py +++ b/providers/slack/src/airflow/providers/slack/operators/slack.py @@ -24,8 +24,8 @@ from typing_extensions import Literal -from airflow.models import BaseOperator from airflow.providers.slack.hooks.slack import SlackHook +from airflow.providers.slack.version_compat import BaseOperator if TYPE_CHECKING: from slack_sdk.http_retry import RetryHandler diff --git a/providers/slack/src/airflow/providers/slack/operators/slack_webhook.py b/providers/slack/src/airflow/providers/slack/operators/slack_webhook.py index 58c6906c5ea02..3bdb7a55e9d12 100644 --- a/providers/slack/src/airflow/providers/slack/operators/slack_webhook.py +++ b/providers/slack/src/airflow/providers/slack/operators/slack_webhook.py @@ -21,8 +21,8 @@ from functools import cached_property from typing import TYPE_CHECKING -from airflow.models import BaseOperator from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook +from airflow.providers.slack.version_compat import BaseOperator if TYPE_CHECKING: from slack_sdk.http_retry import RetryHandler diff --git a/providers/slack/src/airflow/providers/slack/transfers/base_sql_to_slack.py b/providers/slack/src/airflow/providers/slack/transfers/base_sql_to_slack.py index 43221e601935a..ba544965ea255 100644 --- a/providers/slack/src/airflow/providers/slack/transfers/base_sql_to_slack.py +++ b/providers/slack/src/airflow/providers/slack/transfers/base_sql_to_slack.py @@ -21,7 +21,7 @@ from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook -from airflow.models import BaseOperator +from airflow.providers.slack.version_compat import BaseOperator if TYPE_CHECKING: import pandas as pd diff --git a/providers/slack/src/airflow/providers/slack/version_compat.py b/providers/slack/src/airflow/providers/slack/version_compat.py new file mode 100644 index 0000000000000..e7a259afb357c --- /dev/null +++ b/providers/slack/src/airflow/providers/slack/version_compat.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator + +__all__ = [ + "AIRFLOW_V_3_0_PLUS", + "BaseOperator", +] diff --git a/providers/slack/tests/unit/slack/transfers/test_sql_to_slack_webhook.py b/providers/slack/tests/unit/slack/transfers/test_sql_to_slack_webhook.py index c3d8481f4847a..a8f846b8c7f9c 100644 --- a/providers/slack/tests/unit/slack/transfers/test_sql_to_slack_webhook.py +++ b/providers/slack/tests/unit/slack/transfers/test_sql_to_slack_webhook.py @@ -21,7 +21,6 @@ import pandas as pd import pytest -from airflow import DAG from airflow.models import Connection from airflow.providers.slack.transfers.sql_to_slack_webhook import SqlToSlackWebhookOperator from airflow.utils import timezone @@ -40,7 +39,6 @@ def mocked_hook(): @pytest.mark.db_test class TestSqlToSlackWebhookOperator: def setup_method(self): - self.example_dag = DAG(TEST_DAG_ID, schedule=None, start_date=DEFAULT_DATE) self.default_hook_parameters = {"timeout": None, "proxy": None, "retry_handlers": None} @staticmethod @@ -61,27 +59,31 @@ def _construct_operator(**kwargs): ), ], ) - def test_rendering_and_message_execution(self, slack_op_kwargs, hook_extra_kwargs, mocked_hook): + def test_rendering_and_message_execution( + self, slack_op_kwargs, hook_extra_kwargs, mocked_hook, dag_maker + ): mock_dbapi_hook = mock.Mock() test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1]) get_df_mock = mock_dbapi_hook.return_value.get_df get_df_mock.return_value = test_df - operator_args = { - "sql_conn_id": "snowflake_connection", - "slack_webhook_conn_id": "slack_connection", - "slack_message": "message: {{ ds }}, {{ results_df }}", - "slack_channel": "#test", - "sql": "sql {{ ds }}", - "dag": self.example_dag, - **slack_op_kwargs, - } - sql_to_slack_operator = self._construct_operator(**operator_args) + with dag_maker(dag_id=TEST_DAG_ID, start_date=DEFAULT_DATE): + operator_args = { + "sql_conn_id": "snowflake_connection", + "slack_webhook_conn_id": "slack_connection", + "slack_message": "message: {{ ds }}, {{ results_df }}", + "slack_channel": "#test", + "sql": "sql {{ ds }}", + **slack_op_kwargs, + } + sql_to_slack_operator = self._construct_operator(**operator_args) slack_webhook_hook = mocked_hook.return_value sql_to_slack_operator._get_hook = mock_dbapi_hook - sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + ti = dag_maker.create_dagrun().task_instances[0] + ti.run() # Test that the Slack hook is instantiated with the right parameters mocked_hook.assert_called_once_with(slack_webhook_conn_id="slack_connection", **hook_extra_kwargs) @@ -92,26 +94,28 @@ def test_rendering_and_message_execution(self, slack_op_kwargs, hook_extra_kwarg channel="#test", ) - def test_rendering_and_message_execution_with_slack_hook(self, mocked_hook): + def test_rendering_and_message_execution_with_slack_hook(self, mocked_hook, dag_maker): mock_dbapi_hook = mock.Mock() test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1]) get_df_mock = mock_dbapi_hook.return_value.get_df get_df_mock.return_value = test_df - operator_args = { - "sql_conn_id": "snowflake_connection", - "slack_webhook_conn_id": "slack_connection", - "slack_message": "message: {{ ds }}, {{ results_df }}", - "slack_channel": "#test", - "sql": "sql {{ ds }}", - "dag": self.example_dag, - } - sql_to_slack_operator = self._construct_operator(**operator_args) + with dag_maker(dag_id=TEST_DAG_ID, start_date=DEFAULT_DATE): + operator_args = { + "sql_conn_id": "snowflake_connection", + "slack_webhook_conn_id": "slack_connection", + "slack_message": "message: {{ ds }}, {{ results_df }}", + "slack_channel": "#test", + "sql": "sql {{ ds }}", + } + sql_to_slack_operator = self._construct_operator(**operator_args) slack_webhook_hook = mocked_hook.return_value sql_to_slack_operator._get_hook = mock_dbapi_hook - sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + ti = dag_maker.create_dagrun().task_instances[0] + ti.run() # Test that the Slack hook is instantiated with the right parameters mocked_hook.assert_called_once_with( @@ -153,27 +157,29 @@ def test_non_existing_slack_webhook_conn_id(self): with pytest.raises(ValueError, match="Got an empty `slack_webhook_conn_id` value"): self._construct_operator(**operator_args) - def test_rendering_custom_df_name_message_execution(self, mocked_hook): + def test_rendering_custom_df_name_message_execution(self, mocked_hook, dag_maker): mock_dbapi_hook = mock.Mock() test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1]) get_df_mock = mock_dbapi_hook.return_value.get_df get_df_mock.return_value = test_df - operator_args = { - "sql_conn_id": "snowflake_connection", - "slack_webhook_conn_id": "slack_connection", - "slack_message": "message: {{ ds }}, {{ testing }}", - "slack_channel": "#test", - "sql": "sql {{ ds }}", - "results_df_name": "testing", - "dag": self.example_dag, - } - sql_to_slack_operator = self._construct_operator(**operator_args) + with dag_maker(dag_id=TEST_DAG_ID, start_date=DEFAULT_DATE): + operator_args = { + "sql_conn_id": "snowflake_connection", + "slack_webhook_conn_id": "slack_connection", + "slack_message": "message: {{ ds }}, {{ testing }}", + "slack_channel": "#test", + "sql": "sql {{ ds }}", + "results_df_name": "testing", + } + sql_to_slack_operator = self._construct_operator(**operator_args) slack_webhook_hook = mocked_hook.return_value sql_to_slack_operator._get_hook = mock_dbapi_hook - sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + ti = dag_maker.create_dagrun().task_instances[0] + ti.run() # Test that the Slack hook is instantiated with the right parameters mocked_hook.assert_called_once_with( @@ -202,7 +208,6 @@ def test_hook_params_building(self, mocked_get_connection): "slack_webhook_conn_id": "slack_connection", "parameters": ["1", "2", "3"], "slack_message": "message: {{ ds }}, {{ xxxx }}", - "dag": self.example_dag, } sql_to_slack_operator = SqlToSlackWebhookOperator(task_id=TEST_TASK_ID, **operator_args)