diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ce4918..6898df3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- `httpx_auth.authentication` contains a new `Negotiate` class that supports Kerberos and NTLM authentication without + support for channel binding tokens +- Added extra `windows_auth` to enable support for Negotiate and NTLM authentication + +### Changed +- Optionally requires [`pyspnego[kerberos]`](https://github.com/jborean93/pyspnego)==0.1.6 +- Requires [`pytest`](https://docs.pytest.org/en/latest)==6.2.\* for testing +- Requires [`pytest-mock`](https://github.com/pytest-dev/pytest-mock)==3.6.\* for testing ## [0.15.0] - 2022-06-01 ### Changed diff --git a/README.md b/README.md index 9d31ae4..36732d6 100644 --- a/README.md +++ b/README.md @@ -667,6 +667,51 @@ with httpx.Client() as client: | `username` | User name. | Mandatory | | | `password` | User password. | Mandatory | | +## Negotiate and NTLM + +Support for Negotiate, Kerberos and NTLM authentication is optional, install with the `windows_auth` extra to enable +this feature. + +You can use Negotiate, Kerberos and NTLM authentication with `httpx_auth.Negotiate`. + +### Using cached credentials for Kerberos authentication + +Cached credentials are used by default for Kerberos authentication, this relies on a Ticket-Granting Ticket being +present on your system from `kinit` or similar. This is supported by default on Windows, on Linux it relies on system +packages being installed. See documentation for the [pyspnego](https://pypi.org/project/pyspnego/) package for more +information. + +```python +import httpx +from httpx_auth import Negotiate + +with httpx.Client() as client: + client.get('https://www.example.com', auth=Negotiate()) +``` + +### Using provided credentials for NTLM authentication + +Where other credentials are required, or if Kerberos is not supported, provide a username and password for +authentication: + +```python +import httpx +from httpx_auth import Negotiate + +with httpx.Client() as client: + client.get('https://www.example.com', auth=Negotiate('domain\\username', 'password')) +``` + +### Parameters + +| Name | Description | Mandatory | Default value | +|:------------------------|:-----------------------------------------------------------|:----------|:--------------| +| `username` | User name. | Optional | | +| `password` | User password. | Optional | | +| `force_ntlm` | Force the use of NTLM auth | Optional | False | +| `service` | Name portion of the server SPN | Optional | "Service" | +| `max_redirects` | Maximum number of redirects to follow while authenticating | Optional | 10 | + ## Multiple authentication at once You can also use a combination of authentication using `+`or `&` as in the following sample: diff --git a/httpx_auth/__init__.py b/httpx_auth/__init__.py index bb1509d..3825ca2 100644 --- a/httpx_auth/__init__.py +++ b/httpx_auth/__init__.py @@ -16,6 +16,7 @@ OktaClientCredentials, OAuth2ResourceOwnerPasswordCredentials, ) +from httpx_auth.negotiate import Negotiate from httpx_auth.oauth2_tokens import JsonTokenFileCache from httpx_auth.aws import AWS4Auth from httpx_auth.errors import ( diff --git a/httpx_auth/authentication.py b/httpx_auth/authentication.py index 6540a5f..a01e9ea 100644 --- a/httpx_auth/authentication.py +++ b/httpx_auth/authentication.py @@ -2,8 +2,8 @@ import os import uuid from hashlib import sha256, sha512 -from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode from typing import Optional, Generator +from urllib.parse import parse_qs, urlsplit, urlunsplit, urlencode import httpx diff --git a/httpx_auth/negotiate.py b/httpx_auth/negotiate.py new file mode 100644 index 0000000..c662a61 --- /dev/null +++ b/httpx_auth/negotiate.py @@ -0,0 +1,192 @@ +import base64 +from typing import Generator, List, Optional + +import httpx +from httpx_auth.authentication import SupportMultiAuth + +try: + import spnego + + WINDOWS_AUTH = True +except ImportError: + spnego = None + WINDOWS_AUTH = False + + +class Negotiate(httpx.Auth, SupportMultiAuth): + """ + NOTE: This does not support Channel Bindings which can (and ought to be) supported by servers. This is due to a + limitation in the HTTPCore library at present. + """ + + _username: str + _password: str + force_ntlm: bool + auth_header: str + auth_complete: bool + auth_type: str + _service: str + _context_proxy: "spnego._context.ContextProxy" + max_redirects: int = 10 + + def __init__( + self, + username: str = None, + password: str = None, + force_ntlm: bool = False, + service: str = None, + max_redirects: int = 10, + ) -> None: + """ + :param username: Username and domain (if required). Optional for servers that support Kerberos, required for + those that require NTLM + :param password: Password if required by server for authentication. + :param force_ntlm: Force authentication to use NTLM if available. + :param service: Service portion of the target Service Principal Name (default HTTP) + :return: None + """ + if not WINDOWS_AUTH: + raise ImportError( + "Windows authentication support not enabled, install with the windows_auth extra." + ) + if password and not username: + raise ValueError( + "Negotiate authentication with credentials requires username and password, no username was provided." + ) + if force_ntlm and not (username and password): + raise ValueError( + "NTLM authentication requires credentials, provide a username and password." + ) + self._username = username + self._password = password + self.force_ntlm = force_ntlm + self.auth_header = "" + self.auth_complete = False + self.auth_type = "" + self._service = service + self.max_redirects = max_redirects + + def auth_flow( + self, request: httpx.Request + ) -> Generator[httpx.Request, httpx.Response, None]: + + responses = [] + response = yield request + responses.append(response) + + redirect_count = 0 + + # If anything comes back except an authenticate challenge then return it for the client to deal with, hopefully + # a successful response. + if responses[-1].status_code != 401: + return responses[-1] + + # Otherwise authenticate. Determine the authentication name to use, prefer Negotiate if available. + self.auth_type = self._auth_type_from_header( + responses[-1].headers.get("WWW-Authenticate") + ) + if self.auth_type is None: + return responses[-1] + + # Run authentication flow. + yield from self._do_auth_flow(request, responses) + + # If we were redirected we will need to rerun the auth flow on the new url, repeat until either we receive a + # status that is not 401 Unauthorized, or until the url we ended up at is the same as the one we requested. + while responses[-1].status_code == 401 and responses[-1].url != request.url: + redirect_count += 1 + if redirect_count > self.max_redirects: + raise httpx.TooManyRedirects( + message=f"Redirected too many times ({self.max_redirects}).", + request=request, + ) + request.url = responses[-1].url + yield from self._do_auth_flow(request, responses) + + return responses[-1] + + def _do_auth_flow( + self, request: httpx.Request, responses: List[httpx.Response] + ) -> Generator[httpx.Request, httpx.Response, None]: + # Phase 1: + # Configure context proxy, generate message header, attach to request and resend. + host = request.url.host + self.context_proxy = self._new_context_proxy() + self.context_proxy.spn = "{0}/{1}".format( + self._service.upper() if self._service else "HTTP", host + ) + request.headers["Authorization"] = self._make_authorization_header( + self.context_proxy.step(None) + ) + response = yield request + responses.append(response) + + # Phase 2: + # Server responds with Challenge message, parse the authenticate header and deal with cookies. Some web apps use + # cookies to store progress in the auth process. + if "set-cookie" in responses[-1].headers: + request.headers["Cookie"] = responses[-1].headers["set-cookie"] + + auth_header_bytes = self._parse_authenticate_header( + responses[-1].headers["WWW-Authenticate"] + ) + + # Phase 3: + # Generate Authenticate message, attach to the request and resend it. If the user is authorized then this will + # succeed. If not then this will fail. + self.auth_header = self._make_authorization_header( + self.context_proxy.step(auth_header_bytes) + ) + request.headers["Authorization"] = self.auth_header + response = yield request + responses.append(response) + + def _new_context_proxy(self) -> "spnego._context.ContextProxy": + client = spnego.client( + self._username, + self._password, + service=self._service, + protocol="ntlm" if self.force_ntlm else "negotiate", + ) + if self.force_ntlm: + client.options = spnego.NegotiateOptions.use_ntlm + client.protocol = "ntlm" + return client + + def _parse_authenticate_header(self, header: str) -> bytes: + """ + Extract NTLM/Negotiate value from Authenticate header and convert to bytes + :param header: str WWW-Authenticate header + :return: bytes Negotiate challenge + """ + + auth_strip = self.auth_type.lower() + " " + auth_header_value = next( + s + for s in (val.lstrip() for val in header.split(",")) + if s.lower().startswith(auth_strip) + ) + return base64.b64decode(auth_header_value[len(auth_strip) :]) + + def _make_authorization_header(self, response_bytes: bytes) -> str: + """ + Convert the auth bytes to base64 encoded string and build Authorization header. + :param response_bytes: bytes auth response content + :return: str Authorization/Proxy-Authorization header + """ + + auth_response = base64.b64encode(response_bytes).decode("ascii") + return f"{self.auth_type} {auth_response}" + + @staticmethod + def _auth_type_from_header(header: str) -> Optional[str]: + """ + Given a WWW-Authenticate header, returns the authentication type to use. + :param header: str Authenticate header + :return: Optional[str] Authentication type or None if not supported + """ + if "negotiate" in header.lower(): + return "Negotiate" + elif "ntlm" in header.lower(): + return "NTLM" + return None diff --git a/setup.py b/setup.py index a191abc..6e75baf 100644 --- a/setup.py +++ b/setup.py @@ -48,6 +48,12 @@ "pytest_httpx==0.21.*", # Used to check coverage "pytest-cov==3.*", + # Used to test NTLM support + "pytest==6.*", + "pytest-mock==3.6.*" + ], + 'windows_auth': [ + "pyspnego[kerberos]==0.1.6" ] }, python_requires=">=3.7", diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py new file mode 100644 index 0000000..135f54d --- /dev/null +++ b/tests/test_ntlm.py @@ -0,0 +1,365 @@ +import sys +from typing import Optional, List +from pytest_httpx import HTTPXMock +from httpx_auth import Negotiate +from collections import namedtuple + +import httpx +import pytest + +_ = pytest.importorskip("spnego") + +TEST_USER = "test_user" +TEST_PASS = "test_pass" +TEST_URL = "https://www.example.com/test" + +FLOW_CHALLENGE_RESPONSE = "NTLM CAkKCwwNDg8=" +FLOW_AUTHENTICATE_REQUEST = "NTLM AAECAwQFBgc=" +FLOW_AUTHORIZATION_RESPONSE = "NTLM Dw4NDAsKCQg=" + +MockDefinition = namedtuple( + "mock_definition", ("url", "status_code", "match_headers", "headers") +) +MockDefinition.__new__.__defaults__ = ("", 0, {}, {}) + + +@pytest.fixture +def negotiate_auth_fixture(): + yield Negotiate(TEST_USER, TEST_PASS) + + +class TestNegotiateUnit: + bytes_content = b"\x00\x01\x02\x03\x04\x05\x06\x07" + str_content = "AAECAwQFBgc=" + + def test_parse_auth_header_single_success(self, negotiate_auth_fixture): + negotiate_auth_fixture.auth_type = "NTLM" + + header_value = f"NTLM {self.str_content}" + actual_output = negotiate_auth_fixture._parse_authenticate_header(header_value) + assert actual_output == self.bytes_content + + def test_parse_auth_header_multi_success(self, negotiate_auth_fixture): + negotiate_auth_fixture.auth_type = "Negotiate" + + header_value = ( + f"Negotiate {self.str_content}, Basic dGVzdF91c2VyOnRlc3RfcGFzcw==" + ) + actual_output = negotiate_auth_fixture._parse_authenticate_header(header_value) + assert actual_output == self.bytes_content + + def test_parse_auth_header_single_fail(self, negotiate_auth_fixture): + negotiate_auth_fixture.auth_type = "NTLM" + + header_value = f"Negotiate {self.str_content}" + with pytest.raises(StopIteration): + _ = negotiate_auth_fixture._parse_authenticate_header(header_value) + + @pytest.mark.parametrize("auth_type", ["NTLM", "Negotiate"]) + def test_make_auth_header(self, negotiate_auth_fixture, auth_type: str): + expected_output = f"{auth_type} {self.str_content}" + negotiate_auth_fixture.auth_type = auth_type + actual_str = negotiate_auth_fixture._make_authorization_header( + self.bytes_content + ) + + assert actual_str == expected_output + + @pytest.mark.parametrize( + ["test_input", "expected_output"], + [ + ("NTLM Successful", "NTLM"), + ("NtLm Successful", "NTLM"), + ("Negotiate Successful", "Negotiate"), + ("NeGoTiATe Successful", "Negotiate"), + ("Negotiate", "Negotiate"), + ], + ) + def test_auth_type_from_header( + self, negotiate_auth_fixture, test_input: str, expected_output: Optional[str] + ): + actual_output = negotiate_auth_fixture._auth_type_from_header(test_input) + assert actual_output.lower() == expected_output.lower() + + def test_auth_type_from_header_returns_none_when_not_ntlm( + self, negotiate_auth_fixture + ): + header_content = "Basic Failure" + actual_output = negotiate_auth_fixture._auth_type_from_header(header_content) + assert actual_output is None + + def test_new_context_proxy(self, negotiate_auth_fixture): + import spnego as spnego_ + proxy = negotiate_auth_fixture._new_context_proxy() + assert proxy.username == TEST_USER + assert proxy.password == TEST_PASS + assert proxy.protocol.lower() == "negotiate" + assert spnego_.NegotiateOptions.use_ntlm not in proxy.options + assert proxy.spn.lower() == "host/unspecified" + + def test_new_context_proxy_with_ntlm(self, negotiate_auth_fixture): + import spnego as spnego_ + negotiate_auth_fixture.force_ntlm = True + proxy = negotiate_auth_fixture._new_context_proxy() + assert proxy.username == TEST_USER + assert proxy.password == TEST_PASS + assert proxy.protocol.lower() == "ntlm" + assert spnego_.NegotiateOptions.use_ntlm in proxy.options + assert proxy.spn.lower() == "host/unspecified" + + def test_password_with_no_username_throws(self): + with pytest.raises(ValueError) as exception_info: + _ = Negotiate(password=TEST_PASS) + assert "no username was provided" in str(exception_info) + + def test_ntlm_with_no_credentials_throws(self): + with pytest.raises(ValueError) as exception_info: + _ = Negotiate(force_ntlm=True) + assert "provide a username and password" in str(exception_info) + + +def mock_auth_responses(request_count: int): + return [ + b"\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F", + b"\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08", + ] * request_count + + +def get_patch_method(): + if sys.platform == "win32": + patch_object = "httpx_auth.negotiate.spnego.sspi.SSPIProxy.step" + else: + patch_object = "httpx_auth.negotiate.spnego.negotiate.NegotiateProxy.step" + return patch_object + + +def make_mockery(httpx_mock: HTTPXMock, mockery_definition: List[namedtuple]) -> None: + for mock_definition in mockery_definition: + httpx_mock.add_response( + url=mock_definition.url, + status_code=mock_definition.status_code, + match_headers=mock_definition.match_headers, + headers=mock_definition.headers, + ) + + +class TestNegotiateFunctional: + def test_http_200_response_makes_one_request( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock + ): + make_mockery(httpx_mock, [MockDefinition(TEST_URL, 200, {}, {})]) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == 200 + assert len(httpx_mock.get_requests()) == 1 + + def test_http_authenticate_with_digest_returns_401( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock + ): + make_mockery( + httpx_mock, + [MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "Digest"})], + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == 401 + assert len(resp.history) == 0 + assert len(httpx_mock.get_requests()) == 1 + + def test_http_401s_make_three_requests_and_return_401( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker + ): + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + ], + ) + + mocker.patch( + get_patch_method(), + side_effect=mock_auth_responses(1), + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == 401 + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 + + def test_authentication_with_redirect_is_followed( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker + ): + redirect_url = TEST_URL + "/" + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + MockDefinition( + TEST_URL, + 301, + {"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + {"Location": redirect_url}, + ), + MockDefinition(redirect_url, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + redirect_url, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + MockDefinition( + redirect_url, + 200, + {"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + {}, + ), + ], + ) + mocker.patch( + get_patch_method(), + side_effect=mock_auth_responses(2), + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == 200 + assert len(resp.history) == 4 + assert len(httpx_mock.get_requests()) == 6 + + def test_authentication_with_too_many_redirects_throws( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker + ): + redirect_url = TEST_URL + "/" + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + MockDefinition( + TEST_URL, + 301, + {"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + {"Location": redirect_url}, + ), + MockDefinition(redirect_url, 401, {}, {"WWW-Authenticate": "NTLM"}), + ], + ) + mocker.patch( + get_patch_method(), + side_effect=mock_auth_responses(1), + ) + with httpx.Client() as client: + auth = negotiate_auth_fixture + auth.max_redirects = 0 + with pytest.raises(httpx.TooManyRedirects) as exception_info: + _ = client.get( + url=TEST_URL, + auth=auth, + ) + assert "Redirected too many times" in str(exception_info) + assert "0" in str(exception_info) + + @pytest.mark.parametrize("status_code", [200, 403, 404]) + def test_http_response_reported_correctly_when_auth_completes( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker, status_code + ): + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + {"WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST}, + ), + MockDefinition( + TEST_URL, + status_code, + {"Authorization": FLOW_AUTHORIZATION_RESPONSE}, + {}, + ), + ], + ) + + mocker.patch( + get_patch_method(), + side_effect=mock_auth_responses(1), + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == status_code + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3 + + def test_http_response_sets_cookie_if_required( + self, negotiate_auth_fixture, httpx_mock: HTTPXMock, mocker + ): + test_cookie = "foo=bar" + make_mockery( + httpx_mock, + [ + MockDefinition(TEST_URL, 401, {}, {"WWW-Authenticate": "NTLM"}), + MockDefinition( + TEST_URL, + 401, + {"Authorization": FLOW_CHALLENGE_RESPONSE}, + { + "WWW-Authenticate": FLOW_AUTHENTICATE_REQUEST, + "Set-Cookie": test_cookie, + }, + ), + MockDefinition( + TEST_URL, + 200, + { + "Authorization": FLOW_AUTHORIZATION_RESPONSE, + "Cookie": test_cookie, + }, + {}, + ), + ], + ) + mocker.patch( + get_patch_method(), + side_effect=mock_auth_responses(1), + ) + with httpx.Client() as client: + resp = client.get( + url=TEST_URL, + auth=negotiate_auth_fixture, + ) + assert resp.status_code == 200 + assert len(resp.history) == 2 + assert len(httpx_mock.get_requests()) == 3