diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py index f978b64ef..da4424c2c 100644 --- a/google/auth/impersonated_credentials.py +++ b/google/auth/impersonated_credentials.py @@ -433,6 +433,7 @@ def refresh(self, request): headers=headers, data=json.dumps(body).encode("utf-8"), ) + response.raise_for_status() id_token = response.json()["token"] self.token = id_token diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py index 0e4dc08d7..8113438f8 100644 --- a/tests/test_impersonated_credentials.py +++ b/tests/test_impersonated_credentials.py @@ -21,6 +21,7 @@ import mock import pytest # type: ignore +import requests from six.moves import http_client from google.auth import _helpers @@ -75,6 +76,10 @@ def __init__(self, json_data, status_code): def json(self): return self.json_data + def raise_for_status(self): + if self.status_code != http_client.OK: + raise requests.HTTPError + @pytest.fixture def mock_authorizedsession_sign(): @@ -444,6 +449,39 @@ def test_id_token_success( assert id_creds.token == ID_TOKEN_DATA assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY) + def test_id_token_failure( + self, mock_donor_credentials, mock_authorizedsession_idtoken + ): + data = {"error": {"code": 403, "message": "unauthorized"}} + mock_authorizedsession_idtoken.return_value = MockResponse( + data, http_client.UNAUTHORIZED + ) + + credentials = self.make_credentials(lifetime=None) + token = "token" + target_audience = "https://foo.bar" + + expire_time = ( + _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) + ).isoformat("T") + "Z" + response_body = {"accessToken": token, "expireTime": expire_time} + + request = self.make_request( + data=json.dumps(response_body), status=http_client.OK + ) + + credentials.refresh(request) + + assert credentials.valid + assert not credentials.expired + + id_creds = impersonated_credentials.IDTokenCredentials( + credentials, target_audience=target_audience + ) + + with pytest.raises(requests.HTTPError): + id_creds.refresh(request) + def test_id_token_from_credential( self, mock_donor_credentials, mock_authorizedsession_idtoken ):