Skip to content

Commit

Permalink
Add PrestoToSlackOperator (#23979)
Browse files Browse the repository at this point in the history
* Add `PrestoToSlackOperator`
Adding the funcitonality to run a single query against presto and send the result as slack message.
Similar to `SnowflakeToSlackOperator`
  • Loading branch information
eladkal authored Jun 6, 2022
1 parent 8d6f4ed commit 2226e64
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ hashicorp google
microsoft.azure google,oracle,sftp
mysql amazon,presto,trino,vertica
postgres amazon
presto google
presto google,slack
salesforce tableau
sftp ssh
slack http
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@
"amazon"
],
"presto": [
"google"
"google",
"slack"
],
"salesforce": [
"tableau"
Expand Down
5 changes: 5 additions & 0 deletions airflow/providers/presto/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ transfers:
how-to-guide: /docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
python-module: airflow.providers.presto.transfers.gcs_to_presto

- source-integration-name: Presto
target-integration-name: Slack
how-to-guide: /docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst
python-module: airflow.providers.presto.transfers.presto_to_slack

hook-class-names: # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
- airflow.providers.presto.hooks.presto.PrestoHook

Expand Down
141 changes: 141 additions & 0 deletions airflow/providers/presto/transfers/presto_to_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union

from pandas import DataFrame
from tabulate import tabulate

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.presto.hooks.presto import PrestoHook
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

if TYPE_CHECKING:
from airflow.utils.context import Context


class PrestoToSlackOperator(BaseOperator):
"""
Executes a single SQL statement in Presto and sends the results to Slack. The results of the query are
rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
results_df }}'. The 'results_df' variable name can be changed by specifying a different
'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df |
tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PrestoToSlackOperator`
:param sql: The SQL statement to execute on Presto (templated)
:param slack_message: The templated Slack message to send with the data returned from Presto.
You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the
SQL results
:param presto_conn_id: destination presto connection
:param slack_conn_id: The connection id for Slack
:param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
:param parameters: The parameters to pass to the SQL query
:param slack_token: The token to use to authenticate to Slack. If this is not provided, the
'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id
:param slack_channel: The channel to send message. Override default from Slack connection.
"""

template_fields: Sequence[str] = ('sql', 'slack_message', 'slack_channel')
template_ext: Sequence[str] = ('.sql', '.jinja', '.j2')
template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
times_rendered = 0

def __init__(
self,
*,
sql: str,
slack_message: str,
presto_conn_id: str = 'presto_default',
slack_conn_id: str = 'slack_default',
results_df_name: str = 'results_df',
parameters: Optional[Union[Iterable, Mapping]] = None,
slack_token: Optional[str] = None,
slack_channel: Optional[str] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)

self.presto_conn_id = presto_conn_id
self.sql = sql
self.parameters = parameters
self.slack_conn_id = slack_conn_id
self.slack_token = slack_token
self.slack_message = slack_message
self.results_df_name = results_df_name
self.slack_channel = slack_channel

def _get_query_results(self) -> DataFrame:
presto_hook = self._get_presto_hook()

self.log.info('Running SQL query: %s', self.sql)
df = presto_hook.get_pandas_df(self.sql, parameters=self.parameters)
return df

def _render_and_send_slack_message(self, context, df) -> None:
# Put the dataframe into the context and render the JINJA template fields
context[self.results_df_name] = df
self.render_template_fields(context)

slack_hook = self._get_slack_hook()
self.log.info('Sending slack message: %s', self.slack_message)
slack_hook.execute()

def _get_presto_hook(self) -> PrestoHook:
return PrestoHook(presto_conn_id=self.presto_conn_id)

def _get_slack_hook(self) -> SlackWebhookHook:
return SlackWebhookHook(
http_conn_id=self.slack_conn_id,
message=self.slack_message,
webhook_token=self.slack_token,
slack_channel=self.slack_channel,
)

def render_template_fields(self, context, jinja_env=None) -> None:
# If this is the first render of the template fields, exclude slack_message from rendering since
# the presto results haven't been retrieved yet.
if self.times_rendered == 0:
fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields)
else:
fields_to_render = self.template_fields

if not jinja_env:
jinja_env = self.get_template_env()

# Add the tabulate library into the JINJA environment
jinja_env.filters['tabulate'] = tabulate

self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
self.times_rendered += 1

def execute(self, context: 'Context') -> None:
if not self.sql.strip():
raise AirflowException("Expected 'sql' parameter is missing.")
if not self.slack_message.strip():
raise AirflowException("Expected 'slack_message' parameter is missing.")

