Skip to content

Commit

Permalink
Refactor SlackWebhookOperator: Get rid of mandatory http-provider d…
Browse files Browse the repository at this point in the history
…ependency
  • Loading branch information
Taragolis committed Sep 24, 2022
1 parent 3b61769 commit cf15073
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 60 deletions.
89 changes: 73 additions & 16 deletions airflow/providers/slack/operators/slack_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,40 @@
# under the License.
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Sequence

from airflow.compat.functools import cached_property
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

if TYPE_CHECKING:
from airflow.utils.context import Context


class SlackWebhookOperator(SimpleHttpOperator):
class SlackWebhookOperator(BaseOperator):
"""
This operator allows you to post messages to Slack using incoming webhooks.
Takes both Slack webhook token directly and connection that has Slack webhook token.
If both supplied, http_conn_id will be used as base_url,
and webhook_token will be taken as endpoint, the relative path of the url.
This operator allows you to post messages to Slack using Incoming Webhooks.
Each Slack webhook token can be pre-configured to use a specific channel, username and
icon. You can override these defaults in this hook.
.. note::
You cannot override the default channel (chosen by the user who installed your app),
username, or icon when you're using Incoming Webhooks to post messages.
Instead, these values will always inherit from the associated Slack App configuration
(`link <https://api.slack.com/messaging/webhooks#advanced_message_formatting>`_).
It is possible to change this values only in `Legacy Slack Integration Incoming Webhook
<https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations>`_.
:param http_conn_id: connection that has Slack webhook token in the extra field
:param webhook_token: Slack webhook token
:param message: The message you want to send on Slack
.. warning::
This operator could take Slack Webhook Token from ``webhook_token``
as well as from :ref:`Slack Incoming Webhook connection <howto/connection:slack-incoming-webhook>`.
However, provide ``webhook_token`` it is not secure and this attribute
will be removed in the future version of provider.
:param slack_webhook_conn_id: :ref:`Slack Incoming Webhook <howto/connection:slack>`
connection id that has Incoming Webhook token in the password field.
:param message: The formatted text of the message to be published.
If ``blocks`` are included, this will become the fallback text used in notifications.
:param attachments: The attachments to send on Slack. Should be a list of
dictionaries representing Slack attachments.
:param blocks: The blocks to send on Slack. Should be a list of
Expand All @@ -51,6 +62,8 @@ class SlackWebhookOperator(SimpleHttpOperator):
:param link_names: Whether or not to find and link channel and usernames in your
message
:param proxy: Proxy to use to make the Slack webhook call
:param webhook_token: (deprecated) Slack Incoming Webhook token.
Please use ``slack_webhook_conn_id`` instead.
"""

