Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/slack/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ PIP package Version required
========================================== ==================
``apache-airflow`` ``>=2.10.0``
``apache-airflow-providers-common-compat`` ``>=1.6.1``
``apache-airflow-providers-common-sql`` ``>=1.20.0``
``apache-airflow-providers-common-sql`` ``>=1.27.0``
``slack_sdk`` ``>=3.19.0``
========================================== ==================

Expand Down
4 changes: 2 additions & 2 deletions providers/slack/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ requires-python = "~=3.9"
dependencies = [
"apache-airflow>=2.10.0",
"apache-airflow-providers-common-compat>=1.6.1",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow-providers-common-sql>=1.27.0",
"slack_sdk>=3.19.0",
]

Expand All @@ -71,7 +71,7 @@ dev = [
"apache-airflow-providers-common-compat",
"apache-airflow-providers-common-sql",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
"apache-airflow-providers-common-sql[pandas]",
"apache-airflow-providers-common-sql[pandas,polars]",
"apache-airflow-providers-postgres",
"apache-airflow-providers-snowflake",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,13 @@ def _get_hook(self) -> DbApiHook:
self.log.debug("Get connection for %s", self.sql_conn_id)
conn = BaseHook.get_connection(self.sql_conn_id)
hook = conn.get_hook(hook_params=self.sql_hook_params)
if not callable(getattr(hook, "get_pandas_df", None)):
raise AirflowException(
"This hook is not supported. The hook class must have get_pandas_df method."
)
if not callable(getattr(hook, "get_df", None)):
raise AirflowException("This hook is not supported. The hook class must have get_df method.")
return hook

def _get_query_results(self) -> pd.DataFrame:
sql_hook = self._get_hook()

self.log.info("Running SQL query: %s", self.sql)
df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters)
df = sql_hook.get_df(self.sql, parameters=self.parameters)
return df
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ def test_execute_not_implemented(self):
@pytest.mark.parametrize("sql_hook_params", [None, {"foo": "bar"}])
def test_get_hook(self, mock_get_hook, mock_get_conn, conn_type, sql_hook_params):
class SomeDummyHook:
"""Hook which implements ``get_pandas_df`` method"""
"""Hook which implements ``get_df`` method"""

def get_pandas_df(self):
def get_df(self):
pass

expected_hook = SomeDummyHook()
Expand All @@ -68,12 +68,12 @@ def get_pandas_df(self):
@mock.patch("airflow.models.connection.Connection.get_hook")
def test_get_not_supported_hook(self, mock_get_hook, mock_get_conn):
class SomeDummyHook:
"""Hook which not implemented ``get_pandas_df`` method"""
"""Hook which not implemented ``get_df`` method"""

mock_get_conn.return_value = Connection(conn_id="test_connection", conn_type="test_connection")
mock_get_hook.return_value = SomeDummyHook()
op = BaseSqlToSlackOperator(task_id="test_get_not_supported_hook", **self.default_op_kwargs)
error_message = r"This hook is not supported. The hook class must have get_pandas_df method\."
error_message = r"This hook is not supported. The hook class must have get_df method\."
with pytest.raises(AirflowException, match=error_message):
op._get_hook()

Expand All @@ -82,9 +82,9 @@ class SomeDummyHook:
@pytest.mark.parametrize("parameters", [None, {"col": "spam-egg"}])
def test_get_query_results(self, mock_op_get_hook, sql, parameters):
test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
mock_get_pandas_df = mock.MagicMock(return_value=test_df)
mock_get_df = mock.MagicMock(return_value=test_df)
mock_hook = mock.MagicMock()
mock_hook.get_pandas_df = mock_get_pandas_df
mock_hook.get_df = mock_get_df
mock_op_get_hook.return_value = mock_hook
op_kwargs = {
**self.default_op_kwargs,
Expand All @@ -93,5 +93,5 @@ def test_get_query_results(self, mock_op_get_hook, sql, parameters):
}
op = BaseSqlToSlackOperator(task_id="test_get_query_results", **op_kwargs)
df = op._get_query_results()
mock_get_pandas_df.assert_called_once_with(sql, parameters=parameters)
mock_get_df.assert_called_once_with(sql, parameters=parameters)
assert df is test_df
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def test_rendering_and_message_execution(self, slack_op_kwargs, hook_extra_kwarg
mock_dbapi_hook = mock.Mock()

test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
get_pandas_df_mock.return_value = test_df
get_df_mock = mock_dbapi_hook.return_value.get_df
get_df_mock.return_value = test_df

operator_args = {
"sql_conn_id": "snowflake_connection",
Expand Down Expand Up @@ -96,8 +96,8 @@ def test_rendering_and_message_execution_with_slack_hook(self, mocked_hook):
mock_dbapi_hook = mock.Mock()

test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
get_pandas_df_mock.return_value = test_df
get_df_mock = mock_dbapi_hook.return_value.get_df
get_df_mock.return_value = test_df

operator_args = {
"sql_conn_id": "snowflake_connection",
Expand Down Expand Up @@ -157,8 +157,8 @@ def test_rendering_custom_df_name_message_execution(self, mocked_hook):
mock_dbapi_hook = mock.Mock()

test_df = pd.DataFrame({"a": "1", "b": "2"}, index=[0, 1])
get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
get_pandas_df_mock.return_value = test_df
get_df_mock = mock_dbapi_hook.return_value.get_df
get_df_mock.return_value = test_df

operator_args = {
"sql_conn_id": "snowflake_connection",
Expand Down