df = self._get_query_results()

self._render_and_send_slack_message(context, df)

self.log.debug('Finished sending Presto data to Slack')
7 changes: 7 additions & 0 deletions docs/apache-airflow-providers-presto/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ Content

PrestoTransferOperator types <operators/transfer/gcs_to_presto>

.. toctree::
:maxdepth: 1
:caption: Guides

PrestoToSlackOperator types <operators/transfer/presto_to_slack>

.. toctree::
:maxdepth: 1
:caption: References
Expand Down Expand Up @@ -100,6 +106,7 @@ You can install such cross-provider dependencies when installing from PyPI. For
Dependent package Extra
==================================================================================================== ==========
`apache-airflow-providers-google <https://airflow.apache.org/docs/apache-airflow-providers-google>`_ ``google``
`apache-airflow-providers-slack <https://airflow.apache.org/docs/apache-airflow-providers-slack>`_ ``slack``
==================================================================================================== ==========

Downloading official packages
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _howto/operator:PrestoToSlackOperator:

PrestoToSlackOperator
========================

Use the :class:`~airflow.providers.presto.transfers.presto_to_slack.presto_to_slack` to post messages to predefined Slack
channels.

Using the Operator
^^^^^^^^^^^^^^^^^^

This operator will execute a custom query in Presto and publish a Slack message that can be formatted
and contain the resulting dataset (e.g. ASCII formatted dataframe).

An example usage of the PrestoToSlackOperator is as follows:

.. exampleinclude:: /../../tests/system/providers/presto/example_presto_to_slack.py
:language: python
:dedent: 4
:start-after: [START howto_operator_presto_to_slack]
:end-before: [END howto_operator_presto_to_slack]
77 changes: 77 additions & 0 deletions tests/providers/presto/transfers/test_presto_to_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from unittest import mock

from airflow.models import DAG
from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator
from airflow.utils import timezone
from tests.test_utils.db import clear_db_runs

TEST_DAG_ID = 'presto_to_slack_unit_test'
DEFAULT_DATE = timezone.datetime(2022, 1, 1)


class TestPrestoToSlackOperator:
def setup_class(self):
clear_db_runs()

def setup_method(self):
self.example_dag = DAG('unit_test_dag_presto_to_slack', start_date=DEFAULT_DATE)

def teardown_method(self):
clear_db_runs()

@staticmethod
def _construct_operator(**kwargs):
operator = PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs)
return operator

@mock.patch('airflow.providers.presto.transfers.presto_to_slack.PrestoHook')
@mock.patch('airflow.providers.presto.transfers.presto_to_slack.SlackWebhookHook')
def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class):
operator_args = {
'presto_conn_id': 'presto_connection',
'slack_conn_id': 'slack_connection',
'sql': "sql {{ ds }}",
'results_df_name': 'xxxx',
'parameters': ['1', '2', '3'],
'slack_message': 'message: {{ ds }}, {{ xxxx }}',
'slack_token': 'test_token',
'slack_channel': 'my_channel',
'dag': self.example_dag,
}
presto_to_slack_operator = self._construct_operator(**operator_args)
presto_hook = mock_presto_hook_class.return_value
presto_hook.get_pandas_df.return_value = '1234'
slack_webhook_hook = mock_slack_hook_class.return_value
presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

mock_presto_hook_class.assert_called_once_with(
presto_conn_id='presto_connection',
)

presto_hook.get_pandas_df.assert_called_once_with('sql 2022-01-01', parameters=['1', '2', '3'])

mock_slack_hook_class.assert_called_once_with(
http_conn_id='slack_connection',
message='message: 2022-01-01, 1234',
webhook_token='test_token',
slack_channel='my_channel',
)

slack_webhook_hook.execute.assert_called_once()
52 changes: 52 additions & 0 deletions tests/system/providers/presto/example_presto_to_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG using PrestoToSlackOperator.
"""

import os
from datetime import datetime

from airflow import models
from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator

PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_presto_to_slack"

with models.DAG(
dag_id=DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# [START howto_operator_presto_to_slack]
PrestoToSlackOperator(
task_id="presto_to_slack",
sql=f"SELECT col FROM {PRESTO_TABLE}",
slack_channel="my_channel",
slack_message="message: {{ ds }}, {{ results_df }}",
)
# [END howto_operator_presto_to_slack]


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

0 comments on commit 2226e64

Please sign in to comment.