From 003ff92c6a5ea13088f3eb99178964fa80c7d04a Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sun, 9 May 2021 00:55:17 +0100 Subject: [PATCH 01/21] Initial commit of NTLM support --- httpx_auth/authentication.py | 171 ++++++++++++++++++++++++- setup.py | 6 +- tests/test_ntlm.py | 235 +++++++++++++++++++++++++++++++++++ 3 files changed, 410 insertions(+), 2 deletions(-) create mode 100644 tests/test_ntlm.py diff --git a/httpx_auth/authentication.py b/httpx_auth/authentication.py index 3b428d4..79a1e5d 100644 --- a/httpx_auth/authentication.py +++ b/httpx_auth/authentication.py @@ -1,11 +1,13 @@ import base64 import os import uuid +from enum import Enum from hashlib import sha256, sha512 -from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode from typing import Optional, Generator +from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode import httpx +from spnego.ntlm import NTLMProxy from httpx_auth import oauth2_authentication_responses_server, oauth2_tokens from httpx_auth.errors import InvalidGrantRequest, GrantNotProvided @@ -1165,6 +1167,173 @@ def __init__(self, username: str, password: str): httpx.BasicAuth.__init__(self, username, password) +class NTLM(httpx.Auth, SupportMultiAuth): + """Describes plain NTLM challenge-response authentication + NOTE: This does not support Channel Bindings which can (and ought to be) supported by servers. This is due to a + limitation in the HTTPCore library at present. + """ + + def __init__(self, username: str, password: str, domain: str = None): + """ + :param username: str username + :param password: str password + :param domain: str domain, default: None + """ + self.username = username + self.password = password + self.domain = domain + self.authentication_target = AuthenticationTarget.NONE + self.authenticate_type: Optional[str] = None + self.ntlm_auth_header = "" + + def auth_flow( + self, request: httpx.Request + ) -> Generator[httpx.Request, httpx.Response, None]: + + if self.authentication_target is not AuthenticationTarget.NONE: + request.headers[ + self.authentication_target.response_header_name() + ] = self.authentication_target + + response = yield request + + # If anything comes back except an authenticate challenge then return it for the client to deal with, hopefully + # a successful response. + if response.status_code not in [401, 407]: + return response + + # Otherwise authenticate, determine whether we need to auth to the server or to a proxy + candidate_auth_target = AuthenticationTarget.from_status_code( + response.status_code + ) + authenticate_header = response.headers.get( + candidate_auth_target.challenge_header_name(), "" + ) + auth_type = self._auth_type_from_header(authenticate_header) + if auth_type is not None: + self.authentication_target = candidate_auth_target + self.authenticate_type = auth_type + else: + return response + + # Create the NTLM proxy object to handle the auth process + ntlm_proxy = NTLMProxy(self.username, self.password, request.url.host) + + # Phase 1: + # Generate ntlm Negotiate message header, attach to request and resend. + response_header = self._make_authorization_header(ntlm_proxy.step(None)) + request.headers[ + self.authentication_target.response_header_name() + ] = response_header + response2 = yield request + + # Phase 2: + # Server responds with NTLM Challenge message, parse the authenticate header and deal with cookies. Some web + # apps use cookies to store progress in the auth process. + if "set-cookie" in response2.headers: + request.headers["Cookie"] = response2.headers["Cookie"] + + auth_header_value = response2.headers[ + self.authentication_target.challenge_header_name() + ] + ntlm_header_bytes = self._parse_ntlm_authenticate_header(auth_header_value) + + # Phase 3: + # Generate Authenticate message, attach to the request and resend it. If the user is authorized then this will + # succeed. If not then this will fail. + self.ntlm_auth_header = self._make_authorization_header( + ntlm_proxy.step(ntlm_header_bytes) + ) + request.headers[ + self.authentication_target.response_header_name() + ] = self.ntlm_auth_header + response3 = yield request + return response3 + + def _parse_ntlm_authenticate_header(self, header: str) -> bytes: + """ + Extract NTLM/Negotiate value from authenticate header and convert to bytes + :param header: str www or proxy-authenticate header + :return: bytes NTLM challenge + """ + + auth_strip = self.authenticate_type + " " + ntlm_header_value = next( + s + for s in (val.lstrip() for val in header.split(",")) + if s.startswith(auth_strip) + ) + return base64.b64decode(ntlm_header_value[len(auth_strip) :]) + + def _make_authorization_header(self, response_bytes: bytes) -> str: + """ + Convert the ntlm bytes to base64 encoded string and build authorization header. + :param response_bytes: bytes NTLM response content + :return: str Authorization/Proxy-Authorization header + """ + + ntlm_response = base64.b64encode(response_bytes).decode("ascii") + return "{} {}".format(self.authenticate_type, ntlm_response) + + def _auth_type_from_header(self, header: str) -> Optional[str]: + """ + Given a WWW-Authenticate header or Proxy-Authenticate header, returns the authentication + type to use. Prefer NTLM if the server supports it. + :param header: str Authenticate header + :return: Optional[str] Authentication type or None if not supported + """ + + if "ntlm" in header.lower(): + return "NTLM" + elif "negotiate" in header.lower(): + return "Negotiate" + return None + + +class AuthenticationTarget(Enum): + NONE = 0 + WWW = 1 + PROXY = 2 + + def response_header_name(self) -> Optional[str]: + """ + The name of the header to be sent in a response + :return: str response header name + """ + if self.value == 1: + return "Authorization" + elif self.value == 2: + return "Proxy-Authorization" + else: + return None + + def challenge_header_name(self) -> Optional[str]: + """ + The name of the header to expect in a challenge + :return: str challenge header name + """ + if self.value == 1: + return "WWW-Authenticate" + elif self.value == 2: + return "Proxy-Authenticate" + else: + return None + + @staticmethod + def from_status_code(status_code: int) -> "AuthenticationTarget": + """ + Create an instance of an AuthenticationTarget from a response status code + :param status_code: int HTTP status code + :return: AuthenticationTarget + """ + if status_code == 401: + return AuthenticationTarget.WWW + elif status_code == 407: + return AuthenticationTarget.PROXY + else: + return AuthenticationTarget.NONE + + class _MultiAuth(httpx.Auth): """Authentication using multiple authentication methods.""" diff --git a/setup.py b/setup.py index 184ff19..a6a61e5 100644 --- a/setup.py +++ b/setup.py @@ -36,7 +36,8 @@ packages=find_packages(exclude=["tests*"]), install_requires=[ # Used for Base Authentication and to communicate with OAuth2 servers - "httpx==0.18.*" + "httpx==0.18.*", + "pyspnego==0.1.6" ], extras_require={ "testing": [ @@ -46,6 +47,9 @@ "pytest_httpx==0.12.*", # Used to check coverage "pytest-cov==2.*", + # Used to test NTLM support + "pytest==6.*", + "pytest-mock==3.6.*" ] }, python_requires=">=3.6", diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py new file mode 100644 index 0000000..237678f --- /dev/null +++ b/tests/test_ntlm.py @@ -0,0 +1,235 @@ +import os +from functools import wraps +from typing import Optional + +import httpx +import pytest +from pytest_httpx import HTTPXMock +from pytest_mock import mocker + +from httpx_auth.authentication import NTLM, AuthenticationTarget + + +class TestAuthenticationTargetUnit: + @pytest.mark.parametrize( + ["status_code", "expected_type"], [(401, 1), (407, 2), (403, 0)] + ) + def test_from_status_code(self, status_code: int, expected_type: int): + auth_target = AuthenticationTarget.from_status_code(status_code) + assert auth_target.value == expected_type + + @pytest.mark.parametrize( + ["auth_target", "expected_output"], + [ + (AuthenticationTarget.WWW, "WWW-Authenticate"), + (AuthenticationTarget.PROXY, "Proxy-Authenticate"), + ], + ) + def test_challenge_header( + self, auth_target: AuthenticationTarget, expected_output: str + ): + actual_output = auth_target.challenge_header_name() + assert actual_output.lower() == expected_output.lower() + + @pytest.mark.parametrize( + ["auth_target", "expected_output"], + [ + (AuthenticationTarget.WWW, "Authorization"), + (AuthenticationTarget.PROXY, "Proxy-Authorization"), + ], + ) + def test_challenge_header( + self, auth_target: AuthenticationTarget, expected_output: str + ): + actual_output = auth_target.response_header_name() + assert actual_output.lower() == expected_output.lower() + + +@pytest.fixture() +def ntlm_auth_fixture(): + yield NTLM("test_user", "test_pass") + + +class TestNTLMUnit: + bytes_content = b"\x00\x01\x02\x03\x04\x05\x06\x07" + str_content = "AAECAwQFBgc=" + + def test_parse_auth_header_single_success(self, ntlm_auth_fixture): + ntlm_auth_fixture.authenticate_type = "NTLM" + + header_value = "NTLM {}".format(self.str_content) + actual_output = ntlm_auth_fixture._parse_ntlm_authenticate_header(header_value) + assert actual_output == self.bytes_content + + def test_parse_auth_header_multi_success(self, ntlm_auth_fixture): + ntlm_auth_fixture.authenticate_type = "Negotiate" + + header_value = "Negotiate {}, Basic dGVzdF91c2VyOnRlc3RfcGFzcw==".format( + self.str_content + ) + actual_output = ntlm_auth_fixture._parse_ntlm_authenticate_header(header_value) + assert actual_output == self.bytes_content + + def test_parse_auth_header_single_fail(self, ntlm_auth_fixture): + ntlm_auth_fixture.authenticate_type = "NTLM" + + header_value = "Negotiate {}".format(self.str_content) + with pytest.raises(StopIteration): + _ = ntlm_auth_fixture._parse_ntlm_authenticate_header(header_value) + + @pytest.mark.parametrize("auth_type", ["NTLM", "Negotiate"]) + def test_make_auth_header(self, ntlm_auth_fixture, auth_type: str): + expected_output = "{} {}".format(auth_type, self.str_content) + ntlm_auth_fixture.authenticate_type = auth_type + actual_str = ntlm_auth_fixture._make_authorization_header(self.bytes_content) + + assert actual_str == expected_output + + @pytest.mark.parametrize( + ["test_input", "expected_output"], + [ + ("NTLM Successful", "NTLM"), + ("NtLm Successful", "NTLM"), + ("Negotiate Successful", "Negotiate"), + ("NeGoTiATe Successful", "Negotiate"), + ], + ) + def test_auth_type_from_header( + self, ntlm_auth_fixture, test_input: str, expected_output: Optional[str] + ): + actual_output = ntlm_auth_fixture._auth_type_from_header(test_input) + assert actual_output.lower() == expected_output.lower() + + def test_auth_type_from_header_returns_none_when_not_ntlm(self, ntlm_auth_fixture): + header_content = "Basic Failure" + actual_output = ntlm_auth_fixture._auth_type_from_header(header_content) + assert actual_output is None + + +def wrap_with_workstation(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + try: + current_workstation = os.environ["NETBIOS_COMPUTER_NAME"] + except KeyError: + current_workstation = None + os.environ["NETBIOS_COMPUTER_NAME"] = "TESTWORKSTATION" + func(self, *args, **kwargs) + if current_workstation is not None: + os.environ["NETBIOS_COMPUTER_NAME"] = current_workstation + else: + del os.environ["NETBIOS_COMPUTER_NAME"] + + return wrapper + + +class TestNTLMFunctional: + def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): + httpx_mock.add_response(url="http://www.example.com/test", status_code=200) + + with httpx.Client() as client: + resp = client.get( + url="http://www.example.com/test", auth=NTLM("test_user", "test_pass") + ) + assert resp.status_code == 200 + assert len(httpx_mock.get_requests()) == 1 + + def test_http_401s_make_three_requests_and_return_401( + self, httpx_mock: HTTPXMock, mocker_fixture: mocker + ): + httpx_mock.add_response(status_code=401, headers={"WWW-Authenticate": "NTLM"}) + httpx_mock.add_response( + status_code=401, + headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, + match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + ) + + mocker_fixture.patch( + "httpx_auth.authentication.NTLMProxy.step", + return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + ) + + with httpx.Client() as client: + resp = client.get( + url="http://www.example.com/test", auth=NTLM("test_user", "test_pass") + ) + assert resp.status_code == 401 + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 + + def test_http_407s_make_three_requests_and_return_407( + self, httpx_mock: HTTPXMock, mocker_fixture: mocker + ): + httpx_mock.add_response( + status_code=407, headers={"Proxy-Authenticate": "Negotiate"} + ) + httpx_mock.add_response( + status_code=407, + headers={"Proxy-Authenticate": "Negotiate AAECAwQFBgc="}, + match_headers={"Proxy-Authorization": "Negotiate CAkKCwwNDg8="}, + ) + + mocker_fixture.patch( + "httpx_auth.authentication.NTLMProxy.step", + return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + ) + + with httpx.Client() as client: + resp = client.get( + url="http://www.example.com/test", auth=NTLM("test_user", "test_pass") + ) + assert resp.status_code == 407 + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 + + @wrap_with_workstation + @pytest.mark.parametrize("status_code", [200, 401, 403]) + def test_valid_handshake_returns_final_status( + self, httpx_mock, mocker_fixture: mocker, status_code: int + ): + expect1 = {"Authorization": "NTLM TlRMTVNTUAABAAAAN4II4AAAAAAgAAAAAAAAACAAAAA="} + response1 = { + "WWW-Authenticate": "NTLM TlRMTVNTUAACAAAAHgAeADgAAAA1gori1CEifyE0ovkAAAAAAAAAAJgAmABWAAAACgBh" + "SgAAAA9UAEUAUwBUAFcATwBSAEsAUwBUAEEAVABJAE8ATgACAB4AVABFAFMAVABXAE8AUgBLAFMAVABBAFQASQBPAE4AA" + "QAeAFQARQBTAFQAVwBPAFIASwBTAFQAQQBUAEkATwBOAAQAHgBUAEUAUwBUAFcATwBSAEsAUwBUAEEAVABJAE8ATgADAB" + "4AVABFAFMAVABXAE8AUgBLAFMAVABBAFQASQBPAE4ABwAIADbWHPMoRNcBAAAAAA==" + } + expect2 = { + "Authorization": "NTLM TlRMTVNTUAADAAAAGAAYAFgAAADwAPAAcAAAAAAAAABgAQAAEAAQAGABAAAeAB4AcAEAAAgAC" + "ACOAQAANYKK4gABBgAAAAAPw38elkNrZcKFdMx/yneDWQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACSQFWteoy7KhaGzllQe" + "8OIBAQAAAAAAADbWHPMoRNcB3q2+796tvu8AAAAAAgAeAFQARQBTAFQAVwBPAFIASwBTAFQAQQBUAEkATwBOAAEAHgBUAEU" + "AUwBUAFcATwBSAEsAUwBUAEEAVABJAE8ATgAEAB4AVABFAFMAVABXAE8AUgBLAFMAVABBAFQASQBPAE4AAwAeAFQARQBTAF" + "QAVwBPAFIASwBTAFQAQQBUAEkATwBOAAcACAA21hzzKETXAQkAHABIAE8AUwBUAC8AbABvAGMAYQBsAGgAbwBzAHQABgAEA" + "AIAAAAAAAAAAAAAAEkASQBTAF8AVABlAHMAdABUAEUAUwBUAFcATwBSAEsAUwBUAEEAVABJAE8ATgCbo4V5ivHWOA==" + } + + # Mock os.urandom since the client challenge is generated for the AUTHENTICATE message with 8 bytes of random + # date + mocker_fixture.patch( + "httpx_auth.authentication.os.urandom", + return_value=b"\xDE\xAD\xBE\xEF\xDE\xAD\xBE\xEF", + ) + + httpx_mock.add_response( + url="http://localhost/test", + status_code=401, + headers={"WWW-Authenticate": "NTLM"}, + ) + httpx_mock.add_response( + url="http://localhost/test", + status_code=401, + headers=response1, + match_headers=expect1, + ) + httpx_mock.add_response( + url="http://localhost/test", status_code=status_code, match_headers=expect2 + ) + + with httpx.Client() as client: + resp = client.get( + url="http://localhost/test", auth=NTLM("IIS_Test", "rosebud") + ) + print(resp.request.headers["Authorization"]) + assert resp.status_code == status_code + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 From c7fdedf14791e2cda5e587f25d58b15a60700c5e Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Sun, 9 May 2021 01:06:24 +0100 Subject: [PATCH 02/21] Make AuthenticationType private, add changes to changelog --- CHANGELOG.md | 8 ++++++++ httpx_auth/authentication.py | 16 ++++++++-------- tests/test_ntlm.py | 16 ++++++++-------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e94f34..65ecd9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- `httpx_auth.authentication` contains a new `NTLM` class that supports NTLM authentication without support for channel + bindings + +### Changed +- Requires [`pyspnego`](https://github.com/jborean93/pyspnego)==0.1.6 +- Requires [`pytest`](https://docs.pytest.org/en/latest)==6.2.\* for testing +- Requires [`pytest-mock`](https://github.com/pytest-dev/pytest-mock)==3.6.\* for testing ## [0.10.0] - 2021-04-27 ### Changed diff --git a/httpx_auth/authentication.py b/httpx_auth/authentication.py index 79a1e5d..7e2be33 100644 --- a/httpx_auth/authentication.py +++ b/httpx_auth/authentication.py @@ -1182,7 +1182,7 @@ def __init__(self, username: str, password: str, domain: str = None): self.username = username self.password = password self.domain = domain - self.authentication_target = AuthenticationTarget.NONE + self.authentication_target = _AuthenticationTarget.NONE self.authenticate_type: Optional[str] = None self.ntlm_auth_header = "" @@ -1190,7 +1190,7 @@ def auth_flow( self, request: httpx.Request ) -> Generator[httpx.Request, httpx.Response, None]: - if self.authentication_target is not AuthenticationTarget.NONE: + if self.authentication_target is not _AuthenticationTarget.NONE: request.headers[ self.authentication_target.response_header_name() ] = self.authentication_target @@ -1203,7 +1203,7 @@ def auth_flow( return response # Otherwise authenticate, determine whether we need to auth to the server or to a proxy - candidate_auth_target = AuthenticationTarget.from_status_code( + candidate_auth_target = _AuthenticationTarget.from_status_code( response.status_code ) authenticate_header = response.headers.get( @@ -1290,7 +1290,7 @@ def _auth_type_from_header(self, header: str) -> Optional[str]: return None -class AuthenticationTarget(Enum): +class _AuthenticationTarget(Enum): NONE = 0 WWW = 1 PROXY = 2 @@ -1320,18 +1320,18 @@ def challenge_header_name(self) -> Optional[str]: return None @staticmethod - def from_status_code(status_code: int) -> "AuthenticationTarget": + def from_status_code(status_code: int) -> "_AuthenticationTarget": """ Create an instance of an AuthenticationTarget from a response status code :param status_code: int HTTP status code :return: AuthenticationTarget """ if status_code == 401: - return AuthenticationTarget.WWW + return _AuthenticationTarget.WWW elif status_code == 407: - return AuthenticationTarget.PROXY + return _AuthenticationTarget.PROXY else: - return AuthenticationTarget.NONE + return _AuthenticationTarget.NONE class _MultiAuth(httpx.Auth): diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 237678f..a3905b9 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -7,7 +7,7 @@ from pytest_httpx import HTTPXMock from pytest_mock import mocker -from httpx_auth.authentication import NTLM, AuthenticationTarget +from httpx_auth.authentication import NTLM, _AuthenticationTarget class TestAuthenticationTargetUnit: @@ -15,18 +15,18 @@ class TestAuthenticationTargetUnit: ["status_code", "expected_type"], [(401, 1), (407, 2), (403, 0)] ) def test_from_status_code(self, status_code: int, expected_type: int): - auth_target = AuthenticationTarget.from_status_code(status_code) + auth_target = _AuthenticationTarget.from_status_code(status_code) assert auth_target.value == expected_type @pytest.mark.parametrize( ["auth_target", "expected_output"], [ - (AuthenticationTarget.WWW, "WWW-Authenticate"), - (AuthenticationTarget.PROXY, "Proxy-Authenticate"), + (_AuthenticationTarget.WWW, "WWW-Authenticate"), + (_AuthenticationTarget.PROXY, "Proxy-Authenticate"), ], ) def test_challenge_header( - self, auth_target: AuthenticationTarget, expected_output: str + self, auth_target: _AuthenticationTarget, expected_output: str ): actual_output = auth_target.challenge_header_name() assert actual_output.lower() == expected_output.lower() @@ -34,12 +34,12 @@ def test_challenge_header( @pytest.mark.parametrize( ["auth_target", "expected_output"], [ - (AuthenticationTarget.WWW, "Authorization"), - (AuthenticationTarget.PROXY, "Proxy-Authorization"), + (_AuthenticationTarget.WWW, "Authorization"), + (_AuthenticationTarget.PROXY, "Proxy-Authorization"), ], ) def test_challenge_header( - self, auth_target: AuthenticationTarget, expected_output: str + self, auth_target: _AuthenticationTarget, expected_output: str ): actual_output = auth_target.response_header_name() assert actual_output.lower() == expected_output.lower() From e5fcb52a1ccfe93ece1d83ce04c6c098c140ce95 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Mon, 24 May 2021 23:15:48 +0100 Subject: [PATCH 03/21] Fix sonarQube warnings about https and fixup import naming --- tests/test_ntlm.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index a3905b9..0ba37c5 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -125,17 +125,17 @@ def wrapper(self, *args, **kwargs): class TestNTLMFunctional: def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): - httpx_mock.add_response(url="http://www.example.com/test", status_code=200) + httpx_mock.add_response(url="https://www.example.com/test", status_code=200) with httpx.Client() as client: resp = client.get( - url="http://www.example.com/test", auth=NTLM("test_user", "test_pass") + url="https://www.example.com/test", auth=NTLM("test_user", "test_pass") ) assert resp.status_code == 200 assert len(httpx_mock.get_requests()) == 1 def test_http_401s_make_three_requests_and_return_401( - self, httpx_mock: HTTPXMock, mocker_fixture: mocker + self, httpx_mock: HTTPXMock, mocker ): httpx_mock.add_response(status_code=401, headers={"WWW-Authenticate": "NTLM"}) httpx_mock.add_response( @@ -144,21 +144,21 @@ def test_http_401s_make_three_requests_and_return_401( match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, ) - mocker_fixture.patch( + mocker.patch( "httpx_auth.authentication.NTLMProxy.step", return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", ) with httpx.Client() as client: resp = client.get( - url="http://www.example.com/test", auth=NTLM("test_user", "test_pass") + url="https://www.example.com/test", auth=NTLM("test_user", "test_pass") ) assert resp.status_code == 401 assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 def test_http_407s_make_three_requests_and_return_407( - self, httpx_mock: HTTPXMock, mocker_fixture: mocker + self, httpx_mock: HTTPXMock, mocker ): httpx_mock.add_response( status_code=407, headers={"Proxy-Authenticate": "Negotiate"} @@ -169,14 +169,14 @@ def test_http_407s_make_three_requests_and_return_407( match_headers={"Proxy-Authorization": "Negotiate CAkKCwwNDg8="}, ) - mocker_fixture.patch( + mocker.patch( "httpx_auth.authentication.NTLMProxy.step", return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", ) with httpx.Client() as client: resp = client.get( - url="http://www.example.com/test", auth=NTLM("test_user", "test_pass") + url="https://www.example.com/test", auth=NTLM("test_user", "test_pass") ) assert resp.status_code == 407 assert len(resp.history) == 2 @@ -185,7 +185,7 @@ def test_http_407s_make_three_requests_and_return_407( @wrap_with_workstation @pytest.mark.parametrize("status_code", [200, 401, 403]) def test_valid_handshake_returns_final_status( - self, httpx_mock, mocker_fixture: mocker, status_code: int + self, httpx_mock, mocker, status_code: int ): expect1 = {"Authorization": "NTLM TlRMTVNTUAABAAAAN4II4AAAAAAgAAAAAAAAACAAAAA="} response1 = { @@ -205,29 +205,29 @@ def test_valid_handshake_returns_final_status( # Mock os.urandom since the client challenge is generated for the AUTHENTICATE message with 8 bytes of random # date - mocker_fixture.patch( + mocker.patch( "httpx_auth.authentication.os.urandom", return_value=b"\xDE\xAD\xBE\xEF\xDE\xAD\xBE\xEF", ) httpx_mock.add_response( - url="http://localhost/test", + url="https://localhost/test", status_code=401, headers={"WWW-Authenticate": "NTLM"}, ) httpx_mock.add_response( - url="http://localhost/test", + url="https://localhost/test", status_code=401, headers=response1, match_headers=expect1, ) httpx_mock.add_response( - url="http://localhost/test", status_code=status_code, match_headers=expect2 + url="https://localhost/test", status_code=status_code, match_headers=expect2 ) with httpx.Client() as client: resp = client.get( - url="http://localhost/test", auth=NTLM("IIS_Test", "rosebud") + url="https://localhost/test", auth=NTLM("IIS_Test", "rosebud") ) print(resp.request.headers["Authorization"]) assert resp.status_code == status_code From f30553dbd4b07f5402a8c423fe87c4c49a44da47 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 15:47:28 +0100 Subject: [PATCH 04/21] First pass at Negotiate implementation - Remove subclasses - Use supported API for spnego.client - Update some tests --- httpx_auth/authentication.py | 261 +++++++++++++++++++---------------- setup.py | 6 +- tests/test_ntlm.py | 236 +++++++++---------------------- 3 files changed, 211 insertions(+), 292 deletions(-) diff --git a/httpx_auth/authentication.py b/httpx_auth/authentication.py index 7e2be33..fdb18d4 100644 --- a/httpx_auth/authentication.py +++ b/httpx_auth/authentication.py @@ -3,11 +3,18 @@ import uuid from enum import Enum from hashlib import sha256, sha512 -from typing import Optional, Generator +from typing import Optional, Generator, List from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode import httpx -from spnego.ntlm import NTLMProxy + +try: + import spnego + + WINDOWS_AUTH = True +except ImportError: + spnego = None + WINDOWS_AUTH = False from httpx_auth import oauth2_authentication_responses_server, oauth2_tokens from httpx_auth.errors import InvalidGrantRequest, GrantNotProvided @@ -1167,173 +1174,185 @@ def __init__(self, username: str, password: str): httpx.BasicAuth.__init__(self, username, password) -class NTLM(httpx.Auth, SupportMultiAuth): - """Describes plain NTLM challenge-response authentication +class Negotiate(httpx.Auth, SupportMultiAuth): + """ NOTE: This does not support Channel Bindings which can (and ought to be) supported by servers. This is due to a limitation in the HTTPCore library at present. """ - def __init__(self, username: str, password: str, domain: str = None): + _username: str + _password: str + force_ntlm: bool + auth_header: str + auth_complete: bool + auth_type: str + _service: str + _context_proxy: "spnego._context.ContextProxy" + max_redirects: int = 10 + + def __init__( + self, + username: str = None, + password: str = None, + force_ntlm: bool = False, + service: str = None, + max_redirects: int = 10, + ) -> None: """ - :param username: str username - :param password: str password - :param domain: str domain, default: None + :param username: Username and domain (if required). Optional for servers that support Kerberos, required for + those that require NTLM + :param password: Password if required by server for authentication. + :param force_ntlm: Force authentication to use NTLM if available. + :param service: Service portion of the target Service Principal Name (default HTTP) + :return: None """ - self.username = username - self.password = password - self.domain = domain - self.authentication_target = _AuthenticationTarget.NONE - self.authenticate_type: Optional[str] = None - self.ntlm_auth_header = "" + if not WINDOWS_AUTH: + raise ImportError( + "Windows authentication support not enabled, install with the windows_auth extra." + ) + if password and not username: + raise ValueError( + "Negotiate authentication with credentials requires username and password, no username was provided." + ) + if force_ntlm and not (username and password): + raise ValueError( + "NTLM authentication requires credentials, provide a username and password." + ) + self._username = username + self._password = password + self.force_ntlm = force_ntlm + self.auth_header = "" + self.auth_complete = False + self.auth_type = "" + self._service = service + self.max_redirects = max_redirects def auth_flow( self, request: httpx.Request ) -> Generator[httpx.Request, httpx.Response, None]: - if self.authentication_target is not _AuthenticationTarget.NONE: - request.headers[ - self.authentication_target.response_header_name() - ] = self.authentication_target - + responses = [] response = yield request + responses.append(response) + + redirect_count = 0 # If anything comes back except an authenticate challenge then return it for the client to deal with, hopefully # a successful response. - if response.status_code not in [401, 407]: - return response + if responses[-1].status_code != 401: + return responses[-1] - # Otherwise authenticate, determine whether we need to auth to the server or to a proxy - candidate_auth_target = _AuthenticationTarget.from_status_code( - response.status_code + # Otherwise authenticate. Determine the authentication name to use, prefer Negotiate if available. + self.auth_type = self._auth_type_from_header( + responses[-1].headers.get("WWW-Authenticate") ) - authenticate_header = response.headers.get( - candidate_auth_target.challenge_header_name(), "" - ) - auth_type = self._auth_type_from_header(authenticate_header) - if auth_type is not None: - self.authentication_target = candidate_auth_target - self.authenticate_type = auth_type - else: - return response - - # Create the NTLM proxy object to handle the auth process - ntlm_proxy = NTLMProxy(self.username, self.password, request.url.host) - + if self.auth_type is None: + return responses[-1] + + # Run authentication flow. + yield from self._do_auth_flow(request, responses) + + # If we were redirected we will need to rerun the auth flow on the new url, repeat until either we receive a + # status that is not 401 Unauthorized, or until the url we ended up at is the same as the one we requested. + while responses[-1].status_code == 401 and responses[-1].url != request.url: + redirect_count += 1 + if redirect_count > self.max_redirects: + raise httpx.TooManyRedirects( + message=f"Redirected too many times ({self.max_redirects}).", + request=request, + ) + request.url = responses[-1].url + yield from self._do_auth_flow(request, responses) + + return responses[-1] + + def _do_auth_flow( + self, request: httpx.Request, responses: List[httpx.Response] + ) -> Generator[httpx.Request, httpx.Response, None]: # Phase 1: - # Generate ntlm Negotiate message header, attach to request and resend. - response_header = self._make_authorization_header(ntlm_proxy.step(None)) - request.headers[ - self.authentication_target.response_header_name() - ] = response_header - response2 = yield request + # Configure context proxy, generate message header, attach to request and resend. + host = request.url.host + self.context_proxy = self._new_context_proxy() + self.context_proxy.spn = "{0}/{1}".format( + self._service.upper() if self._service else "HTTP", host + ) + request.headers["Authorization"] = self._make_authorization_header( + self.context_proxy.step(None) + ) + response = yield request + responses.append(response) # Phase 2: - # Server responds with NTLM Challenge message, parse the authenticate header and deal with cookies. Some web - # apps use cookies to store progress in the auth process. - if "set-cookie" in response2.headers: - request.headers["Cookie"] = response2.headers["Cookie"] + # Server responds with Challenge message, parse the authenticate header and deal with cookies. Some web apps use + # cookies to store progress in the auth process. + if "set-cookie" in responses[-1].headers: + request.headers["Cookie"] = responses[-1].headers["Cookie"] - auth_header_value = response2.headers[ - self.authentication_target.challenge_header_name() - ] - ntlm_header_bytes = self._parse_ntlm_authenticate_header(auth_header_value) + auth_header_bytes = self._parse_authenticate_header( + responses[-1].headers["WWW-Authenticate"] + ) # Phase 3: # Generate Authenticate message, attach to the request and resend it. If the user is authorized then this will # succeed. If not then this will fail. - self.ntlm_auth_header = self._make_authorization_header( - ntlm_proxy.step(ntlm_header_bytes) + self.auth_header = self._make_authorization_header( + self.context_proxy.step(auth_header_bytes) + ) + request.headers["Authorization"] = self.auth_header + response = yield request + responses.append(response) + + def _new_context_proxy(self) -> "spnego._context.ContextProxy": + client = spnego.client( + self._username, + self._password, + service=self._service, + protocol="ntlm" if self.force_ntlm else "negotiate", ) - request.headers[ - self.authentication_target.response_header_name() - ] = self.ntlm_auth_header - response3 = yield request - return response3 + if self.force_ntlm: + client.options = spnego.NegotiateOptions.use_ntlm + client.protocol = "ntlm" + return client - def _parse_ntlm_authenticate_header(self, header: str) -> bytes: + def _parse_authenticate_header(self, header: str) -> bytes: """ - Extract NTLM/Negotiate value from authenticate header and convert to bytes - :param header: str www or proxy-authenticate header - :return: bytes NTLM challenge + Extract NTLM/Negotiate value from Authenticate header and convert to bytes + :param header: str WWW-Authenticate header + :return: bytes Negotiate challenge """ - auth_strip = self.authenticate_type + " " - ntlm_header_value = next( + auth_strip = self.auth_type.lower() + " " + auth_header_value = next( s for s in (val.lstrip() for val in header.split(",")) - if s.startswith(auth_strip) + if s.lower().startswith(auth_strip) ) - return base64.b64decode(ntlm_header_value[len(auth_strip) :]) + return base64.b64decode(auth_header_value[len(auth_strip) :]) def _make_authorization_header(self, response_bytes: bytes) -> str: """ - Convert the ntlm bytes to base64 encoded string and build authorization header. - :param response_bytes: bytes NTLM response content + Convert the auth bytes to base64 encoded string and build Authorization header. + :param response_bytes: bytes auth response content :return: str Authorization/Proxy-Authorization header """ - ntlm_response = base64.b64encode(response_bytes).decode("ascii") - return "{} {}".format(self.authenticate_type, ntlm_response) + auth_response = base64.b64encode(response_bytes).decode("ascii") + return f"{self.auth_type} {auth_response}" - def _auth_type_from_header(self, header: str) -> Optional[str]: + @staticmethod + def _auth_type_from_header(header: str) -> Optional[str]: """ - Given a WWW-Authenticate header or Proxy-Authenticate header, returns the authentication - type to use. Prefer NTLM if the server supports it. + Given a WWW-Authenticate header, returns the authentication type to use. :param header: str Authenticate header :return: Optional[str] Authentication type or None if not supported """ - - if "ntlm" in header.lower(): - return "NTLM" - elif "negotiate" in header.lower(): + if "negotiate" in header.lower(): return "Negotiate" + elif "ntlm" in header.lower(): + return "NTLM" return None -class _AuthenticationTarget(Enum): - NONE = 0 - WWW = 1 - PROXY = 2 - - def response_header_name(self) -> Optional[str]: - """ - The name of the header to be sent in a response - :return: str response header name - """ - if self.value == 1: - return "Authorization" - elif self.value == 2: - return "Proxy-Authorization" - else: - return None - - def challenge_header_name(self) -> Optional[str]: - """ - The name of the header to expect in a challenge - :return: str challenge header name - """ - if self.value == 1: - return "WWW-Authenticate" - elif self.value == 2: - return "Proxy-Authenticate" - else: - return None - - @staticmethod - def from_status_code(status_code: int) -> "_AuthenticationTarget": - """ - Create an instance of an AuthenticationTarget from a response status code - :param status_code: int HTTP status code - :return: AuthenticationTarget - """ - if status_code == 401: - return _AuthenticationTarget.WWW - elif status_code == 407: - return _AuthenticationTarget.PROXY - else: - return _AuthenticationTarget.NONE - - class _MultiAuth(httpx.Auth): """Authentication using multiple authentication methods.""" diff --git a/setup.py b/setup.py index a6a61e5..bace16b 100644 --- a/setup.py +++ b/setup.py @@ -36,8 +36,7 @@ packages=find_packages(exclude=["tests*"]), install_requires=[ # Used for Base Authentication and to communicate with OAuth2 servers - "httpx==0.18.*", - "pyspnego==0.1.6" + "httpx==0.18.*" ], extras_require={ "testing": [ @@ -50,6 +49,9 @@ # Used to test NTLM support "pytest==6.*", "pytest-mock==3.6.*" + ], + 'windows_auth': [ + "pyspnego[kerberos]==0.1.6" ] }, python_requires=">=3.6", diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 0ba37c5..2279202 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -1,87 +1,57 @@ -import os -from functools import wraps +import sys from typing import Optional import httpx import pytest +import spnego from pytest_httpx import HTTPXMock -from pytest_mock import mocker -from httpx_auth.authentication import NTLM, _AuthenticationTarget +from httpx_auth.authentication import Negotiate -class TestAuthenticationTargetUnit: - @pytest.mark.parametrize( - ["status_code", "expected_type"], [(401, 1), (407, 2), (403, 0)] - ) - def test_from_status_code(self, status_code: int, expected_type: int): - auth_target = _AuthenticationTarget.from_status_code(status_code) - assert auth_target.value == expected_type - - @pytest.mark.parametrize( - ["auth_target", "expected_output"], - [ - (_AuthenticationTarget.WWW, "WWW-Authenticate"), - (_AuthenticationTarget.PROXY, "Proxy-Authenticate"), - ], - ) - def test_challenge_header( - self, auth_target: _AuthenticationTarget, expected_output: str - ): - actual_output = auth_target.challenge_header_name() - assert actual_output.lower() == expected_output.lower() - - @pytest.mark.parametrize( - ["auth_target", "expected_output"], - [ - (_AuthenticationTarget.WWW, "Authorization"), - (_AuthenticationTarget.PROXY, "Proxy-Authorization"), - ], - ) - def test_challenge_header( - self, auth_target: _AuthenticationTarget, expected_output: str - ): - actual_output = auth_target.response_header_name() - assert actual_output.lower() == expected_output.lower() +TEST_USER = "test_user" +TEST_PASS = "test_pass" @pytest.fixture() -def ntlm_auth_fixture(): - yield NTLM("test_user", "test_pass") +def negotiate_auth_fixture(): + yield Negotiate(TEST_USER, TEST_PASS) -class TestNTLMUnit: +class TestNegotiateUnit: bytes_content = b"\x00\x01\x02\x03\x04\x05\x06\x07" str_content = "AAECAwQFBgc=" - def test_parse_auth_header_single_success(self, ntlm_auth_fixture): - ntlm_auth_fixture.authenticate_type = "NTLM" + def test_parse_auth_header_single_success(self, negotiate_auth_fixture): + negotiate_auth_fixture.auth_type = "NTLM" - header_value = "NTLM {}".format(self.str_content) - actual_output = ntlm_auth_fixture._parse_ntlm_authenticate_header(header_value) + header_value = f"NTLM {self.str_content}" + actual_output = negotiate_auth_fixture._parse_authenticate_header(header_value) assert actual_output == self.bytes_content - def test_parse_auth_header_multi_success(self, ntlm_auth_fixture): - ntlm_auth_fixture.authenticate_type = "Negotiate" + def test_parse_auth_header_multi_success(self, negotiate_auth_fixture): + negotiate_auth_fixture.auth_type = "Negotiate" - header_value = "Negotiate {}, Basic dGVzdF91c2VyOnRlc3RfcGFzcw==".format( - self.str_content + header_value = ( + f"Negotiate {self.str_content}, Basic dGVzdF91c2VyOnRlc3RfcGFzcw==" ) - actual_output = ntlm_auth_fixture._parse_ntlm_authenticate_header(header_value) + actual_output = negotiate_auth_fixture._parse_authenticate_header(header_value) assert actual_output == self.bytes_content - def test_parse_auth_header_single_fail(self, ntlm_auth_fixture): - ntlm_auth_fixture.authenticate_type = "NTLM" + def test_parse_auth_header_single_fail(self, negotiate_auth_fixture): + negotiate_auth_fixture.auth_type = "NTLM" - header_value = "Negotiate {}".format(self.str_content) + header_value = f"Negotiate {self.str_content}" with pytest.raises(StopIteration): - _ = ntlm_auth_fixture._parse_ntlm_authenticate_header(header_value) + _ = negotiate_auth_fixture._parse_authenticate_header(header_value) @pytest.mark.parametrize("auth_type", ["NTLM", "Negotiate"]) - def test_make_auth_header(self, ntlm_auth_fixture, auth_type: str): - expected_output = "{} {}".format(auth_type, self.str_content) - ntlm_auth_fixture.authenticate_type = auth_type - actual_str = ntlm_auth_fixture._make_authorization_header(self.bytes_content) + def test_make_auth_header(self, negotiate_auth_fixture, auth_type: str): + expected_output = f"{auth_type} {self.str_content}" + negotiate_auth_fixture.auth_type = auth_type + actual_str = negotiate_auth_fixture._make_authorization_header( + self.bytes_content + ) assert actual_str == expected_output @@ -92,48 +62,53 @@ def test_make_auth_header(self, ntlm_auth_fixture, auth_type: str): ("NtLm Successful", "NTLM"), ("Negotiate Successful", "Negotiate"), ("NeGoTiATe Successful", "Negotiate"), + ("Negotiate", "Negotiate"), ], ) def test_auth_type_from_header( - self, ntlm_auth_fixture, test_input: str, expected_output: Optional[str] + self, negotiate_auth_fixture, test_input: str, expected_output: Optional[str] ): - actual_output = ntlm_auth_fixture._auth_type_from_header(test_input) + actual_output = negotiate_auth_fixture._auth_type_from_header(test_input) assert actual_output.lower() == expected_output.lower() - def test_auth_type_from_header_returns_none_when_not_ntlm(self, ntlm_auth_fixture): + def test_auth_type_from_header_returns_none_when_not_ntlm( + self, negotiate_auth_fixture + ): header_content = "Basic Failure" - actual_output = ntlm_auth_fixture._auth_type_from_header(header_content) + actual_output = negotiate_auth_fixture._auth_type_from_header(header_content) assert actual_output is None - -def wrap_with_workstation(func): - @wraps(func) - def wrapper(self, *args, **kwargs): - try: - current_workstation = os.environ["NETBIOS_COMPUTER_NAME"] - except KeyError: - current_workstation = None - os.environ["NETBIOS_COMPUTER_NAME"] = "TESTWORKSTATION" - func(self, *args, **kwargs) - if current_workstation is not None: - os.environ["NETBIOS_COMPUTER_NAME"] = current_workstation - else: - del os.environ["NETBIOS_COMPUTER_NAME"] - - return wrapper - - -class TestNTLMFunctional: + def test_new_context_proxy(self, negotiate_auth_fixture): + proxy = negotiate_auth_fixture._new_context_proxy() + assert proxy.username == TEST_USER + assert proxy.password == TEST_PASS + assert proxy.protocol.lower() == "negotiate" + assert spnego.NegotiateOptions.use_ntlm not in proxy.options + assert proxy.spn.lower() == "host/unspecified" + + def test_new_context_proxy_with_ntlm(self, negotiate_auth_fixture): + negotiate_auth_fixture.force_ntlm = True + proxy = negotiate_auth_fixture._new_context_proxy() + assert proxy.username == TEST_USER + assert proxy.password == TEST_PASS + assert proxy.protocol.lower() == "ntlm" + assert spnego.NegotiateOptions.use_ntlm in proxy.options + assert proxy.spn.lower() == "host/unspecified" + + +class TestNegotiateFunctional: def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): httpx_mock.add_response(url="https://www.example.com/test", status_code=200) with httpx.Client() as client: resp = client.get( - url="https://www.example.com/test", auth=NTLM("test_user", "test_pass") + url="https://www.example.com/test", + auth=Negotiate("test_user", "test_pass"), ) assert resp.status_code == 200 assert len(httpx_mock.get_requests()) == 1 + @pytest.mark.skipif(sys.platform != "nt", reason="Different proxy is used on linux") def test_http_401s_make_three_requests_and_return_401( self, httpx_mock: HTTPXMock, mocker ): @@ -144,92 +119,15 @@ def test_http_401s_make_three_requests_and_return_401( match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, ) - mocker.patch( - "httpx_auth.authentication.NTLMProxy.step", - return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", - ) - - with httpx.Client() as client: - resp = client.get( - url="https://www.example.com/test", auth=NTLM("test_user", "test_pass") - ) - assert resp.status_code == 401 - assert len(resp.history) == 2 - assert len(httpx_mock.get_requests()) == 3 - - def test_http_407s_make_three_requests_and_return_407( - self, httpx_mock: HTTPXMock, mocker - ): - httpx_mock.add_response( - status_code=407, headers={"Proxy-Authenticate": "Negotiate"} - ) - httpx_mock.add_response( - status_code=407, - headers={"Proxy-Authenticate": "Negotiate AAECAwQFBgc="}, - match_headers={"Proxy-Authorization": "Negotiate CAkKCwwNDg8="}, - ) - - mocker.patch( - "httpx_auth.authentication.NTLMProxy.step", + with mocker.patch( + "httpx_auth.authentication.spnego.sspi.SSPIProxy.step", return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", - ) - - with httpx.Client() as client: - resp = client.get( - url="https://www.example.com/test", auth=NTLM("test_user", "test_pass") - ) - assert resp.status_code == 407 - assert len(resp.history) == 2 - assert len(httpx_mock.get_requests()) == 3 - - @wrap_with_workstation - @pytest.mark.parametrize("status_code", [200, 401, 403]) - def test_valid_handshake_returns_final_status( - self, httpx_mock, mocker, status_code: int - ): - expect1 = {"Authorization": "NTLM TlRMTVNTUAABAAAAN4II4AAAAAAgAAAAAAAAACAAAAA="} - response1 = { - "WWW-Authenticate": "NTLM TlRMTVNTUAACAAAAHgAeADgAAAA1gori1CEifyE0ovkAAAAAAAAAAJgAmABWAAAACgBh" - "SgAAAA9UAEUAUwBUAFcATwBSAEsAUwBUAEEAVABJAE8ATgACAB4AVABFAFMAVABXAE8AUgBLAFMAVABBAFQASQBPAE4AA" - "QAeAFQARQBTAFQAVwBPAFIASwBTAFQAQQBUAEkATwBOAAQAHgBUAEUAUwBUAFcATwBSAEsAUwBUAEEAVABJAE8ATgADAB" - "4AVABFAFMAVABXAE8AUgBLAFMAVABBAFQASQBPAE4ABwAIADbWHPMoRNcBAAAAAA==" - } - expect2 = { - "Authorization": "NTLM TlRMTVNTUAADAAAAGAAYAFgAAADwAPAAcAAAAAAAAABgAQAAEAAQAGABAAAeAB4AcAEAAAgAC" - "ACOAQAANYKK4gABBgAAAAAPw38elkNrZcKFdMx/yneDWQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACSQFWteoy7KhaGzllQe" - "8OIBAQAAAAAAADbWHPMoRNcB3q2+796tvu8AAAAAAgAeAFQARQBTAFQAVwBPAFIASwBTAFQAQQBUAEkATwBOAAEAHgBUAEU" - "AUwBUAFcATwBSAEsAUwBUAEEAVABJAE8ATgAEAB4AVABFAFMAVABXAE8AUgBLAFMAVABBAFQASQBPAE4AAwAeAFQARQBTAF" - "QAVwBPAFIASwBTAFQAQQBUAEkATwBOAAcACAA21hzzKETXAQkAHABIAE8AUwBUAC8AbABvAGMAYQBsAGgAbwBzAHQABgAEA" - "AIAAAAAAAAAAAAAAEkASQBTAF8AVABlAHMAdABUAEUAUwBUAFcATwBSAEsAUwBUAEEAVABJAE8ATgCbo4V5ivHWOA==" - } - - # Mock os.urandom since the client challenge is generated for the AUTHENTICATE message with 8 bytes of random - # date - mocker.patch( - "httpx_auth.authentication.os.urandom", - return_value=b"\xDE\xAD\xBE\xEF\xDE\xAD\xBE\xEF", - ) - - httpx_mock.add_response( - url="https://localhost/test", - status_code=401, - headers={"WWW-Authenticate": "NTLM"}, - ) - httpx_mock.add_response( - url="https://localhost/test", - status_code=401, - headers=response1, - match_headers=expect1, - ) - httpx_mock.add_response( - url="https://localhost/test", status_code=status_code, match_headers=expect2 - ) - - with httpx.Client() as client: - resp = client.get( - url="https://localhost/test", auth=NTLM("IIS_Test", "rosebud") - ) - print(resp.request.headers["Authorization"]) - assert resp.status_code == status_code - assert len(resp.history) == 2 - assert len(httpx_mock.get_requests()) == 3 + ): + with httpx.Client() as client: + resp = client.get( + url="https://www.example.com/test", + auth=Negotiate("test_user", "test_pass"), + ) + assert resp.status_code == 401 + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 From fc372b7c3013b969c6d62b6fe5af1c790ba9913d Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 16:32:53 +0100 Subject: [PATCH 05/21] Update documentation and package export --- README.md | 45 ++++++++++++++++++++++++++++++++++++ httpx_auth/__init__.py | 1 + httpx_auth/authentication.py | 1 - 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f97e54c..05ea866 100644 --- a/README.md +++ b/README.md @@ -667,6 +667,51 @@ with httpx.Client() as client: | `username` | User name. | Mandatory | | | `password` | User password. | Mandatory | | +## Negotiate and NTLM + +Support for Negotiate, Kerberos and NTLM authentication is optional, install with the `windows_auth` extra to enable +this feature. + +You can use Negotiate, Kerberos and NTLM authentication with `httpx_auth.Negotiate`. + +### Using cached credentials for Kerberos authentication + +Cached credentials are used by default for Kerberos authentication, this relies on a Ticket-Granting Ticket being +present on your system from `kinit` or similar. This is supported by default on Windows, on Linux it relies on system +packages being installed. See documentation for the [pyspnego](https://pypi.org/project/pyspnego/) package for more +information. + +```python +import httpx +from httpx_auth import Negotiate + +with httpx.Client() as client: + client.get('https://www.example.com', auth=Negotiate()) +``` + +### Using provided credentials for NTLM authentication + +Where other credentials are required, or if Kerberos is not supported, provide a username and password for +authentication: + +```python +import httpx +from httpx_auth import Negotiate + +with httpx.Client() as client: + client.get('https://www.example.com', auth=Negotiate('domain\\username', 'password')) +``` + +### Parameters + +| Name | Description | Mandatory | Default value | +|:------------------------|:-----------------------------------------------------------|:----------|:--------------| +| `username` | User name. | Optional | | +| `password` | User password. | Optional | | +| `force_ntlm` | Force the use of NTLM auth | Optional | False | +| `service` | Name portion of the server SPN | Optional | "Service" | +| `max_redirects` | Maximum number of redirects to follow while authenticating | Optional | 10 | + ## Multiple authentication at once You can also use a combination of authentication using `+`or `&` as in the following sample: diff --git a/httpx_auth/__init__.py b/httpx_auth/__init__.py index 8ab6089..5cdc875 100644 --- a/httpx_auth/__init__.py +++ b/httpx_auth/__init__.py @@ -15,6 +15,7 @@ OAuth2ClientCredentials, OktaClientCredentials, OAuth2ResourceOwnerPasswordCredentials, + Negotiate, ) from httpx_auth.oauth2_tokens import JsonTokenFileCache from httpx_auth.aws import AWS4Auth diff --git a/httpx_auth/authentication.py b/httpx_auth/authentication.py index fdb18d4..6ad0e39 100644 --- a/httpx_auth/authentication.py +++ b/httpx_auth/authentication.py @@ -1,7 +1,6 @@ import base64 import os import uuid -from enum import Enum from hashlib import sha256, sha512 from typing import Optional, Generator, List from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode From 4f4c2d07475cdf5c6e02bac7e327b4d7f06d3429 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 16:37:10 +0100 Subject: [PATCH 06/21] Skip test_ntlm if extra is not present --- tests/test_ntlm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 2279202..19b1b32 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -3,7 +3,7 @@ import httpx import pytest -import spnego +spnego = pytest.importorskip('spnego') from pytest_httpx import HTTPXMock from httpx_auth.authentication import Negotiate From 95021ade981e09fffb694bb4adff9f00b96d6cfc Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 16:43:47 +0100 Subject: [PATCH 07/21] Add extra to travis install script and add missing changelog... --- .travis.yml | 2 +- CHANGELOG.md | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index ec60a42..cbffa3a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ python: - "3.8" - "3.9" install: - - pip install .[testing] + - pip install .[testing, windows_auth] script: - pytest --cov=httpx_auth --cov-fail-under=100 --cov-report=term-missing - pip install idna==2.10 diff --git a/CHANGELOG.md b/CHANGELOG.md index 65ecd9e..3d1c22b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added -- `httpx_auth.authentication` contains a new `NTLM` class that supports NTLM authentication without support for channel - bindings +- `httpx_auth.authentication` contains a new `Negotiate` class that supports Kerberos and NTLM authentication without + support for channel binding tokens +- Added extra `windows_auth` to enable support for Negotiate and NTLM authentication ### Changed -- Requires [`pyspnego`](https://github.com/jborean93/pyspnego)==0.1.6 +- Optionally requires [`pyspnego[kerberos]`](https://github.com/jborean93/pyspnego)==0.1.6 - Requires [`pytest`](https://docs.pytest.org/en/latest)==6.2.\* for testing - Requires [`pytest-mock`](https://github.com/pytest-dev/pytest-mock)==3.6.\* for testing From a34f030518d76970c3b6c563db3cef08d86906a0 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 16:45:21 +0100 Subject: [PATCH 08/21] Remove extra comma in extras --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index cbffa3a..023c34d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ python: - "3.8" - "3.9" install: - - pip install .[testing, windows_auth] + - pip install .[testing,windows_auth] script: - pytest --cov=httpx_auth --cov-fail-under=100 --cov-report=term-missing - pip install idna==2.10 From 89043ea4d631cea043aabf52c4bd78a867a3ab87 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 16:52:15 +0100 Subject: [PATCH 09/21] Add additional tests to close some gaps in coverage --- tests/test_ntlm.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 19b1b32..2a17d8a 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -95,6 +95,16 @@ def test_new_context_proxy_with_ntlm(self, negotiate_auth_fixture): assert spnego.NegotiateOptions.use_ntlm in proxy.options assert proxy.spn.lower() == "host/unspecified" + def test_password_with_no_username_throws(self): + with pytest.raises(ValueError) as exception_info: + _ = Negotiate(password=TEST_PASS) + assert "no username was provided" in str(exception_info) + + def test_ntlm_with_no_credentials_throws(self): + with pytest.raises(ValueError) as exception_info: + _ = Negotiate(force_ntlm=True) + assert "provide a username and password" in str(exception_info) + class TestNegotiateFunctional: def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): @@ -108,7 +118,6 @@ def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): assert resp.status_code == 200 assert len(httpx_mock.get_requests()) == 1 - @pytest.mark.skipif(sys.platform != "nt", reason="Different proxy is used on linux") def test_http_401s_make_three_requests_and_return_401( self, httpx_mock: HTTPXMock, mocker ): @@ -119,8 +128,12 @@ def test_http_401s_make_three_requests_and_return_401( match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, ) + if sys.platform == "nt": + patch_object = "httpx_auth.authentication.spnego.sspi.SSPIProxy.step" + else: + patch_object = "httpx_auth.authentication.spnego.sspi.GSSAPIProxy.step" with mocker.patch( - "httpx_auth.authentication.spnego.sspi.SSPIProxy.step", + patch_object, return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", ): with httpx.Client() as client: From 752f73a2400d69c038f5fd70b8fab92af534f0b4 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 16:54:36 +0100 Subject: [PATCH 10/21] Patch correct file --- tests/test_ntlm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 2a17d8a..b342b2e 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -131,7 +131,7 @@ def test_http_401s_make_three_requests_and_return_401( if sys.platform == "nt": patch_object = "httpx_auth.authentication.spnego.sspi.SSPIProxy.step" else: - patch_object = "httpx_auth.authentication.spnego.sspi.GSSAPIProxy.step" + patch_object = "httpx_auth.authentication.spnego.gss.GSSAPIProxy.step" with mocker.patch( patch_object, return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", From 3eae2ae0c61cefbe1cf0541162a5f987f38fc1ec Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 17:32:25 +0100 Subject: [PATCH 11/21] Extract Negotiate to separate file - Add a functional test - Update imports --- httpx_auth/__init__.py | 2 +- httpx_auth/authentication.py | 189 +--------------------------------- httpx_auth/negotiate.py | 192 +++++++++++++++++++++++++++++++++++ tests/test_ntlm.py | 59 ++++++++--- 4 files changed, 241 insertions(+), 201 deletions(-) create mode 100644 httpx_auth/negotiate.py diff --git a/httpx_auth/__init__.py b/httpx_auth/__init__.py index 5cdc875..7e08257 100644 --- a/httpx_auth/__init__.py +++ b/httpx_auth/__init__.py @@ -15,8 +15,8 @@ OAuth2ClientCredentials, OktaClientCredentials, OAuth2ResourceOwnerPasswordCredentials, - Negotiate, ) +from httpx_auth.negotiate import Negotiate from httpx_auth.oauth2_tokens import JsonTokenFileCache from httpx_auth.aws import AWS4Auth from httpx_auth.errors import ( diff --git a/httpx_auth/authentication.py b/httpx_auth/authentication.py index 6ad0e39..ccdf3f4 100644 --- a/httpx_auth/authentication.py +++ b/httpx_auth/authentication.py @@ -2,19 +2,11 @@ import os import uuid from hashlib import sha256, sha512 -from typing import Optional, Generator, List +from typing import Optional, Generator from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode import httpx -try: - import spnego - - WINDOWS_AUTH = True -except ImportError: - spnego = None - WINDOWS_AUTH = False - from httpx_auth import oauth2_authentication_responses_server, oauth2_tokens from httpx_auth.errors import InvalidGrantRequest, GrantNotProvided @@ -1173,185 +1165,6 @@ def __init__(self, username: str, password: str): httpx.BasicAuth.__init__(self, username, password) -class Negotiate(httpx.Auth, SupportMultiAuth): - """ - NOTE: This does not support Channel Bindings which can (and ought to be) supported by servers. This is due to a - limitation in the HTTPCore library at present. - """ - - _username: str - _password: str - force_ntlm: bool - auth_header: str - auth_complete: bool - auth_type: str - _service: str - _context_proxy: "spnego._context.ContextProxy" - max_redirects: int = 10 - - def __init__( - self, - username: str = None, - password: str = None, - force_ntlm: bool = False, - service: str = None, - max_redirects: int = 10, - ) -> None: - """ - :param username: Username and domain (if required). Optional for servers that support Kerberos, required for - those that require NTLM - :param password: Password if required by server for authentication. - :param force_ntlm: Force authentication to use NTLM if available. - :param service: Service portion of the target Service Principal Name (default HTTP) - :return: None - """ - if not WINDOWS_AUTH: - raise ImportError( - "Windows authentication support not enabled, install with the windows_auth extra." - ) - if password and not username: - raise ValueError( - "Negotiate authentication with credentials requires username and password, no username was provided." - ) - if force_ntlm and not (username and password): - raise ValueError( - "NTLM authentication requires credentials, provide a username and password." - ) - self._username = username - self._password = password - self.force_ntlm = force_ntlm - self.auth_header = "" - self.auth_complete = False - self.auth_type = "" - self._service = service - self.max_redirects = max_redirects - - def auth_flow( - self, request: httpx.Request - ) -> Generator[httpx.Request, httpx.Response, None]: - - responses = [] - response = yield request - responses.append(response) - - redirect_count = 0 - - # If anything comes back except an authenticate challenge then return it for the client to deal with, hopefully - # a successful response. - if responses[-1].status_code != 401: - return responses[-1] - - # Otherwise authenticate. Determine the authentication name to use, prefer Negotiate if available. - self.auth_type = self._auth_type_from_header( - responses[-1].headers.get("WWW-Authenticate") - ) - if self.auth_type is None: - return responses[-1] - - # Run authentication flow. - yield from self._do_auth_flow(request, responses) - - # If we were redirected we will need to rerun the auth flow on the new url, repeat until either we receive a - # status that is not 401 Unauthorized, or until the url we ended up at is the same as the one we requested. - while responses[-1].status_code == 401 and responses[-1].url != request.url: - redirect_count += 1 - if redirect_count > self.max_redirects: - raise httpx.TooManyRedirects( - message=f"Redirected too many times ({self.max_redirects}).", - request=request, - ) - request.url = responses[-1].url - yield from self._do_auth_flow(request, responses) - - return responses[-1] - - def _do_auth_flow( - self, request: httpx.Request, responses: List[httpx.Response] - ) -> Generator[httpx.Request, httpx.Response, None]: - # Phase 1: - # Configure context proxy, generate message header, attach to request and resend. - host = request.url.host - self.context_proxy = self._new_context_proxy() - self.context_proxy.spn = "{0}/{1}".format( - self._service.upper() if self._service else "HTTP", host - ) - request.headers["Authorization"] = self._make_authorization_header( - self.context_proxy.step(None) - ) - response = yield request - responses.append(response) - - # Phase 2: - # Server responds with Challenge message, parse the authenticate header and deal with cookies. Some web apps use - # cookies to store progress in the auth process. - if "set-cookie" in responses[-1].headers: - request.headers["Cookie"] = responses[-1].headers["Cookie"] - - auth_header_bytes = self._parse_authenticate_header( - responses[-1].headers["WWW-Authenticate"] - ) - - # Phase 3: - # Generate Authenticate message, attach to the request and resend it. If the user is authorized then this will - # succeed. If not then this will fail. - self.auth_header = self._make_authorization_header( - self.context_proxy.step(auth_header_bytes) - ) - request.headers["Authorization"] = self.auth_header - response = yield request - responses.append(response) - - def _new_context_proxy(self) -> "spnego._context.ContextProxy": - client = spnego.client( - self._username, - self._password, - service=self._service, - protocol="ntlm" if self.force_ntlm else "negotiate", - ) - if self.force_ntlm: - client.options = spnego.NegotiateOptions.use_ntlm - client.protocol = "ntlm" - return client - - def _parse_authenticate_header(self, header: str) -> bytes: - """ - Extract NTLM/Negotiate value from Authenticate header and convert to bytes - :param header: str WWW-Authenticate header - :return: bytes Negotiate challenge - """ - - auth_strip = self.auth_type.lower() + " " - auth_header_value = next( - s - for s in (val.lstrip() for val in header.split(",")) - if s.lower().startswith(auth_strip) - ) - return base64.b64decode(auth_header_value[len(auth_strip) :]) - - def _make_authorization_header(self, response_bytes: bytes) -> str: - """ - Convert the auth bytes to base64 encoded string and build Authorization header. - :param response_bytes: bytes auth response content - :return: str Authorization/Proxy-Authorization header - """ - - auth_response = base64.b64encode(response_bytes).decode("ascii") - return f"{self.auth_type} {auth_response}" - - @staticmethod - def _auth_type_from_header(header: str) -> Optional[str]: - """ - Given a WWW-Authenticate header, returns the authentication type to use. - :param header: str Authenticate header - :return: Optional[str] Authentication type or None if not supported - """ - if "negotiate" in header.lower(): - return "Negotiate" - elif "ntlm" in header.lower(): - return "NTLM" - return None - - class _MultiAuth(httpx.Auth): """Authentication using multiple authentication methods.""" diff --git a/httpx_auth/negotiate.py b/httpx_auth/negotiate.py new file mode 100644 index 0000000..f40107b --- /dev/null +++ b/httpx_auth/negotiate.py @@ -0,0 +1,192 @@ +import base64 +from typing import Generator, List, Optional + +import httpx +from httpx_auth.authentication import SupportMultiAuth + +try: + import spnego + + WINDOWS_AUTH = True +except ImportError: + spnego = None + WINDOWS_AUTH = False + + +class Negotiate(httpx.Auth, SupportMultiAuth): + """ + NOTE: This does not support Channel Bindings which can (and ought to be) supported by servers. This is due to a + limitation in the HTTPCore library at present. + """ + + _username: str + _password: str + force_ntlm: bool + auth_header: str + auth_complete: bool + auth_type: str + _service: str + _context_proxy: "spnego._context.ContextProxy" + max_redirects: int = 10 + + def __init__( + self, + username: str = None, + password: str = None, + force_ntlm: bool = False, + service: str = None, + max_redirects: int = 10, + ) -> None: + """ + :param username: Username and domain (if required). Optional for servers that support Kerberos, required for + those that require NTLM + :param password: Password if required by server for authentication. + :param force_ntlm: Force authentication to use NTLM if available. + :param service: Service portion of the target Service Principal Name (default HTTP) + :return: None + """ + if not WINDOWS_AUTH: + raise ImportError( + "Windows authentication support not enabled, install with the windows_auth extra." + ) + if password and not username: + raise ValueError( + "Negotiate authentication with credentials requires username and password, no username was provided." + ) + if force_ntlm and not (username and password): + raise ValueError( + "NTLM authentication requires credentials, provide a username and password." + ) + self._username = username + self._password = password + self.force_ntlm = force_ntlm + self.auth_header = "" + self.auth_complete = False + self.auth_type = "" + self._service = service + self.max_redirects = max_redirects + + def auth_flow( + self, request: httpx.Request + ) -> Generator[httpx.Request, httpx.Response, None]: + + responses = [] + response = yield request + responses.append(response) + + redirect_count = 0 + + # If anything comes back except an authenticate challenge then return it for the client to deal with, hopefully + # a successful response. + if responses[-1].status_code != 401: + return responses[-1] + + # Otherwise authenticate. Determine the authentication name to use, prefer Negotiate if available. + self.auth_type = self._auth_type_from_header( + responses[-1].headers.get("WWW-Authenticate") + ) + if self.auth_type is None: + return responses[-1] + + # Run authentication flow. + yield from self._do_auth_flow(request, responses) + + # If we were redirected we will need to rerun the auth flow on the new url, repeat until either we receive a + # status that is not 401 Unauthorized, or until the url we ended up at is the same as the one we requested. + while responses[-1].status_code == 401 and responses[-1].url != request.url: + redirect_count += 1 + if redirect_count > self.max_redirects: + raise httpx.TooManyRedirects( + message=f"Redirected too many times ({self.max_redirects}).", + request=request, + ) + request.url = responses[-1].url + yield from self._do_auth_flow(request, responses) + + return responses[-1] + + def _do_auth_flow( + self, request: httpx.Request, responses: List[httpx.Response] + ) -> Generator[httpx.Request, httpx.Response, None]: + # Phase 1: + # Configure context proxy, generate message header, attach to request and resend. + host = request.url.host + self.context_proxy = self._new_context_proxy() + self.context_proxy.spn = "{0}/{1}".format( + self._service.upper() if self._service else "HTTP", host + ) + request.headers["Authorization"] = self._make_authorization_header( + self.context_proxy.step(None) + ) + response = yield request + responses.append(response) + + # Phase 2: + # Server responds with Challenge message, parse the authenticate header and deal with cookies. Some web apps use + # cookies to store progress in the auth process. + if "set-cookie" in responses[-1].headers: + request.headers["Cookie"] = responses[-1].headers["Cookie"] + + auth_header_bytes = self._parse_authenticate_header( + responses[-1].headers["WWW-Authenticate"] + ) + + # Phase 3: + # Generate Authenticate message, attach to the request and resend it. If the user is authorized then this will + # succeed. If not then this will fail. + self.auth_header = self._make_authorization_header( + self.context_proxy.step(auth_header_bytes) + ) + request.headers["Authorization"] = self.auth_header + response = yield request + responses.append(response) + + def _new_context_proxy(self) -> "spnego._context.ContextProxy": + client = spnego.client( + self._username, + self._password, + service=self._service, + protocol="ntlm" if self.force_ntlm else "negotiate", + ) + if self.force_ntlm: + client.options = spnego.NegotiateOptions.use_ntlm + client.protocol = "ntlm" + return client + + def _parse_authenticate_header(self, header: str) -> bytes: + """ + Extract NTLM/Negotiate value from Authenticate header and convert to bytes + :param header: str WWW-Authenticate header + :return: bytes Negotiate challenge + """ + + auth_strip = self.auth_type.lower() + " " + auth_header_value = next( + s + for s in (val.lstrip() for val in header.split(",")) + if s.lower().startswith(auth_strip) + ) + return base64.b64decode(auth_header_value[len(auth_strip) :]) + + def _make_authorization_header(self, response_bytes: bytes) -> str: + """ + Convert the auth bytes to base64 encoded string and build Authorization header. + :param response_bytes: bytes auth response content + :return: str Authorization/Proxy-Authorization header + """ + + auth_response = base64.b64encode(response_bytes).decode("ascii") + return f"{self.auth_type} {auth_response}" + + @staticmethod + def _auth_type_from_header(header: str) -> Optional[str]: + """ + Given a WWW-Authenticate header, returns the authentication type to use. + :param header: str Authenticate header + :return: Optional[str] Authentication type or None if not supported + """ + if "negotiate" in header.lower(): + return "Negotiate" + elif "ntlm" in header.lower(): + return "NTLM" + return None diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index b342b2e..c1ac58a 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -1,13 +1,12 @@ import sys from typing import Optional +from pytest_httpx import HTTPXMock +from httpx_auth import Negotiate import httpx import pytest -spnego = pytest.importorskip('spnego') -from pytest_httpx import HTTPXMock - -from httpx_auth.authentication import Negotiate +spnego = pytest.importorskip("spnego") TEST_USER = "test_user" TEST_PASS = "test_pass" @@ -66,13 +65,13 @@ def test_make_auth_header(self, negotiate_auth_fixture, auth_type: str): ], ) def test_auth_type_from_header( - self, negotiate_auth_fixture, test_input: str, expected_output: Optional[str] + self, negotiate_auth_fixture, test_input: str, expected_output: Optional[str] ): actual_output = negotiate_auth_fixture._auth_type_from_header(test_input) assert actual_output.lower() == expected_output.lower() def test_auth_type_from_header_returns_none_when_not_ntlm( - self, negotiate_auth_fixture + self, negotiate_auth_fixture ): header_content = "Basic Failure" actual_output = negotiate_auth_fixture._auth_type_from_header(header_content) @@ -119,7 +118,7 @@ def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): assert len(httpx_mock.get_requests()) == 1 def test_http_401s_make_three_requests_and_return_401( - self, httpx_mock: HTTPXMock, mocker + self, httpx_mock: HTTPXMock, mocker ): httpx_mock.add_response(status_code=401, headers={"WWW-Authenticate": "NTLM"}) httpx_mock.add_response( @@ -128,13 +127,13 @@ def test_http_401s_make_three_requests_and_return_401( match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, ) - if sys.platform == "nt": - patch_object = "httpx_auth.authentication.spnego.sspi.SSPIProxy.step" + if sys.platform == "win32": + patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" else: - patch_object = "httpx_auth.authentication.spnego.gss.GSSAPIProxy.step" + patch_object = "httpx_auth.negotiate.spnego.gss.GSSAPIProxy.step" with mocker.patch( - patch_object, - return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + patch_object, + return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", ): with httpx.Client() as client: resp = client.get( @@ -144,3 +143,39 @@ def test_http_401s_make_three_requests_and_return_401( assert resp.status_code == 401 assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 + + @pytest.mark.parametrize('status_code', [200, 403, 404]) + def test_http_response_reported_correctly_when_auth_completes( + self, httpx_mock: HTTPXMock, mocker, status_code + ): + httpx_mock.add_response(status_code=401, headers={"WWW-Authenticate": "NTLM"}) + httpx_mock.add_response( + status_code=401, + headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, + match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + ) + httpx_mock.add_response( + status_code=status_code, + match_headers={"Authorization": "NTLM Dw4NDAsKCQg="}, + ) + + if sys.platform == "win32": + patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" + else: + patch_object = "httpx_auth.negotiate.spnego.gss.GSSAPIProxy.step" + + with mocker.patch( + patch_object, + side_effect=[ + b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08" + ] + ): + with httpx.Client() as client: + resp = client.get( + url="https://www.example.com/test", + auth=Negotiate("test_user", "test_pass"), + ) + assert resp.status_code == status_code + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 From 0af9cd5841e1b5f542bed7e8cf3f679e56e48293 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 17:49:53 +0100 Subject: [PATCH 12/21] Patch correct method on linux --- tests/test_ntlm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index c1ac58a..0be0a53 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -130,7 +130,7 @@ def test_http_401s_make_three_requests_and_return_401( if sys.platform == "win32": patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" else: - patch_object = "httpx_auth.negotiate.spnego.gss.GSSAPIProxy.step" + patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" with mocker.patch( patch_object, return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", @@ -162,7 +162,7 @@ def test_http_response_reported_correctly_when_auth_completes( if sys.platform == "win32": patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" else: - patch_object = "httpx_auth.negotiate.spnego.gss.GSSAPIProxy.step" + patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" with mocker.patch( patch_object, From e39b884f55af834a622b06303f4e59b2390b8289 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 17:53:32 +0100 Subject: [PATCH 13/21] Reformat test_ntlm and fixup sonarqube issue --- tests/test_ntlm.py | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 0be0a53..b155ca0 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -10,6 +10,7 @@ TEST_USER = "test_user" TEST_PASS = "test_pass" +TEST_URL = "https://www.example.com/test" @pytest.fixture() @@ -65,13 +66,13 @@ def test_make_auth_header(self, negotiate_auth_fixture, auth_type: str): ], ) def test_auth_type_from_header( - self, negotiate_auth_fixture, test_input: str, expected_output: Optional[str] + self, negotiate_auth_fixture, test_input: str, expected_output: Optional[str] ): actual_output = negotiate_auth_fixture._auth_type_from_header(test_input) assert actual_output.lower() == expected_output.lower() def test_auth_type_from_header_returns_none_when_not_ntlm( - self, negotiate_auth_fixture + self, negotiate_auth_fixture ): header_content = "Basic Failure" actual_output = negotiate_auth_fixture._auth_type_from_header(header_content) @@ -107,21 +108,24 @@ def test_ntlm_with_no_credentials_throws(self): class TestNegotiateFunctional: def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): - httpx_mock.add_response(url="https://www.example.com/test", status_code=200) + httpx_mock.add_response(url=TEST_URL, status_code=200) with httpx.Client() as client: resp = client.get( - url="https://www.example.com/test", + url=TEST_URL, auth=Negotiate("test_user", "test_pass"), ) assert resp.status_code == 200 assert len(httpx_mock.get_requests()) == 1 def test_http_401s_make_three_requests_and_return_401( - self, httpx_mock: HTTPXMock, mocker + self, httpx_mock: HTTPXMock, mocker ): - httpx_mock.add_response(status_code=401, headers={"WWW-Authenticate": "NTLM"}) httpx_mock.add_response( + url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} + ) + httpx_mock.add_response( + url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, @@ -132,29 +136,33 @@ def test_http_401s_make_three_requests_and_return_401( else: patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" with mocker.patch( - patch_object, - return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + patch_object, + return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", ): with httpx.Client() as client: resp = client.get( - url="https://www.example.com/test", + url=TEST_URL, auth=Negotiate("test_user", "test_pass"), ) assert resp.status_code == 401 assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 - @pytest.mark.parametrize('status_code', [200, 403, 404]) + @pytest.mark.parametrize("status_code", [200, 403, 404]) def test_http_response_reported_correctly_when_auth_completes( - self, httpx_mock: HTTPXMock, mocker, status_code + self, httpx_mock: HTTPXMock, mocker, status_code ): - httpx_mock.add_response(status_code=401, headers={"WWW-Authenticate": "NTLM"}) httpx_mock.add_response( + url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} + ) + httpx_mock.add_response( + url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, ) httpx_mock.add_response( + url=TEST_URL, status_code=status_code, match_headers={"Authorization": "NTLM Dw4NDAsKCQg="}, ) @@ -165,15 +173,15 @@ def test_http_response_reported_correctly_when_auth_completes( patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" with mocker.patch( - patch_object, - side_effect=[ - b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", - b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08" - ] + patch_object, + side_effect=[ + b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", + ], ): with httpx.Client() as client: resp = client.get( - url="https://www.example.com/test", + url=TEST_URL, auth=Negotiate("test_user", "test_pass"), ) assert resp.status_code == status_code From de51c3479a11e2d6130e5513529d5ee09686c2c2 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 18:20:26 +0100 Subject: [PATCH 14/21] Add tests and fix cookie handling issue - Add tests for redirect handling - Add test for setting cookies during auth process --- httpx_auth/negotiate.py | 2 +- tests/test_ntlm.py | 149 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 1 deletion(-) diff --git a/httpx_auth/negotiate.py b/httpx_auth/negotiate.py index f40107b..c662a61 100644 --- a/httpx_auth/negotiate.py +++ b/httpx_auth/negotiate.py @@ -125,7 +125,7 @@ def _do_auth_flow( # Server responds with Challenge message, parse the authenticate header and deal with cookies. Some web apps use # cookies to store progress in the auth process. if "set-cookie" in responses[-1].headers: - request.headers["Cookie"] = responses[-1].headers["Cookie"] + request.headers["Cookie"] = responses[-1].headers["set-cookie"] auth_header_bytes = self._parse_authenticate_header( responses[-1].headers["WWW-Authenticate"] diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index b155ca0..2bd05d1 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -118,6 +118,19 @@ def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): assert resp.status_code == 200 assert len(httpx_mock.get_requests()) == 1 + def test_http_authenticate_with_digest_returns_401(self, httpx_mock: HTTPXMock): + httpx_mock.add_response( + url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "Digest"} + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=Negotiate("test_user", "test_pass"), + ) + assert resp.status_code == 401 + assert len(resp.history) == 0 + assert len(httpx_mock.get_requests()) == 1 + def test_http_401s_make_three_requests_and_return_401( self, httpx_mock: HTTPXMock, mocker ): @@ -148,6 +161,102 @@ def test_http_401s_make_three_requests_and_return_401( assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 + def test_authentication_with_redirect_is_followed(self, httpx_mock: HTTPXMock, mocker): + redirect_url = TEST_URL + '/' + httpx_mock.add_response( + url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} + ) + httpx_mock.add_response( + url=TEST_URL, + status_code=401, + headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, + match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + ) + httpx_mock.add_response( + url=TEST_URL, + status_code=301, + match_headers={"Authorization": "NTLM Dw4NDAsKCQg="}, + headers={"Location": redirect_url}, + ) + httpx_mock.add_response( + url=redirect_url, status_code=401, headers={"WWW-Authenticate": "NTLM"} + ) + httpx_mock.add_response( + url=redirect_url, + status_code=401, + headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, + match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + ) + httpx_mock.add_response( + url=redirect_url, + status_code=200 + ) + + if sys.platform == "win32": + patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" + else: + patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" + + with mocker.patch( + patch_object, + side_effect=[ + b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", + b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", + ], + ): + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=Negotiate("test_user", "test_pass"), + ) + assert resp.status_code == 200 + assert len(resp.history) == 4 + assert len(httpx_mock.get_requests()) == 6 + + def test_authentication_with_too_many_redirects_throws(self, httpx_mock: HTTPXMock, mocker): + redirect_url = TEST_URL + '/' + httpx_mock.add_response( + url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} + ) + httpx_mock.add_response( + url=TEST_URL, + status_code=401, + headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, + match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + ) + httpx_mock.add_response( + url=TEST_URL, + status_code=301, + match_headers={"Authorization": "NTLM Dw4NDAsKCQg="}, + headers={"Location": redirect_url}, + ) + httpx_mock.add_response( + url=redirect_url, status_code=401, headers={"WWW-Authenticate": "NTLM"} + ) + + if sys.platform == "win32": + patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" + else: + patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" + + with mocker.patch( + patch_object, + side_effect=[ + b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", + ], + ): + with httpx.Client() as client: + with pytest.raises(httpx.TooManyRedirects) as exception_info: + _ = client.get( + url=TEST_URL, + auth=Negotiate("test_user", "test_pass", max_redirects=0), + ) + assert "Redirected too many times" in str(exception_info) + assert "0" in str(exception_info) + @pytest.mark.parametrize("status_code", [200, 403, 404]) def test_http_response_reported_correctly_when_auth_completes( self, httpx_mock: HTTPXMock, mocker, status_code @@ -187,3 +296,43 @@ def test_http_response_reported_correctly_when_auth_completes( assert resp.status_code == status_code assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 + + def test_http_response_sets_cookie_if_required( + self, httpx_mock: HTTPXMock, mocker + ): + test_cookie = "foo=bar" + httpx_mock.add_response( + url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} + ) + httpx_mock.add_response( + url=TEST_URL, + status_code=401, + headers={"WWW-Authenticate": "NTLM AAECAwQFBgc=", "Set-Cookie": test_cookie}, + match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + ) + httpx_mock.add_response( + url=TEST_URL, + status_code=200, + match_headers={"Authorization": "NTLM Dw4NDAsKCQg=", "Cookie": test_cookie}, + ) + + if sys.platform == "win32": + patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" + else: + patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" + + with mocker.patch( + patch_object, + side_effect=[ + b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", + ], + ): + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=Negotiate("test_user", "test_pass"), + ) + assert resp.status_code == 200 + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 From 7360db98e74a96dffe48453019a478f235820843 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 21:03:58 +0100 Subject: [PATCH 15/21] Extract methods and constants from test_ntlm.py --- tests/test_ntlm.py | 142 ++++++++++++++++++++------------------------- 1 file changed, 63 insertions(+), 79 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 2bd05d1..bff2b43 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -12,8 +12,12 @@ TEST_PASS = "test_pass" TEST_URL = "https://www.example.com/test" +FLOW_CHALLENGE_RESPONSE = "NTLM CAkKCwwNDg8=" +FLOW_AUTHENTICATE_REQUEST = "NTLM AAECAwQFBgc=" +FLOW_AUTHORIZATION_RESPONSE = "NTLM Dw4NDAsKCQg=" -@pytest.fixture() + +@pytest.fixture def negotiate_auth_fixture(): yield Negotiate(TEST_USER, TEST_PASS) @@ -106,33 +110,48 @@ def test_ntlm_with_no_credentials_throws(self): assert "provide a username and password" in str(exception_info) +def mock_auth_responses(request_count: int): + return [ + b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", + ] * request_count + + +def get_patch_method(): + if sys.platform == "win32": + patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" + else: + patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" + return patch_object + + class TestNegotiateFunctional: - def test_http_200_response_makes_one_request(self, httpx_mock: HTTPXMock): + def test_http_200_response_makes_one_request(self, negotiate_auth_fixture, httpx_mock: HTTPXMock): httpx_mock.add_response(url=TEST_URL, status_code=200) with httpx.Client() as client: resp = client.get( url=TEST_URL, - auth=Negotiate("test_user", "test_pass"), + auth=negotiate_auth_fixture, ) assert resp.status_code == 200 assert len(httpx_mock.get_requests()) == 1 - def test_http_authenticate_with_digest_returns_401(self, httpx_mock: HTTPXMock): + def test_http_authenticate_with_digest_returns_401(self, negotiate_auth_fixture, httpx_mock: HTTPXMock): httpx_mock.add_response( url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "Digest"} ) with httpx.Client() as client: resp = client.get( url=TEST_URL, - auth=Negotiate("test_user", "test_pass"), + auth=negotiate_auth_fixture, ) assert resp.status_code == 401 assert len(resp.history) == 0 assert len(httpx_mock.get_requests()) == 1 def test_http_401s_make_three_requests_and_return_401( - self, httpx_mock: HTTPXMock, mocker + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker ): httpx_mock.add_response( url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} @@ -140,28 +159,24 @@ def test_http_401s_make_three_requests_and_return_401( httpx_mock.add_response( url=TEST_URL, status_code=401, - headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, - match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, ) - if sys.platform == "win32": - patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" - else: - patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" with mocker.patch( - patch_object, - return_value=b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + get_patch_method(), + side_effect=mock_auth_responses(1), ): with httpx.Client() as client: resp = client.get( url=TEST_URL, - auth=Negotiate("test_user", "test_pass"), + auth=negotiate_auth_fixture, ) assert resp.status_code == 401 assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 - def test_authentication_with_redirect_is_followed(self, httpx_mock: HTTPXMock, mocker): + def test_authentication_with_redirect_is_followed(self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker): redirect_url = TEST_URL + '/' httpx_mock.add_response( url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} @@ -169,13 +184,13 @@ def test_authentication_with_redirect_is_followed(self, httpx_mock: HTTPXMock, m httpx_mock.add_response( url=TEST_URL, status_code=401, - headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, - match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, ) httpx_mock.add_response( url=TEST_URL, status_code=301, - match_headers={"Authorization": "NTLM Dw4NDAsKCQg="}, + match_headers={"Authorization": FLOW_AUTHORIZATION_RESPONSE}, headers={"Location": redirect_url}, ) httpx_mock.add_response( @@ -184,38 +199,28 @@ def test_authentication_with_redirect_is_followed(self, httpx_mock: HTTPXMock, m httpx_mock.add_response( url=redirect_url, status_code=401, - headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, - match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, ) httpx_mock.add_response( url=redirect_url, status_code=200 ) - if sys.platform == "win32": - patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" - else: - patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" - with mocker.patch( - patch_object, - side_effect=[ - b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", - b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", - b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", - b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", - ], + get_patch_method(), + side_effect=mock_auth_responses(2), ): with httpx.Client() as client: resp = client.get( url=TEST_URL, - auth=Negotiate("test_user", "test_pass"), + auth=negotiate_auth_fixture, ) assert resp.status_code == 200 assert len(resp.history) == 4 assert len(httpx_mock.get_requests()) == 6 - def test_authentication_with_too_many_redirects_throws(self, httpx_mock: HTTPXMock, mocker): + def test_authentication_with_too_many_redirects_throws(self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker): redirect_url = TEST_URL + '/' httpx_mock.add_response( url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} @@ -223,43 +228,37 @@ def test_authentication_with_too_many_redirects_throws(self, httpx_mock: HTTPXMo httpx_mock.add_response( url=TEST_URL, status_code=401, - headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, - match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, ) httpx_mock.add_response( url=TEST_URL, status_code=301, - match_headers={"Authorization": "NTLM Dw4NDAsKCQg="}, + match_headers={"Authorization": FLOW_AUTHORIZATION_RESPONSE}, headers={"Location": redirect_url}, ) httpx_mock.add_response( url=redirect_url, status_code=401, headers={"WWW-Authenticate": "NTLM"} ) - if sys.platform == "win32": - patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" - else: - patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" - with mocker.patch( - patch_object, - side_effect=[ - b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", - b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", - ], + get_patch_method(), + side_effect=mock_auth_responses(1), ): with httpx.Client() as client: + auth = negotiate_auth_fixture + auth.max_redirects = 0 with pytest.raises(httpx.TooManyRedirects) as exception_info: _ = client.get( url=TEST_URL, - auth=Negotiate("test_user", "test_pass", max_redirects=0), + auth=auth, ) assert "Redirected too many times" in str(exception_info) assert "0" in str(exception_info) @pytest.mark.parametrize("status_code", [200, 403, 404]) def test_http_response_reported_correctly_when_auth_completes( - self, httpx_mock: HTTPXMock, mocker, status_code + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker, status_code ): httpx_mock.add_response( url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} @@ -267,38 +266,30 @@ def test_http_response_reported_correctly_when_auth_completes( httpx_mock.add_response( url=TEST_URL, status_code=401, - headers={"WWW-Authenticate": "NTLM AAECAwQFBgc="}, - match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, ) httpx_mock.add_response( url=TEST_URL, status_code=status_code, - match_headers={"Authorization": "NTLM Dw4NDAsKCQg="}, + match_headers={"Authorization": FLOW_AUTHORIZATION_RESPONSE}, ) - if sys.platform == "win32": - patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" - else: - patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" - with mocker.patch( - patch_object, - side_effect=[ - b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", - b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", - ], + get_patch_method(), + side_effect=mock_auth_responses(1), ): with httpx.Client() as client: resp = client.get( url=TEST_URL, - auth=Negotiate("test_user", "test_pass"), + auth=negotiate_auth_fixture, ) assert resp.status_code == status_code assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 def test_http_response_sets_cookie_if_required( - self, httpx_mock: HTTPXMock, mocker + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker ): test_cookie = "foo=bar" httpx_mock.add_response( @@ -307,32 +298,25 @@ def test_http_response_sets_cookie_if_required( httpx_mock.add_response( url=TEST_URL, status_code=401, - headers={"WWW-Authenticate": "NTLM AAECAwQFBgc=", "Set-Cookie": test_cookie}, - match_headers={"Authorization": "NTLM CAkKCwwNDg8="}, + headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST, "Set-Cookie": test_cookie}, + match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, ) httpx_mock.add_response( url=TEST_URL, status_code=200, - match_headers={"Authorization": "NTLM Dw4NDAsKCQg=", "Cookie": test_cookie}, + match_headers={"Authorization": FLOW_AUTHORIZATION_RESPONSE, "Cookie": test_cookie}, ) - if sys.platform == "win32": - patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" - else: - patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" - with mocker.patch( - patch_object, - side_effect=[ - b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", - b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", - ], + get_patch_method(), + side_effect=mock_auth_responses(1), ): with httpx.Client() as client: resp = client.get( url=TEST_URL, - auth=Negotiate("test_user", "test_pass"), + auth=negotiate_auth_fixture, ) assert resp.status_code == 200 assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 + From dd89aa683809e3a287449dceb4bb2e1e13d7fc58 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Wed, 25 Aug 2021 21:41:56 +0100 Subject: [PATCH 16/21] Add MockDefinition object to test_ntlm.py, remove some more duplication --- tests/test_ntlm.py | 235 ++++++++++++++++++++++++++------------------- 1 file changed, 138 insertions(+), 97 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index bff2b43..411b0f9 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -1,7 +1,8 @@ import sys -from typing import Optional +from typing import Optional, List from pytest_httpx import HTTPXMock from httpx_auth import Negotiate +from collections import namedtuple import httpx import pytest @@ -16,6 +17,11 @@ FLOW_AUTHENTICATE_REQUEST = "NTLM AAECAwQFBgc=" FLOW_AUTHORIZATION_RESPONSE = "NTLM Dw4NDAsKCQg=" +MockDefinition = namedtuple( + "mock_definition", ("url", "status_code", "match_headers", "headers") +) +MockDefinition.__new__.__defaults__ = ("", 0, {}, {}) + @pytest.fixture def negotiate_auth_fixture(): @@ -125,10 +131,21 @@ def get_patch_method(): return patch_object -class TestNegotiateFunctional: - def test_http_200_response_makes_one_request(self, negotiate_auth_fixture, httpx_mock: HTTPXMock): - httpx_mock.add_response(url=TEST_URL, status_code=200) +def make_mockery(httpx_mock: HTTPXMock, mockery_definition: List[namedtuple]) -> None: + for mock_definition in mockery_definition: + httpx_mock.add_response( + url=mock_definition.url, + status_code=mock_definition.status_code, + match_headers=mock_definition.match_headers, + headers=mock_definition.headers, + ) + +class TestNegotiateFunctional: + def test_http_200_response_makes_one_request( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock + ): + make_mockery(httpx_mock, [MockDefinition(TEST_URL, 200, {}, {})]) with httpx.Client() as client: resp = client.get( url=TEST_URL, @@ -137,9 +154,12 @@ def test_http_200_response_makes_one_request(self, negotiate_auth_fixture, httpx assert resp.status_code == 200 assert len(httpx_mock.get_requests()) == 1 - def test_http_authenticate_with_digest_returns_401(self, negotiate_auth_fixture, httpx_mock: HTTPXMock): - httpx_mock.add_response( - url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "Digest"} + def test_http_authenticate_with_digest_returns_401( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock + ): + make_mockery( + httpx_mock, + [MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "Digest"})], ) with httpx.Client() as client: resp = client.get( @@ -153,14 +173,17 @@ def test_http_authenticate_with_digest_returns_401(self, negotiate_auth_fixture, def test_http_401s_make_three_requests_and_return_401( self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker ): - httpx_mock.add_response( - url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} - ) - httpx_mock.add_response( - url=TEST_URL, - status_code=401, - headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, - match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + ], ) with mocker.patch( @@ -176,40 +199,44 @@ def test_http_401s_make_three_requests_and_return_401( assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 - def test_authentication_with_redirect_is_followed(self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker): - redirect_url = TEST_URL + '/' - httpx_mock.add_response( - url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} - ) - httpx_mock.add_response( - url=TEST_URL, - status_code=401, - headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, - match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, - ) - httpx_mock.add_response( - url=TEST_URL, - status_code=301, - match_headers={"Authorization": FLOW_AUTHORIZATION_RESPONSE}, - headers={"Location": redirect_url}, - ) - httpx_mock.add_response( - url=redirect_url, status_code=401, headers={"WWW-Authenticate": "NTLM"} - ) - httpx_mock.add_response( - url=redirect_url, - status_code=401, - headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, - match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, - ) - httpx_mock.add_response( - url=redirect_url, - status_code=200 + def test_authentication_with_redirect_is_followed( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker + ): + redirect_url = TEST_URL + "/" + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + MockDefinition( + TEST_URL, + 301, + {"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + {"Location": redirect_url}, + ), + MockDefinition(redirect_url, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + redirect_url, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + MockDefinition( + redirect_url, + 200, + {"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + {}, + ), + ], ) - with mocker.patch( - get_patch_method(), - side_effect=mock_auth_responses(2), + get_patch_method(), + side_effect=mock_auth_responses(2), ): with httpx.Client() as client: resp = client.get( @@ -220,30 +247,32 @@ def test_authentication_with_redirect_is_followed(self, negotiate_auth_fixture, assert len(resp.history) == 4 assert len(httpx_mock.get_requests()) == 6 - def test_authentication_with_too_many_redirects_throws(self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker): - redirect_url = TEST_URL + '/' - httpx_mock.add_response( - url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} - ) - httpx_mock.add_response( - url=TEST_URL, - status_code=401, - headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, - match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, - ) - httpx_mock.add_response( - url=TEST_URL, - status_code=301, - match_headers={"Authorization": FLOW_AUTHORIZATION_RESPONSE}, - headers={"Location": redirect_url}, - ) - httpx_mock.add_response( - url=redirect_url, status_code=401, headers={"WWW-Authenticate": "NTLM"} + def test_authentication_with_too_many_redirects_throws( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker + ): + redirect_url = TEST_URL + "/" + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + MockDefinition( + TEST_URL, + 301, + {"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + {"Location": redirect_url}, + ), + MockDefinition(redirect_url, 401, {}, {"WWW-Authenticate": "NTLM"}), + ], ) - with mocker.patch( - get_patch_method(), - side_effect=mock_auth_responses(1), + get_patch_method(), + side_effect=mock_auth_responses(1), ): with httpx.Client() as client: auth = negotiate_auth_fixture @@ -260,19 +289,23 @@ def test_authentication_with_too_many_redirects_throws(self, negotiate_auth_fixt def test_http_response_reported_correctly_when_auth_completes( self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker, status_code ): - httpx_mock.add_response( - url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} - ) - httpx_mock.add_response( - url=TEST_URL, - status_code=401, - headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, - match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, - ) - httpx_mock.add_response( - url=TEST_URL, - status_code=status_code, - match_headers={"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + MockDefinition( + TEST_URL, + status_code, + {"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + {}, + ), + ], ) with mocker.patch( @@ -292,21 +325,30 @@ def test_http_response_sets_cookie_if_required( self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker ): test_cookie = "foo=bar" - httpx_mock.add_response( - url=TEST_URL, status_code=401, headers={"WWW-Authenticate": "NTLM"} + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + { + "WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST, + "Set-Cookie": test_cookie, + }, + ), + MockDefinition( + TEST_URL, + 200, + { + "Authorization": FLOW_AUTHORIZATION_RESPONSE, + "Cookie": test_cookie, + }, + {}, + ), + ], ) - httpx_mock.add_response( - url=TEST_URL, - status_code=401, - headers={"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST, "Set-Cookie": test_cookie}, - match_headers={"Authorization": FLOW_CHALLENGE_RESPONSE}, - ) - httpx_mock.add_response( - url=TEST_URL, - status_code=200, - match_headers={"Authorization": FLOW_AUTHORIZATION_RESPONSE, "Cookie": test_cookie}, - ) - with mocker.patch( get_patch_method(), side_effect=mock_auth_responses(1), @@ -319,4 +361,3 @@ def test_http_response_sets_cookie_if_required( assert resp.status_code == 200 assert len(resp.history) == 2 assert len(httpx_mock.get_requests()) == 3 - From 569cd803af3631095c6a3016794173d5b2b693cb Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Thu, 26 Aug 2021 16:48:12 +0100 Subject: [PATCH 17/21] Add test for missing extra --- tests/test_ntlm.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 411b0f9..4987f45 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -115,6 +115,13 @@ def test_ntlm_with_no_credentials_throws(self): _ = Negotiate(force_ntlm=True) assert "provide a username and password" in str(exception_info) + def test_no_spnego_package_is_handled(self): + sys.modules['spnego'] = None + from negotiate import Negotiate + with pytest.raises(ImportError) as exception_info: + _ = Negotiate() + assert "Windows authentication support not enabled" in str(exception_info) + def mock_auth_responses(request_count: int): return [ From b2183c0bc3bf3ee444cdb9a885750abeb0867ba3 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Thu, 26 Aug 2021 17:07:17 +0100 Subject: [PATCH 18/21] Remove test that does not work... --- tests/test_ntlm.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 4987f45..6dca116 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -3,11 +3,12 @@ from pytest_httpx import HTTPXMock from httpx_auth import Negotiate from collections import namedtuple +import importlib import httpx import pytest -spnego = pytest.importorskip("spnego") +_ = pytest.importorskip("spnego") TEST_USER = "test_user" TEST_PASS = "test_pass" @@ -89,20 +90,22 @@ def test_auth_type_from_header_returns_none_when_not_ntlm( assert actual_output is None def test_new_context_proxy(self, negotiate_auth_fixture): + import spnego as spnego_ proxy = negotiate_auth_fixture._new_context_proxy() assert proxy.username == TEST_USER assert proxy.password == TEST_PASS assert proxy.protocol.lower() == "negotiate" - assert spnego.NegotiateOptions.use_ntlm not in proxy.options + assert spnego_.NegotiateOptions.use_ntlm not in proxy.options assert proxy.spn.lower() == "host/unspecified" def test_new_context_proxy_with_ntlm(self, negotiate_auth_fixture): + import spnego as spnego_ negotiate_auth_fixture.force_ntlm = True proxy = negotiate_auth_fixture._new_context_proxy() assert proxy.username == TEST_USER assert proxy.password == TEST_PASS assert proxy.protocol.lower() == "ntlm" - assert spnego.NegotiateOptions.use_ntlm in proxy.options + assert spnego_.NegotiateOptions.use_ntlm in proxy.options assert proxy.spn.lower() == "host/unspecified" def test_password_with_no_username_throws(self): @@ -115,13 +118,6 @@ def test_ntlm_with_no_credentials_throws(self): _ = Negotiate(force_ntlm=True) assert "provide a username and password" in str(exception_info) - def test_no_spnego_package_is_handled(self): - sys.modules['spnego'] = None - from negotiate import Negotiate - with pytest.raises(ImportError) as exception_info: - _ = Negotiate() - assert "Windows authentication support not enabled" in str(exception_info) - def mock_auth_responses(request_count: int): return [ From 444c1ac074ff920bea34cbd4e187d3f5d4d82600 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Thu, 26 Aug 2021 17:07:17 +0100 Subject: [PATCH 19/21] Remove test that does not work... --- tests/test_ntlm.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 4987f45..976a6dd 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -7,7 +7,7 @@ import httpx import pytest -spnego = pytest.importorskip("spnego") +_ = pytest.importorskip("spnego") TEST_USER = "test_user" TEST_PASS = "test_pass" @@ -89,20 +89,22 @@ def test_auth_type_from_header_returns_none_when_not_ntlm( assert actual_output is None def test_new_context_proxy(self, negotiate_auth_fixture): + import spnego as spnego_ proxy = negotiate_auth_fixture._new_context_proxy() assert proxy.username == TEST_USER assert proxy.password == TEST_PASS assert proxy.protocol.lower() == "negotiate" - assert spnego.NegotiateOptions.use_ntlm not in proxy.options + assert spnego_.NegotiateOptions.use_ntlm not in proxy.options assert proxy.spn.lower() == "host/unspecified" def test_new_context_proxy_with_ntlm(self, negotiate_auth_fixture): + import spnego as spnego_ negotiate_auth_fixture.force_ntlm = True proxy = negotiate_auth_fixture._new_context_proxy() assert proxy.username == TEST_USER assert proxy.password == TEST_PASS assert proxy.protocol.lower() == "ntlm" - assert spnego.NegotiateOptions.use_ntlm in proxy.options + assert spnego_.NegotiateOptions.use_ntlm in proxy.options assert proxy.spn.lower() == "host/unspecified" def test_password_with_no_username_throws(self): @@ -115,13 +117,6 @@ def test_ntlm_with_no_credentials_throws(self): _ = Negotiate(force_ntlm=True) assert "provide a username and password" in str(exception_info) - def test_no_spnego_package_is_handled(self): - sys.modules['spnego'] = None - from negotiate import Negotiate - with pytest.raises(ImportError) as exception_info: - _ = Negotiate() - assert "Windows authentication support not enabled" in str(exception_info) - def mock_auth_responses(request_count: int): return [ From 3495ae5984c5517dea0290e7a115a234b5592f87 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Thu, 26 Aug 2021 17:08:34 +0100 Subject: [PATCH 20/21] Remove test that does not work... --- tests/test_ntlm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 6dca116..976a6dd 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -3,7 +3,6 @@ from pytest_httpx import HTTPXMock from httpx_auth import Negotiate from collections import namedtuple -import importlib import httpx import pytest From 555d91b3a8bf2ec849d7a8e7b02faa2687539482 Mon Sep 17 00:00:00 2001 From: Doug Addy Date: Thu, 26 Aug 2021 17:20:08 +0100 Subject: [PATCH 21/21] Squash warnings about pytest_mock as a context manager --- tests/test_ntlm.py | 104 ++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 976a6dd..135f54d 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -188,18 +188,18 @@ def test_http_401s_make_three_requests_and_return_401( ], ) - with mocker.patch( + mocker.patch( get_patch_method(), side_effect=mock_auth_responses(1), - ): - with httpx.Client() as client: - resp = client.get( - url=TEST_URL, - auth=negotiate_auth_fixture, - ) - assert resp.status_code == 401 - assert len(resp.history) == 2 - assert len(httpx_mock.get_requests()) == 3 + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == 401 + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 def test_authentication_with_redirect_is_followed( self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker @@ -236,18 +236,18 @@ def test_authentication_with_redirect_is_followed( ), ], ) - with mocker.patch( + mocker.patch( get_patch_method(), side_effect=mock_auth_responses(2), - ): - with httpx.Client() as client: - resp = client.get( - url=TEST_URL, - auth=negotiate_auth_fixture, - ) - assert resp.status_code == 200 - assert len(resp.history) == 4 - assert len(httpx_mock.get_requests()) == 6 + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == 200 + assert len(resp.history) == 4 + assert len(httpx_mock.get_requests()) == 6 def test_authentication_with_too_many_redirects_throws( self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker @@ -272,20 +272,20 @@ def test_authentication_with_too_many_redirects_throws( MockDefinition(redirect_url, 401, {}, {"WWW-Authenticate": "NTLM"}), ], ) - with mocker.patch( + mocker.patch( get_patch_method(), side_effect=mock_auth_responses(1), - ): - with httpx.Client() as client: - auth = negotiate_auth_fixture - auth.max_redirects = 0 - with pytest.raises(httpx.TooManyRedirects) as exception_info: - _ = client.get( - url=TEST_URL, - auth=auth, - ) - assert "Redirected too many times" in str(exception_info) - assert "0" in str(exception_info) + ) + with httpx.Client() as client: + auth = negotiate_auth_fixture + auth.max_redirects = 0 + with pytest.raises(httpx.TooManyRedirects) as exception_info: + _ = client.get( + url=TEST_URL, + auth=auth, + ) + assert "Redirected too many times" in str(exception_info) + assert "0" in str(exception_info) @pytest.mark.parametrize("status_code", [200, 403, 404]) def test_http_response_reported_correctly_when_auth_completes( @@ -310,18 +310,18 @@ def test_http_response_reported_correctly_when_auth_completes( ], ) - with mocker.patch( + mocker.patch( get_patch_method(), side_effect=mock_auth_responses(1), - ): - with httpx.Client() as client: - resp = client.get( - url=TEST_URL, - auth=negotiate_auth_fixture, - ) - assert resp.status_code == status_code - assert len(resp.history) == 2 - assert len(httpx_mock.get_requests()) == 3 + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == status_code + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 def test_http_response_sets_cookie_if_required( self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker @@ -351,15 +351,15 @@ def test_http_response_sets_cookie_if_required( ), ], ) - with mocker.patch( + mocker.patch( get_patch_method(), side_effect=mock_auth_responses(1), - ): - with httpx.Client() as client: - resp = client.get( - url=TEST_URL, - auth=negotiate_auth_fixture, - ) - assert resp.status_code == 200 - assert len(resp.history) == 2 - assert len(httpx_mock.get_requests()) == 3 + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == 200 + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3