diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index 0e75dca5f1e63..07d1215b58de7 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -670,7 +670,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "providers/http/tests/file.py", ), { - "selected-providers-list-as-string": "amazon apache.livy atlassian.jira dbt.cloud dingding discord google http pagerduty", + "selected-providers-list-as-string": "amazon apache.livy atlassian.jira common.compat dbt.cloud dingding discord google http pagerduty", "all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']", "all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION, "python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']", @@ -691,7 +691,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): [ { "description": "amazon...google", - "test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]", + "test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,common.compat,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]", } ] ), @@ -702,18 +702,21 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "test_types": "Providers[amazon] Providers[apache.livy]", }, { - "description": "atlassian.jir...dbt.cloud", - "test_types": "Providers[atlassian.jira] Providers[dbt.cloud]", + "description": "atlassian.jir...common.compat", + "test_types": "Providers[atlassian.jira] Providers[common.compat]", }, { - "description": "dingding...discord", - "test_types": "Providers[dingding] Providers[discord]", + "description": "dbt.cloud...dingding", + "test_types": "Providers[dbt.cloud] Providers[dingding]", }, { - "description": "google...http", - "test_types": "Providers[google] Providers[http]", + "description": "discord...google", + "test_types": "Providers[discord] Providers[google]", + }, + { + "description": "http...pagerduty", + "test_types": "Providers[http] Providers[pagerduty]", }, - {"description": "pagerduty", "test_types": "Providers[pagerduty]"}, ] ), "run-mypy": "true", diff --git a/providers/http/provider.yaml b/providers/http/provider.yaml index 62e71bf3bff3a..46201c0b12a47 100644 --- a/providers/http/provider.yaml +++ b/providers/http/provider.yaml @@ -96,6 +96,9 @@ hooks: python-modules: - airflow.providers.http.hooks.http +notifications: + - airflow.providers.http.notifications.HttpNotifier + triggers: - integration-name: Hypertext Transfer Protocol (HTTP) python-modules: diff --git a/providers/http/pyproject.toml b/providers/http/pyproject.toml index 19db63d03d3bf..55a76793db424 100644 --- a/providers/http/pyproject.toml +++ b/providers/http/pyproject.toml @@ -66,11 +66,19 @@ dependencies = [ "asgiref>=2.3.0", ] +# The optional dependencies should be modified in place in the generated file +# Any change in the dependencies is preserved when the file is regenerated +[project.optional-dependencies] +"common.compat" = [ + "apache-airflow-providers-common-compat" +] + [dependency-groups] dev = [ "apache-airflow", "apache-airflow-task-sdk", "apache-airflow-devel-common", + "apache-airflow-providers-common-compat", # Additional devel dependencies (do not remove this line and add extra development dependencies) ] diff --git a/providers/http/src/airflow/providers/http/get_provider_info.py b/providers/http/src/airflow/providers/http/get_provider_info.py index 290339ef43612..37b48c286f025 100644 --- a/providers/http/src/airflow/providers/http/get_provider_info.py +++ b/providers/http/src/airflow/providers/http/get_provider_info.py @@ -53,6 +53,7 @@ def get_provider_info(): "python-modules": ["airflow.providers.http.hooks.http"], } ], + "notifications": ["airflow.providers.http.notifications.HttpNotifier"], "triggers": [ { "integration-name": "Hypertext Transfer Protocol (HTTP)", diff --git a/providers/http/src/airflow/providers/http/notifications/__init__.py b/providers/http/src/airflow/providers/http/notifications/__init__.py new file mode 100644 index 0000000000000..74517c85ef7f6 --- /dev/null +++ b/providers/http/src/airflow/providers/http/notifications/__init__.py @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.providers.http.notifications.http import HttpNotifier + +__all__ = ["HttpNotifier"] diff --git a/providers/http/src/airflow/providers/http/notifications/http.py b/providers/http/src/airflow/providers/http/notifications/http.py new file mode 100644 index 0000000000000..2f1adeadca58a --- /dev/null +++ b/providers/http/src/airflow/providers/http/notifications/http.py @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any + +import aiohttp + +from airflow.providers.common.compat.notifier import BaseNotifier +from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook + +if TYPE_CHECKING: + from airflow.sdk.definitions.context import Context + + +class HttpNotifier(BaseNotifier): + """ + HTTP Notifier. + + Sends HTTP requests to notify external systems. + + :param http_conn_id: HTTP connection id that has the base URL and optional authentication credentials. + :param endpoint: The endpoint to be called i.e. resource/v1/query? + :param method: The HTTP method to use. Defaults to POST. + :param data: Payload to be uploaded or request parameters + :param json: JSON payload to be uploaded + :param headers: Additional headers to be passed through as a dictionary + :param extra_options: Additional options to be used when executing the request + """ + + template_fields = ("http_conn_id", "endpoint", "data", "json", "headers", "extra_options") + + def __init__( + self, + *, + http_conn_id: str = HttpHook.default_conn_name, + endpoint: str | None = None, + method: str = "POST", + data: dict[str, Any] | str | None = None, + json: dict[str, Any] | str | None = None, + headers: dict[str, Any] | None = None, + extra_options: dict[str, Any] | None = None, + **kwargs, + ): + super().__init__(**kwargs) + self.http_conn_id = http_conn_id + self.endpoint = endpoint + self.method = method + self.data = data + self.json = json + self.headers = headers + self.extra_options = extra_options or {} + + @cached_property + def hook(self) -> HttpHook: + """HTTP Hook.""" + return HttpHook(method=self.method, http_conn_id=self.http_conn_id) + + @cached_property + def async_hook(self) -> HttpAsyncHook: + """HTTP Async Hook.""" + return HttpAsyncHook(method=self.method, http_conn_id=self.http_conn_id) + + def notify(self, context: Context) -> None: + """Send HTTP notification (sync).""" + resp = self.hook.run( + endpoint=self.endpoint, + data=self.data, + headers=self.headers, + extra_options=self.extra_options, + json=self.json, + ) + self.log.debug("HTTP notification sent: %s %s", resp.status_code, resp.url) + + async def async_notify(self, context: Context) -> None: + """Send HTTP notification (async).""" + async with aiohttp.ClientSession() as session: + resp = await self.async_hook.run( + session=session, + endpoint=self.endpoint, + data=self.data, + json=self.json, + headers=self.headers, + extra_options=self.extra_options, + ) + self.log.debug("HTTP notification sent (async): %s %s", resp.status, resp.url) + + +send_http_notification = HttpNotifier diff --git a/providers/http/tests/unit/http/notifications/__init__.py b/providers/http/tests/unit/http/notifications/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/http/tests/unit/http/notifications/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/http/tests/unit/http/notifications/test_http.py b/providers/http/tests/unit/http/notifications/test_http.py new file mode 100644 index 0000000000000..eb4eecff46699 --- /dev/null +++ b/providers/http/tests/unit/http/notifications/test_http.py @@ -0,0 +1,95 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.providers.http.notifications.http import HttpNotifier, send_http_notification + + +class TestHttpNotifier: + def test_class_and_notifier_are_same(self): + assert send_http_notification is HttpNotifier + + @mock.patch("airflow.providers.http.notifications.http.HttpHook") + def test_http_notifier(self, mock_http_hook): + notifier = HttpNotifier( + http_conn_id="test_conn_id", + endpoint="/testing", + method="POST", + json={"message": "testing"}, + headers={"Content-Type": "application/json"}, + ) + notifier.notify({}) + + mock_http_hook.return_value.run.assert_called_once_with( + endpoint="/testing", + data=None, + headers={"Content-Type": "application/json"}, + extra_options={}, + json={"message": "testing"}, + ) + mock_http_hook.assert_called_once_with(method="POST", http_conn_id="test_conn_id") + + @pytest.mark.asyncio + @mock.patch("airflow.providers.http.notifications.http.HttpAsyncHook") + @mock.patch("aiohttp.ClientSession") + async def test_async_http_notifier(self, mock_session, mock_http_async_hook): + mock_hook = mock_http_async_hook.return_value + mock_hook.run = mock.AsyncMock() + + notifier = HttpNotifier( + http_conn_id="test_conn_id", + endpoint="/test", + method="POST", + json={"message": "test"}, + ) + + await notifier.async_notify({}) + + mock_hook.run.assert_called_once_with( + session=mock_session.return_value.__aenter__.return_value, + endpoint="/test", + data=None, + json={"message": "test"}, + headers=None, + extra_options={}, + ) + + @mock.patch("airflow.providers.http.notifications.http.HttpHook") + def test_http_notifier_templated(self, mock_http_hook, create_dag_without_db): + notifier = HttpNotifier( + endpoint="/{{ dag.dag_id }}", + json={"dag_id": "{{ dag.dag_id }}", "user": "{{ username }}"}, + ) + notifier( + { + "dag": create_dag_without_db("test_http_notification_templated"), + "username": "test-user", + } + ) + + mock_http_hook.return_value.run.assert_called_once_with( + endpoint="/test_http_notification_templated", + data=None, + headers=None, + extra_options={}, + json={"dag_id": "test_http_notification_templated", "user": "test-user"}, + )