template_fields: Sequence[str] = (
Expand All @@ -66,7 +79,7 @@ class SlackWebhookOperator(SimpleHttpOperator):
def __init__(
self,
*,
http_conn_id: str,
slack_webhook_conn_id: str | None = None,
webhook_token: str | None = None,
message: str = "",
attachments: list | None = None,
Expand All @@ -79,8 +92,50 @@ def __init__(
proxy: str | None = None,
**kwargs,
) -> None:
super().__init__(endpoint=webhook_token, **kwargs)
self.http_conn_id = http_conn_id
http_conn_id = kwargs.pop("http_conn_id", None)
if http_conn_id:
warnings.warn(
'Parameter `http_conn_id` is deprecated. Please use `slack_webhook_conn_id` instead.',
DeprecationWarning,
stacklevel=2,
)
if slack_webhook_conn_id:
raise AirflowException("You cannot provide both `slack_webhook_conn_id` and `http_conn_id`.")
slack_webhook_conn_id = http_conn_id

# Compatibility with previous version of operator which based on SimpleHttpOperator.
# Users might pass these arguments previously, however its never pass to SlackWebhookHook.
# We remove this arguments if found in ``kwargs`` and notify users if found any.
deprecated_class_attrs = []
for deprecated_attr in (
"endpoint",
"method",
"data",
"headers",
"response_check",
"response_filter",
"extra_options",
"log_response",
"auth_type",
"tcp_keep_alive",
"tcp_keep_alive_idle",
"tcp_keep_alive_count",
"tcp_keep_alive_interval",
):
if deprecated_attr in kwargs:
deprecated_class_attrs.append(deprecated_attr)
kwargs.pop(deprecated_attr)
if deprecated_class_attrs:
warnings.warn(
f"Provide {','.join(repr(a) for a in deprecated_class_attrs)} is deprecated "
f"and as has no affect, please remove it from {self.__class__.__name__} "
"constructor attributes otherwise in future version of provider it might cause an issue.",
DeprecationWarning,
stacklevel=2,
)

super().__init__(**kwargs)
self.slack_webhook_conn_id = slack_webhook_conn_id
self.webhook_token = webhook_token
self.proxy = proxy
self.message = message
Expand All @@ -94,10 +149,12 @@ def __init__(

@cached_property
def hook(self) -> SlackWebhookHook:
"""Create and return an SlackWebhookHook (cached)."""
return SlackWebhookHook(
http_conn_id=self.http_conn_id,
webhook_token=self.webhook_token,
slack_webhook_conn_id=self.slack_webhook_conn_id,
proxy=self.proxy,
# Deprecated. SlackWebhookHook will notify user if user provide non-empty ``webhook_token``.
webhook_token=self.webhook_token,
)

def execute(self, context: Context) -> None:
Expand Down
1 change: 0 additions & 1 deletion airflow/providers/slack/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ versions:
dependencies:
- apache-airflow>=2.2.0
- apache-airflow-providers-common-sql>=1.1.0
- apache-airflow-providers-http
- slack_sdk>=3.0.0

integrations:
Expand Down
4 changes: 1 addition & 3 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,11 @@
"slack": {
"deps": [
"apache-airflow-providers-common-sql>=1.1.0",
"apache-airflow-providers-http",
"apache-airflow>=2.2.0",
"slack_sdk>=3.0.0"
],
"cross-providers-deps": [
"common.sql",
"http"
"common.sql"
]
},
"snowflake": {
Expand Down
169 changes: 129 additions & 40 deletions tests/providers/slack/operators/test_slack_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,97 @@
# under the License.
from __future__ import annotations

import unittest
from typing import Sequence
from unittest import mock

from airflow.models.dag import DAG
import pytest

from airflow.exceptions import AirflowException
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2017, 1, 1)

class TestSlackWebhookOperator:
def setup_method(self):
self.default_op_kwargs = {
"channel": None,
"username": None,
"icon_emoji": None,
"icon_url": None,
}

class TestSlackWebhookOperator(unittest.TestCase):
_config = {
'http_conn_id': 'slack-webhook-default',
'webhook_token': 'manual_token',
'message': 'your message here',
'attachments': [{'fallback': 'Required plain-text summary'}],
'blocks': [{'type': 'section', 'text': {'type': 'mrkdwn', 'text': '*bold text*'}}],
'channel': '#general',
'username': 'SlackMcSlackFace',
'icon_emoji': ':hankey',
'icon_url': 'https://airflow.apache.org/_images/pin_large.png',
'link_names': True,
'proxy': 'https://my-horrible-proxy.proxyist.com:8080',
}
@pytest.mark.parametrize(
"simple_http_op_attr",
[
"endpoint",
"method",
"data",
"headers",
"response_check",
"response_filter",
"extra_options",
"log_response",
"auth_type",
"tcp_keep_alive",
"tcp_keep_alive_idle",
"tcp_keep_alive_count",
"tcp_keep_alive_interval",
],
)
def test_unused_deprecated_http_operator_kwargs(self, simple_http_op_attr):
"""
Test remove deprecated (and unused) SimpleHttpOperator keyword arguments.
No error should happen if provide any of attribute, unless operator allow to provide this attributes.
"""
kw = {simple_http_op_attr: "foo-bar"}
warning_message = fr"Provide '{simple_http_op_attr}' is deprecated and as has no affect"
with pytest.warns(DeprecationWarning, match=warning_message):
SlackWebhookOperator(task_id="test_unused_args", **kw)

def setUp(self):
args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
self.dag = DAG('test_dag_id', default_args=args)
def test_deprecated_http_conn_id(self):
"""Test resolve deprecated http_conn_id."""
warning_message = (
r"Parameter `http_conn_id` is deprecated. Please use `slack_webhook_conn_id` instead."
)
with pytest.warns(DeprecationWarning, match=warning_message):
op = SlackWebhookOperator(
task_id='test_deprecated_http_conn_id', slack_webhook_conn_id=None, http_conn_id="http_conn"
)
assert op.slack_webhook_conn_id == "http_conn"

def test_execute(self):
# Given / When
operator = SlackWebhookOperator(task_id='slack_webhook_job', dag=self.dag, **self._config)
error_message = "You cannot provide both `slack_webhook_conn_id` and `http_conn_id`."
with pytest.raises(AirflowException, match=error_message):
with pytest.warns(DeprecationWarning, match=warning_message):
SlackWebhookOperator(
task_id='test_both_conn_ids',
slack_webhook_conn_id="slack_webhook_conn_id",
http_conn_id="http_conn",
)

assert self._config['http_conn_id'] == operator.http_conn_id
assert self._config['webhook_token'] == operator.webhook_token
assert self._config['message'] == operator.message
assert self._config['attachments'] == operator.attachments
assert self._config['blocks'] == operator.blocks
assert self._config['channel'] == operator.channel
assert self._config['username'] == operator.username
assert self._config['icon_emoji'] == operator.icon_emoji
assert self._config['icon_url'] == operator.icon_url
assert self._config['link_names'] == operator.link_names
assert self._config['proxy'] == operator.proxy
@pytest.mark.parametrize(
"slack_webhook_conn_id,webhook_token",
[
("test_conn_id", None),
(None, "https://hooks.slack.com/services/T000/B000/XXX"),
("test_conn_id", "https://hooks.slack.com/services/T000/B000/XXX"),
],
)
@pytest.mark.parametrize("proxy", [None, "https://localhost:9999"])
@mock.patch("airflow.providers.slack.operators.slack_webhook.SlackWebhookHook")
def test_hook(self, mock_slackwebhook_cls, slack_webhook_conn_id, webhook_token, proxy):
"""Test get cached ``SlackWebhookHook`` hook."""
op_kw = {
"slack_webhook_conn_id": slack_webhook_conn_id,
"proxy": proxy,
"webhook_token": webhook_token,
}
op = SlackWebhookOperator(task_id='test_hook', **op_kw)
hook = op.hook
assert hook is op.hook, "Expected cached hook"
mock_slackwebhook_cls.assert_called_once_with(**op_kw)

def test_assert_templated_fields(self):
operator = SlackWebhookOperator(task_id='slack_webhook_job', dag=self.dag, **self._config)

template_fields: Sequence[str] = (
"""Test expected templated fields."""
operator = SlackWebhookOperator(task_id='test_assert_templated_fields', **self.default_op_kwargs)
template_fields = (
'webhook_token',
'message',
'attachments',
Expand All @@ -74,5 +116,52 @@ def test_assert_templated_fields(self):
'username',
'proxy',
)

assert operator.template_fields == template_fields

@pytest.mark.parametrize(
"message,blocks,attachments",
[
("Test Text", ["Dummy Block"], ["Test Attachments"]),
("Test Text", ["Dummy Block"], None),
("Test Text", None, None),
(None, ["Dummy Block"], None),
(None, ["Dummy Block"], ["Test Attachments"]),
(None, None, ["Test Attachments"]),
],
)
@pytest.mark.parametrize(
"channel,username,icon_emoji,icon_url",
[
(None, None, None, None),
("legacy-channel", "legacy-username", "legacy-icon_emoji", "legacy-icon-url"),
],
ids=["webhook-attrs", "legacy-webhook-attrs"],
)
@mock.patch("airflow.providers.slack.operators.slack_webhook.SlackWebhookHook")
def test_execute_operator(
self, mock_slackwebhook_cls, message, blocks, attachments, channel, username, icon_emoji, icon_url
):
mock_slackwebhook = mock_slackwebhook_cls.return_value
mock_slackwebhook_send = mock_slackwebhook.send
op = SlackWebhookOperator(
task_id="test_execute",
slack_webhook_conn_id="test_conn_id",
message=message,
blocks=blocks,
attachments=attachments,
channel=channel,
username=username,
icon_emoji=icon_emoji,
icon_url=icon_url,
)
op.execute(mock.MagicMock())
mock_slackwebhook_send.assert_called_once_with(
text=message,
blocks=blocks,
attachments=attachments,
channel=channel,
username=username,
icon_emoji=icon_emoji,
icon_url=icon_url,
link_names=mock.ANY,
)

0 comments on commit cf15073

Please sign in to comment.