diff --git a/providers/http/tests/unit/http/hooks/test_http.py b/providers/http/tests/unit/http/hooks/test_http.py index 2bbf9f91ada7c..f3e9f8a81d059 100644 --- a/providers/http/tests/unit/http/hooks/test_http.py +++ b/providers/http/tests/unit/http/hooks/test_http.py @@ -71,9 +71,39 @@ def get_airflow_connection_with_login_and_password(conn_id: str = "http_default" return Connection(conn_id=conn_id, conn_type="http", host="test.com", login="username", password="pass") +@pytest.fixture +def setup_connections_with_extras(request, create_connection_without_db): + extra = request.param if hasattr(request, "param") else {} + create_connection_without_db( + Connection( + conn_id="http_conn_with_extras", conn_type="http", host="test:8080/", extra=json.dumps(extra) + ) + ) + + class TestHttpHook: """Test get, post and raise_for_status""" + @pytest.fixture(autouse=True) + def setup_connections(self, create_connection_without_db): + create_connection_without_db( + Connection( + conn_id="http_default", conn_type="http", host="test:8080/", extra='{"bearer": "test"}' + ) + ) + create_connection_without_db( + Connection(conn_id="http_conn_without_bearer", conn_type="http", host="test.com", port=1234) + ) + create_connection_without_db( + Connection( + conn_id="http_conn_with_user_pwd", + conn_type="http", + host="test.com", + login="username", + password="pass", + ) + ) + def setup_method(self): import requests_mock @@ -88,9 +118,8 @@ def test_raise_for_status_with_200(self, requests_mock): requests_mock.get( "http://test:8080/v1/test", status_code=200, text='{"status":{"status": 200}}', reason="OK" ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - resp = self.get_hook.run("v1/test") - assert resp.text == '{"status":{"status": 200}}' + resp = self.get_hook.run("v1/test") + assert resp.text == '{"status":{"status": 200}}' def test_get_request_with_port(self, requests_mock): from requests.exceptions import MissingSchema @@ -102,19 +131,17 @@ def test_get_request_with_port(self, requests_mock): reason="OK", ) - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ): - expected_url = "http://test.com:1234/some/endpoint" - for endpoint in ["some/endpoint", "/some/endpoint"]: - with contextlib.suppress(MissingSchema): - self.get_hook.run(endpoint) + expected_url = "http://test.com:1234/some/endpoint" + get_hook = HttpHook(method="GET", http_conn_id="http_conn_without_bearer") + for endpoint in ["some/endpoint", "/some/endpoint"]: + with contextlib.suppress(MissingSchema): + get_hook.run(endpoint) - assert requests_mock.call_count == 1 - assert requests_mock.last_request.url == expected_url - assert requests_mock.last_request.method == "GET" + assert requests_mock.call_count == 1 + assert requests_mock.last_request.url == expected_url + assert requests_mock.last_request.method == "GET" - requests_mock.reset() + requests_mock.reset() def test_get_request_do_not_raise_for_status_if_check_response_is_false(self, requests_mock): requests_mock.get( @@ -123,15 +150,14 @@ def test_get_request_do_not_raise_for_status_if_check_response_is_false(self, re text='{"status":{"status": 404}}', reason="Bad request", ) + resp = self.get_hook.run("v1/test", extra_options={"check_response": False}) + assert resp.text == '{"status":{"status": 404}}' - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - resp = self.get_hook.run("v1/test", extra_options={"check_response": False}) - assert resp.text == '{"status":{"status": 404}}' - + # need to check last + @pytest.mark.parametrize("setup_connections_with_extras", [{"check_response": False}], indirect=True) def test_get_request_do_not_raise_for_status_if_check_response_is_false_in_connection( - self, requests_mock + self, setup_connections_with_extras, requests_mock ): - airflow_connection = get_airflow_connection_with_extra(extra={"check_response": False}) requests_mock.get( "http://test:8080/v1/test", status_code=404, @@ -139,95 +165,104 @@ def test_get_request_do_not_raise_for_status_if_check_response_is_false_in_conne reason="Bad request", ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - resp = self.get_hook.run("v1/test") - assert resp.text == '{"status":{"status": 404}}' + get_hook = HttpHook(method="GET", http_conn_id="http_conn_with_extras") + resp = get_hook.run("v1/test") + assert resp.text == '{"status":{"status": 404}}' def test_hook_contains_header_from_extra_field(self): - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - expected_conn = get_airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) == conn.headers - assert conn.headers.get("bearer") == "test" - - def test_hook_ignore_max_redirects_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "max_redirects": 3}) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("allow_redirects") is None - assert conn.proxies == {} - assert conn.stream is False - assert conn.verify is True - assert conn.cert is None - assert conn.max_redirects == 3 - assert conn.trust_env is True - - def test_hook_ignore_proxies_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}} - ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("proxies") is None - assert conn.proxies == {"http": "http://proxy:80", "https": "https://proxy:80"} - assert conn.stream is False - assert conn.verify is True - assert conn.cert is None - assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT - assert conn.trust_env is True - - def test_hook_ignore_verify_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "verify": False}) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("verify") is None - assert conn.proxies == {} - assert conn.stream is False - assert conn.verify is False - assert conn.cert is None - assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT - assert conn.trust_env is True - - def test_hook_ignore_cert_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "cert": "cert.crt", "stream": True} + conn = self.get_hook.get_conn() + assert dict(conn.headers, **{"bearer": "test"}) == conn.headers + assert conn.headers.get("bearer") == "test" + + @pytest.mark.parametrize( + "setup_connections_with_extras", [{"bearer": "test", "max_redirects": 3}], indirect=True + ) + def test_hook_ignore_max_redirects_from_extra_field_as_header(self, setup_connections_with_extras): + get_hook = HttpHook(method="GET", http_conn_id="http_conn_with_extras") + conn = get_hook.get_conn() + assert dict(conn.headers, **{"bearer": "test", "max_redirects": 3}) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("allow_redirects") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is True + assert conn.cert is None + assert conn.max_redirects == 3 + assert conn.trust_env is True + + @pytest.mark.parametrize( + "setup_connections_with_extras", + [{"bearer": "test", "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}}], + indirect=True, + ) + def test_hook_ignore_proxies_from_extra_field_as_header(self, setup_connections_with_extras): + get_hook = HttpHook(method="GET", http_conn_id="http_conn_with_extras") + conn = get_hook.get_conn() + assert ( + dict( + conn.headers, + **{"bearer": "test", "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}}, + ) + != conn.headers ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("cert") is None - assert conn.proxies == {} - assert conn.stream is True - assert conn.verify is True - assert conn.cert == "cert.crt" - assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT - assert conn.trust_env is True - - def test_hook_ignore_trust_env_from_extra_field_as_header(self): - airflow_connection = get_airflow_connection_with_extra(extra={"bearer": "test", "trust_env": False}) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - expected_conn = airflow_connection() - conn = self.get_hook.get_conn() - assert dict(conn.headers, **json.loads(expected_conn.extra)) != conn.headers - assert conn.headers.get("bearer") == "test" - assert conn.headers.get("cert") is None - assert conn.proxies == {} - assert conn.stream is False - assert conn.verify is True - assert conn.cert is None - assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT - assert conn.trust_env is False + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("proxies") is None + assert conn.proxies == {"http": "http://proxy:80", "https": "https://proxy:80"} + assert conn.stream is False + assert conn.verify is True + assert conn.cert is None + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT + assert conn.trust_env is True + + @pytest.mark.parametrize( + "setup_connections_with_extras", [{"bearer": "test", "verify": False}], indirect=True + ) + def test_hook_ignore_verify_from_extra_field_as_header(self, setup_connections_with_extras): + get_hook = HttpHook(method="GET", http_conn_id="http_conn_with_extras") + conn = get_hook.get_conn() + assert dict(conn.headers, **{"bearer": "test", "verify": False}) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("verify") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is False + assert conn.cert is None + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT + assert conn.trust_env is True + + @pytest.mark.parametrize( + "setup_connections_with_extras", + [{"bearer": "test", "cert": "cert.crt", "stream": True}], + indirect=True, + ) + def test_hook_ignore_cert_from_extra_field_as_header(self, setup_connections_with_extras): + get_hook = HttpHook(method="GET", http_conn_id="http_conn_with_extras") + conn = get_hook.get_conn() + assert dict(conn.headers, **{"bearer": "test", "cert": "cert.crt", "stream": True}) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("cert") is None + assert conn.proxies == {} + assert conn.stream is True + assert conn.verify is True + assert conn.cert == "cert.crt" + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT + assert conn.trust_env is True + + @pytest.mark.parametrize( + "setup_connections_with_extras", [{"bearer": "test", "trust_env": False}], indirect=True + ) + def test_hook_ignore_trust_env_from_extra_field_as_header(self, setup_connections_with_extras): + get_hook = HttpHook(method="GET", http_conn_id="http_conn_with_extras") + conn = get_hook.get_conn() + assert dict(conn.headers, **{"bearer": "test", "trust_env": False}) != conn.headers + assert conn.headers.get("bearer") == "test" + assert conn.headers.get("cert") is None + assert conn.proxies == {} + assert conn.stream is False + assert conn.verify is True + assert conn.cert is None + assert conn.max_redirects == DEFAULT_REDIRECT_LIMIT + assert conn.trust_env is False def test_hook_with_method_in_lowercase(self, requests_mock): from requests.exceptions import InvalidURL, MissingSchema @@ -238,41 +273,35 @@ def test_hook_with_method_in_lowercase(self, requests_mock): text='{"status":{"status": 200}}', reason="OK", ) + get_lowercase_hook = HttpHook(method="get", http_conn_id="http_conn_without_bearer") + data = "test params" + with contextlib.suppress(MissingSchema, InvalidURL): + get_lowercase_hook.run("v1/test", data=data) - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ): - data = "test params" - with contextlib.suppress(MissingSchema, InvalidURL): - self.get_lowercase_hook.run("v1/test", data=data) + assert requests_mock.call_count == 1 + assert requests_mock.last_request.url == "http://test.com:1234/v1/test?test%20params" + assert requests_mock.last_request.method == "GET" - assert requests_mock.call_count == 1 - assert requests_mock.last_request.url == "http://test.com:1234/v1/test?test%20params" - assert requests_mock.last_request.method == "GET" - - @pytest.mark.db_test def test_hook_uses_provided_header(self): conn = self.get_hook.get_conn(headers={"bearer": "newT0k3n"}) assert conn.headers.get("bearer") == "newT0k3n" - @pytest.mark.db_test def test_hook_has_no_header_from_extra(self): + self.get_hook.http_conn_id = "http_conn_without_bearer" conn = self.get_hook.get_conn() assert conn.headers.get("bearer") is None def test_hooks_header_from_extra_is_overridden(self): - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - conn = self.get_hook.get_conn(headers={"bearer": "newT0k3n"}) - assert conn.headers.get("bearer") == "newT0k3n" + conn = self.get_hook.get_conn(headers={"bearer": "newT0k3n"}) + assert conn.headers.get("bearer") == "newT0k3n" def test_post_request(self, requests_mock): requests_mock.post( "http://test:8080/v1/test", status_code=200, text='{"status":{"status": 200}}', reason="OK" ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - resp = self.post_hook.run("v1/test") - assert resp.status_code == 200 + resp = self.post_hook.run("v1/test") + assert resp.status_code == 200 def test_post_request_with_error_code(self, requests_mock): requests_mock.post( @@ -281,10 +310,8 @@ def test_post_request_with_error_code(self, requests_mock): text='{"status":{"status": 418}}', reason="I'm a teapot", ) - - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - with pytest.raises(AirflowException): - self.post_hook.run("v1/test") + with pytest.raises(AirflowException): + self.post_hook.run("v1/test") def test_post_request_do_not_raise_for_status_if_check_response_is_false(self, requests_mock): requests_mock.post( @@ -294,9 +321,8 @@ def test_post_request_do_not_raise_for_status_if_check_response_is_false(self, r reason="I'm a teapot", ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - resp = self.post_hook.run("v1/test", extra_options={"check_response": False}) - assert resp.status_code == 418 + resp = self.post_hook.run("v1/test", extra_options={"check_response": False}) + assert resp.status_code == 418 def test_post_request_raises_error_when_redirects_with_max_redirects_set_to_0(self, requests_mock): requests_mock.post( @@ -310,37 +336,31 @@ def test_post_request_raises_error_when_redirects_with_max_redirects_set_to_0(se status_code=200, text='{"message": "OK"}', ) + with pytest.raises(requests.exceptions.TooManyRedirects) as err: + self.post_hook.run("v1/test", extra_options={"max_redirects": 0}) - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_dummy_connection - ): - with pytest.raises(requests.exceptions.TooManyRedirects) as err: - self.post_hook.run("v1/test", extra_options={"max_redirects": 0}) - - assert str(err.value) == "Exceeded 0 redirects." - history = requests_mock.request_history - assert len(history) == 1 - assert history[0].url == "http://test:8080/v1/test" - assert history[0].method == "POST" + assert str(err.value) == "Exceeded 0 redirects." + history = requests_mock.request_history + assert len(history) == 1 + assert history[0].url == "http://test:8080/v1/test" + assert history[0].method == "POST" + @pytest.mark.parametrize( + "setup_connections_with_extras", [{"bearer": "test", "check_response": False}], indirect=True + ) def test_post_request_do_not_raise_for_status_if_check_response_is_false_within_extra( - self, requests_mock + self, setup_connections_with_extras, requests_mock ): - airflow_connection = get_airflow_connection_with_extra( - extra={"bearer": "test", "check_response": False} - ) requests_mock.post( "http://test:8080/v1/test", status_code=418, text='{"status":{"status": 418}}', reason="I'm a teapot", ) + post_hook = HttpHook(method="POST", http_conn_id="http_conn_with_extras") + resp = post_hook.run("v1/test") + assert resp.status_code == 418 - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - resp = self.post_hook.run("v1/test") - assert resp.status_code == 418 - - @pytest.mark.db_test @mock.patch("requests.Session.send") def test_retry_on_conn_error(self, mock_session_send): retry_args = dict( @@ -367,9 +387,8 @@ def test_run_with_advanced_retry(self, requests_mock): retry=tenacity.retry_if_exception_type(Exception), reraise=True, ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - response = self.get_hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args) - assert isinstance(response, requests.Response) + response = self.get_hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args) + assert isinstance(response, requests.Response) def test_header_from_extra_and_run_method_are_merged(self): def run_and_return(unused_session, prepped_request, unused_extra_options, **kwargs): @@ -379,11 +398,10 @@ def run_and_return(unused_session, prepped_request, unused_extra_options, **kwar with mock.patch( "airflow.providers.http.hooks.http.HttpHook.run_and_check", side_effect=run_and_return ): - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - prepared_request = self.get_hook.run("v1/test", headers={"some_other_header": "test"}) - actual = dict(prepared_request.headers) - assert actual.get("bearer") == "test" - assert actual.get("some_other_header") == "test" + prepared_request = self.get_hook.run("v1/test", headers={"some_other_header": "test"}) + actual = dict(prepared_request.headers) + assert actual.get("bearer") == "test" + assert actual.get("some_other_header") == "test" @mock.patch("airflow.providers.http.hooks.http.HttpHook.get_connection") def test_http_connection(self, mock_get_connection): @@ -448,137 +466,112 @@ def match_obj1(request): requests_mock.request(method=method, url="//test:8080/v1/test", additional_matcher=match_obj1) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - # will raise NoMockAddress exception if obj1 != request.json() - HttpHook(method=method).run("v1/test", json=obj1) + # will raise NoMockAddress exception if obj1 != request.json() + HttpHook(method=method).run("v1/test", json=obj1) @mock.patch("requests.Session.send") def test_verify_set_to_true_by_default(self, mock_session_send): - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ): - self.get_hook.run("/some/endpoint") - mock_session_send.assert_called_once_with( - mock.ANY, - allow_redirects=True, - cert=None, - proxies={}, - stream=False, - timeout=None, - verify=True, - ) + self.get_hook.run("/some/endpoint") + mock_session_send.assert_called_once_with( + mock.ANY, + allow_redirects=True, + cert=None, + proxies={}, + stream=False, + timeout=None, + verify=True, + ) @mock.patch("requests.Session.send") @mock.patch.dict(os.environ, {"REQUESTS_CA_BUNDLE": "/tmp/test.crt"}) def test_requests_ca_bundle_env_var(self, mock_session_send): - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ): - self.get_hook.run("/some/endpoint") - - mock_session_send.assert_called_once_with( - mock.ANY, - allow_redirects=True, - cert=None, - proxies={}, - stream=False, - timeout=None, - verify="/tmp/test.crt", - ) + self.get_hook.run("/some/endpoint") + + mock_session_send.assert_called_once_with( + mock.ANY, + allow_redirects=True, + cert=None, + proxies={}, + stream=False, + timeout=None, + verify="/tmp/test.crt", + ) @mock.patch("requests.Session.send") @mock.patch.dict(os.environ, {"REQUESTS_CA_BUNDLE": "/tmp/test.crt"}) def test_verify_respects_requests_ca_bundle_env_var(self, mock_session_send): - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ): - self.get_hook.run("/some/endpoint", extra_options={"verify": True}) - - mock_session_send.assert_called_once_with( - mock.ANY, - allow_redirects=True, - cert=None, - proxies={}, - stream=False, - timeout=None, - verify="/tmp/test.crt", - ) + self.get_hook.run("/some/endpoint", extra_options={"verify": True}) + + mock_session_send.assert_called_once_with( + mock.ANY, + allow_redirects=True, + cert=None, + proxies={}, + stream=False, + timeout=None, + verify="/tmp/test.crt", + ) @mock.patch("requests.Session.send") @mock.patch.dict(os.environ, {"REQUESTS_CA_BUNDLE": "/tmp/test.crt"}) def test_verify_false_parameter_overwrites_set_requests_ca_bundle_env_var(self, mock_session_send): - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ): - self.get_hook.run("/some/endpoint", extra_options={"verify": False}) - - mock_session_send.assert_called_once_with( - mock.ANY, - allow_redirects=True, - cert=None, - proxies={}, - stream=False, - timeout=None, - verify=False, - ) + self.get_hook.run("/some/endpoint", extra_options={"verify": False}) + + mock_session_send.assert_called_once_with( + mock.ANY, + allow_redirects=True, + cert=None, + proxies={}, + stream=False, + timeout=None, + verify=False, + ) def test_connection_success(self, requests_mock): requests_mock.get("http://test:8080", status_code=200, json={"status": {"status": 200}}, reason="OK") - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - status, msg = self.get_hook.test_connection() - assert status is True - assert msg == "Connection successfully tested" + status, msg = self.get_hook.test_connection() + assert status is True + assert msg == "Connection successfully tested" def test_connection_failure(self, requests_mock): requests_mock.get( "http://test:8080", status_code=500, json={"message": "internal server error"}, reason="NOT_OK" ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - status, msg = self.get_hook.test_connection() - assert status is False - assert msg == "500:NOT_OK" + status, msg = self.get_hook.test_connection() + assert status is False + assert msg == "500:NOT_OK" @mock.patch("requests.auth.AuthBase.__init__") def test_loginless_custom_auth_initialized_with_no_args(self, auth): - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - auth.return_value = None - hook = HttpHook("GET", "http_default", AuthBase) - hook.get_conn() - auth.assert_called_once_with() + auth.return_value = None + hook = HttpHook("GET", "http_default", AuthBase) + hook.get_conn() + auth.assert_called_once_with() @mock.patch("requests.auth.AuthBase.__init__") def test_loginless_custom_auth_initialized_with_args(self, auth): - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - auth.return_value = None - auth_with_args = functools.partial(AuthBase, "test_arg") - hook = HttpHook("GET", "http_default", auth_with_args) - hook.get_conn() - auth.assert_called_once_with("test_arg") + auth.return_value = None + auth_with_args = functools.partial(AuthBase, "test_arg") + hook = HttpHook("GET", "http_default", auth_with_args) + hook.get_conn() + auth.assert_called_once_with("test_arg") @mock.patch("requests.auth.HTTPBasicAuth.__init__") def test_login_password_basic_auth_initialized(self, auth): - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", - side_effect=get_airflow_connection_with_login_and_password, - ): - auth.return_value = None - hook = HttpHook("GET", "http_default", HTTPBasicAuth) - hook.get_conn() - auth.assert_called_once_with("username", "pass") + auth.return_value = None + hook = HttpHook("GET", "http_conn_with_user_pwd", HTTPBasicAuth) + hook.get_conn() + auth.assert_called_once_with("username", "pass") @mock.patch("requests.auth.HTTPBasicAuth.__init__") def test_default_auth_not_initialized(self, auth): - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - auth.return_value = None - hook = HttpHook("GET", "http_default") - hook.get_conn() - auth.assert_not_called() + auth.return_value = None + hook = HttpHook("GET", "http_default") + hook.get_conn() + auth.assert_not_called() def test_keep_alive_enabled(self): with ( - mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ), mock.patch( "requests_toolbelt.adapters.socket_options.TCPKeepAliveAdapter.send" ) as tcp_keep_alive_send, @@ -595,9 +588,6 @@ def test_keep_alive_enabled(self): def test_keep_alive_disabled(self): with ( - mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ), mock.patch( "requests_toolbelt.adapters.socket_options.TCPKeepAliveAdapter.send" ) as tcp_keep_alive_send, @@ -625,18 +615,15 @@ def test_url_from_endpoint(self, base_url: str, endpoint: str, expected_url: str assert hook.url_from_endpoint(endpoint) == expected_url def test_custom_adapter(self): - with mock.patch( - "airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection_with_port - ): - custom_adapter = HTTPAdapter() - hook = HttpHook(method="GET", adapter=custom_adapter) - session = hook.get_conn() - assert isinstance(session.adapters["http://"], type(custom_adapter)), ( - "Custom HTTP adapter not correctly mounted" - ) - assert isinstance(session.adapters["https://"], type(custom_adapter)), ( - "Custom HTTPS adapter not correctly mounted" - ) + custom_adapter = HTTPAdapter() + hook = HttpHook(method="GET", adapter=custom_adapter) + session = hook.get_conn() + assert isinstance(session.adapters["http://"], type(custom_adapter)), ( + "Custom HTTP adapter not correctly mounted" + ) + assert isinstance(session.adapters["https://"], type(custom_adapter)), ( + "Custom HTTPS adapter not correctly mounted" + ) def test_process_extra_options_from_connection(self): extra_options = {} @@ -671,6 +658,15 @@ def test_process_extra_options_from_connection(self): class TestHttpAsyncHook: + @pytest.fixture(autouse=True) + def setup_connections(self, create_connection_without_db): + create_connection_without_db( + Connection( + conn_id="http_default", conn_type="http", host="test:8080/", extra='{"bearer": "test"}' + ) + ) + create_connection_without_db(Connection(conn_id="http_empty_conn", conn_type="http")) + @pytest.mark.asyncio async def test_do_api_call_async_non_retryable_error(self, aioresponse): """Test api call asynchronously with non retryable error.""" @@ -706,7 +702,6 @@ async def test_do_api_call_async_retryable_error(self, caplog, aioresponse): assert "[Try 3 of 3] Request to http://httpbin.org/non_existent_endpoint failed" in caplog.text - @pytest.mark.db_test @pytest.mark.asyncio async def test_do_api_call_async_unknown_method(self): """Test api call asynchronously for unknown http method.""" @@ -729,11 +724,9 @@ async def test_async_post_request(self): payload='{"status":{"status": 200}}', reason="OK", ) - - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - async with aiohttp.ClientSession() as session: - resp = await hook.run(session=session, endpoint="v1/test") - assert resp.status == 200 + async with aiohttp.ClientSession() as session: + resp = await hook.run(session=session, endpoint="v1/test") + assert resp.status == 200 @pytest.mark.asyncio async def test_async_post_request_with_error_code(self): @@ -747,11 +740,9 @@ async def test_async_post_request_with_error_code(self): payload='{"status":{"status": 418}}', reason="I am teapot", ) - - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - async with aiohttp.ClientSession() as session: - with pytest.raises(AirflowException): - await hook.run(session=session, endpoint="v1/test") + async with aiohttp.ClientSession() as session: + with pytest.raises(AirflowException): + await hook.run(session=session, endpoint="v1/test") @pytest.mark.asyncio async def test_async_request_uses_connection_extra(self): @@ -767,88 +758,84 @@ async def test_async_request_uses_connection_extra(self): reason="OK", ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - hook = HttpAsyncHook() - with mock.patch("aiohttp.ClientSession.post", new_callable=mock.AsyncMock) as mocked_function: - async with aiohttp.ClientSession() as session: - await hook.run(session=session, endpoint="v1/test") - headers = mocked_function.call_args.kwargs.get("headers") - assert all( - key in headers and headers[key] == value - for key, value in connection_extra.items() - ) + hook = HttpAsyncHook() + with mock.patch("aiohttp.ClientSession.post", new_callable=mock.AsyncMock) as mocked_function: + async with aiohttp.ClientSession() as session: + await hook.run(session=session, endpoint="v1/test") + headers = mocked_function.call_args.kwargs.get("headers") + assert all( + key in headers and headers[key] == value for key, value in connection_extra.items() + ) @pytest.mark.asyncio - async def test_async_request_uses_connection_extra_with_requests_parameters(self): + @pytest.mark.parametrize( + "setup_connections_with_extras", + [ + { + "bearer": "test", + "proxies": {"http": "http://proxy:80", "https": "https://proxy:80"}, + "timeout": 60, + "verify": False, + "allow_redirects": False, + "max_redirects": 3, + "trust_env": False, + } + ], + indirect=True, + ) + async def test_async_request_uses_connection_extra_with_requests_parameters( + self, setup_connections_with_extras + ): """Test api call asynchronously with a connection that has extra field.""" connection_extra = {"bearer": "test"} proxy = {"http": "http://proxy:80", "https": "https://proxy:80"} - airflow_connection = get_airflow_connection_with_extra( - extra={ - **connection_extra, - **{ - "proxies": proxy, - "timeout": 60, - "verify": False, - "allow_redirects": False, - "max_redirects": 3, - "trust_env": False, - }, - } - ) - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=airflow_connection): - hook = HttpAsyncHook() + hook = HttpAsyncHook(http_conn_id="http_conn_with_extras") - with aioresponses() as m: - m.post( - "http://test:8080/v1/test", - status=200, - payload='{"status":{"status": 200}}', - reason="OK", - ) - - with mock.patch("aiohttp.ClientSession.post", new_callable=mock.AsyncMock) as mocked_function: - async with aiohttp.ClientSession() as session: - await hook.run(session=session, endpoint="v1/test") - headers = mocked_function.call_args.kwargs.get("headers") - assert all( - key in headers and headers[key] == value - for key, value in connection_extra.items() - ) - assert mocked_function.call_args.kwargs.get("proxy") == proxy - assert mocked_function.call_args.kwargs.get("timeout") == 60 - assert mocked_function.call_args.kwargs.get("verify_ssl") is False - assert mocked_function.call_args.kwargs.get("allow_redirects") is False - assert mocked_function.call_args.kwargs.get("max_redirects") == 3 - assert mocked_function.call_args.kwargs.get("trust_env") is False + with aioresponses() as m: + m.post( + "http://test:8080/v1/test", + status=200, + payload='{"status":{"status": 200}}', + reason="OK", + ) + + with mock.patch("aiohttp.ClientSession.post", new_callable=mock.AsyncMock) as mocked_function: + async with aiohttp.ClientSession() as session: + await hook.run(session=session, endpoint="v1/test") + headers = mocked_function.call_args.kwargs.get("headers") + assert all( + key in headers and headers[key] == value for key, value in connection_extra.items() + ) + assert mocked_function.call_args.kwargs.get("proxy") == proxy + assert mocked_function.call_args.kwargs.get("timeout") == 60 + assert mocked_function.call_args.kwargs.get("verify_ssl") is False + assert mocked_function.call_args.kwargs.get("allow_redirects") is False + assert mocked_function.call_args.kwargs.get("max_redirects") == 3 + assert mocked_function.call_args.kwargs.get("trust_env") is False @pytest.mark.asyncio async def test_build_request_url_from_connection(self): conn = get_airflow_connection() schema = conn.schema or "http" # default to http - with mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_airflow_connection): - hook = HttpAsyncHook() + hook = HttpAsyncHook() - with aioresponses() as m: - m.post( - f"{schema}://test:8080/v1/test", - status=200, - payload='{"status":{"status": 200}}', - reason="OK", - ) + with aioresponses() as m: + m.post( + f"{schema}://test:8080/v1/test", + status=200, + payload='{"status":{"status": 200}}', + reason="OK", + ) - with mock.patch("aiohttp.ClientSession.post", new_callable=mock.AsyncMock) as mocked_function: - async with aiohttp.ClientSession() as session: - await hook.run(session=session, endpoint="v1/test") - assert mocked_function.call_args.args[0] == f"{schema}://{conn.host}v1/test" + with mock.patch("aiohttp.ClientSession.post", new_callable=mock.AsyncMock) as mocked_function: + async with aiohttp.ClientSession() as session: + await hook.run(session=session, endpoint="v1/test") + assert mocked_function.call_args.args[0] == f"{schema}://{conn.host}v1/test" @pytest.mark.asyncio async def test_build_request_url_from_endpoint_param(self): - def get_empty_conn(conn_id: str = "http_default"): - return Connection(conn_id=conn_id, conn_type="http") - - hook = HttpAsyncHook() + hook = HttpAsyncHook(http_conn_id="http_empty_conn") with aioresponses() as m: m.post( @@ -856,7 +843,6 @@ def get_empty_conn(conn_id: str = "http_default"): ) with ( - mock.patch("airflow.hooks.base.BaseHook.get_connection", side_effect=get_empty_conn), mock.patch("aiohttp.ClientSession.post", new_callable=mock.AsyncMock) as mocked_function, ): async with aiohttp.ClientSession() as session: diff --git a/providers/http/tests/unit/http/operators/test_http.py b/providers/http/tests/unit/http/operators/test_http.py index 31be860cef6c0..3a06fdca4dbab 100644 --- a/providers/http/tests/unit/http/operators/test_http.py +++ b/providers/http/tests/unit/http/operators/test_http.py @@ -33,6 +33,7 @@ from airflow.exceptions import AirflowException, TaskDeferred from airflow.hooks import base +from airflow.models import Connection from airflow.providers.http.hooks.http import HttpHook from airflow.providers.http.operators.http import HttpOperator from airflow.providers.http.triggers.http import HttpTrigger, serialize_auth_type @@ -40,6 +41,14 @@ @mock.patch.dict("os.environ", AIRFLOW_CONN_HTTP_EXAMPLE="http://www.example.com") class TestHttpOperator: + @pytest.fixture(autouse=True) + def setup_connections(self, create_connection_without_db): + create_connection_without_db( + Connection( + conn_id="http_default", conn_type="http", host="test:8080/", extra='{"bearer": "test"}' + ) + ) + def test_response_in_logs(self, requests_mock): """ Test that when using HttpOperator with 'GET', @@ -95,7 +104,6 @@ def test_filters_response(self, requests_mock): result = operator.execute({}) assert result == {"value": 5} - @pytest.mark.db_test def test_async_defer_successfully(self, requests_mock): operator = HttpOperator( task_id="test_HTTP_op", @@ -190,7 +198,6 @@ def pagination_function(response: Response) -> dict | None: assert result == [5, 10] - @pytest.mark.db_test def test_async_pagination(self, requests_mock): """ Test that the HttpOperator calls asynchronously and repetitively diff --git a/providers/http/tests/unit/http/triggers/test_http.py b/providers/http/tests/unit/http/triggers/test_http.py index 8af78149b24a7..e2e6a9c5a95c8 100644 --- a/providers/http/tests/unit/http/triggers/test_http.py +++ b/providers/http/tests/unit/http/triggers/test_http.py @@ -30,6 +30,7 @@ from requests.structures import CaseInsensitiveDict from yarl import URL +from airflow.models import Connection from airflow.providers.http.triggers.http import HttpSensorTrigger, HttpTrigger from airflow.triggers.base import TriggerEvent @@ -83,6 +84,14 @@ def client_response(): class TestHttpTrigger: + @pytest.fixture(autouse=True) + def setup_connections(self, create_connection_without_db): + create_connection_without_db( + Connection( + conn_id="http_default", conn_type="http", host="test:8080/", extra='{"bearer": "test"}' + ) + ) + @staticmethod def _mock_run_result(result_to_mock): f = Future() @@ -151,7 +160,6 @@ async def test_convert_response(self, client_response): assert response.reason == client_response.reason assert dict(response.cookies) == dict(client_response.cookies) - @pytest.mark.db_test @pytest.mark.asyncio @mock.patch("aiohttp.client.ClientSession.post") async def test_trigger_on_post_with_data(self, mock_http_post, trigger):