Skip to content

Commit

Permalink
feat: Implement asynchronous AuthorizedSession api request class (#…
Browse files Browse the repository at this point in the history
…1579)

* feat: implement request class for asynchoronous AuthorizedSession API

* add type checking and address TODOs

* remove default values from interface methods

* aiohttp reponse close method must not be awaited

* cleanup

* update Request class docstring
  • Loading branch information
ohmayr authored Aug 13, 2024
1 parent 8c47981 commit 5f46b60
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 11 deletions.
54 changes: 52 additions & 2 deletions google/auth/aio/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Transport - Async HTTP client library support.
"""Transport - Asynchronous HTTP client library support.
:mod:`google.auth.aio` is designed to work with various asynchronous client libraries such
as aiohttp. In order to work across these libraries with different
Expand All @@ -25,7 +25,7 @@
"""

import abc
from typing import AsyncGenerator, Dict
from typing import AsyncGenerator, Dict, Mapping, Optional


class Response(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -79,3 +79,53 @@ async def read(self) -> bytes:
async def close(self):
"""Close the response after it is fully consumed to resource."""
raise NotImplementedError("close must be implemented.")


class Request(metaclass=abc.ABCMeta):
"""Interface for a callable that makes HTTP requests.
Specific transport implementations should provide an implementation of
this that adapts their specific request / response API.
.. automethod:: __call__
"""

@abc.abstractmethod
async def __call__(
self,
url: str,
method: str,
body: bytes,
headers: Optional[Mapping[str, str]],
timeout: float,
**kwargs
) -> Response:
"""Make an HTTP request.
Args:
url (str): The URI to be requested.
method (str): The HTTP method to use for the request. Defaults
to 'GET'.
body (bytes): The payload / body in HTTP request.
headers (Mapping[str, str]): Request headers.
timeout (float): The number of seconds to wait for a
response from the server. If not specified or if None, the
transport-specific default timeout will be used.
kwargs: Additional arguments passed on to the transport's
request method.
Returns:
google.auth.aio.transport.Response: The HTTP response.
Raises:
google.auth.exceptions.TransportError: If any exception occurred.
"""
# pylint: disable=redundant-returns-doc, missing-raises-doc
# (pylint doesn't play well with abstract docstrings.)
raise NotImplementedError("__call__ must be implemented.")

async def close(self) -> None:
"""
Close the underlying session.
"""
raise NotImplementedError("close must be implemented.")
111 changes: 105 additions & 6 deletions google/auth/aio/transport/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
"""Transport adapter for AIOHTTP Requests.
"""

import asyncio
from contextlib import asynccontextmanager
import time
from typing import AsyncGenerator, Dict, Mapping, Optional

try:
import aiohttp
except ImportError as caught_exc: # pragma: NO COVER
raise ImportError(
"The aiohttp library is not installed from please install the aiohttp package to use the aiohttp transport."
) from caught_exc
from typing import AsyncGenerator, Dict

from google.auth import _helpers
from google.auth import exceptions
from google.auth.aio import transport
from google.auth.exceptions import TimeoutError

import asyncio
import time
from contextlib import asynccontextmanager

_DEFAULT_TIMEOUT_SECONDS = 180


@asynccontextmanager
Expand Down Expand Up @@ -77,7 +81,6 @@ async def with_timeout(coro):


class Response(transport.Response):

"""
Represents an HTTP response and its data. It is returned by ``google.auth.aio.transport.sessions.AuthorizedSession``.
Expand Down Expand Up @@ -113,4 +116,100 @@ async def read(self) -> bytes:

@_helpers.copy_docstring(transport.Response)
async def close(self):
return await self._response.close()
self._response.close()


class Request(transport.Request):
"""Asynchronous Requests request adapter.
This class is used internally for making requests using aiohttp
in a consistent way. If you use :class:`AuthorizedSession` you do not need
to construct or use this class directly.
This class can be useful if you want to configure a Request callable
with a custom ``aiohttp.ClientSession`` in :class:`AuthorizedSession` or if
you want to manually refresh a :class:`~google.auth.aio.credentials.Credentials` instance::
import aiohttp
import google.auth.aio.transport.aiohttp
# Default example:
request = google.auth.aio.transport.aiohttp.Request()
await credentials.refresh(request)
# Custom aiohttp Session Example:
session = session=aiohttp.ClientSession(auto_decompress=False)
request = google.auth.aio.transport.aiohttp.Request(session=session)
auth_sesion = google.auth.aio.transport.sessions.AuthorizedSession(auth_request=request)
Args:
session (aiohttp.ClientSession): An instance :class:`aiohttp.ClientSession` used
to make HTTP requests. If not specified, a session will be created.
.. automethod:: __call__
"""

def __init__(self, session: aiohttp.ClientSession = None):
self.session = session or aiohttp.ClientSession()

async def __call__(
self,
url: str,
method: str = "GET",
body: Optional[bytes] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: float = _DEFAULT_TIMEOUT_SECONDS,
**kwargs,
) -> transport.Response:
"""
Make an HTTP request using aiohttp.
Args:
url (str): The URL to be requested.
method (Optional[str]):
The HTTP method to use for the request. Defaults to 'GET'.
body (Optional[bytes]):
The payload or body in HTTP request.
headers (Optional[Mapping[str, str]]):
Request headers.
timeout (float): The number of seconds to wait for a
response from the server. If not specified or if None, the
requests default timeout will be used.
kwargs: Additional arguments passed through to the underlying
aiohttp :meth:`aiohttp.Session.request` method.
Returns:
google.auth.aio.transport.Response: The HTTP response.
Raises:
- google.auth.exceptions.TransportError: If the request fails.
- google.auth.exceptions.TimeoutError: If the request times out.
"""

try:
client_timeout = aiohttp.ClientTimeout(total=timeout)
response = await self.session.request(
method,
url,
data=body,
headers=headers,
timeout=client_timeout,
**kwargs,
)
return Response(response)

except aiohttp.ClientError as caught_exc:
new_exc = exceptions.TransportError(f"Failed to send request to {url}.")
raise new_exc from caught_exc

except asyncio.TimeoutError as caught_exc:
new_exc = exceptions.TimeoutError(
f"Request timed out after {timeout} seconds."
)
raise new_exc from caught_exc

async def close(self) -> None:
"""
Close the underlying aiohttp session to release the acquired resources.
"""
await self.session.close()
53 changes: 50 additions & 3 deletions tests/transport/aio/test_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from unittest.mock import AsyncMock, Mock, patch

from aioresponses import aioresponses
import pytest # type: ignore
import pytest_asyncio

import asyncio

from google.auth import exceptions
import google.auth.aio.transport.aiohttp as auth_aiohttp

from google.auth.exceptions import TimeoutError


try:
import aiohttp
except ImportError as caught_exc: # pragma: NO COVER
raise ImportError(
"The aiohttp library is not installed from please install the aiohttp package to use the aiohttp transport."
) from caught_exc


@pytest.fixture
async def simple_async_task():
return True
Expand Down Expand Up @@ -139,3 +148,41 @@ async def test_timeout_with_async_task_timing_out_before_context(
assert exc.match(
f"The operation {simple_async_task} exceeded the configured timeout of {self.default_timeout}s."
)


@pytest.mark.asyncio
class TestRequest:
@pytest_asyncio.fixture
async def aiohttp_request(self):
request = auth_aiohttp.Request()
yield request
await request.close()

async def test_request_call_success(self, aiohttp_request):
with aioresponses() as m:
mocked_chunks = [b"Cavefish ", b"have ", b"no ", b"sight."]
mocked_response = b"".join(mocked_chunks)
m.get("http://example.com", status=200, body=mocked_response)
response = await aiohttp_request("http://example.com")
assert response.status_code == 200
assert response.headers == {"Content-Type": "application/json"}
content = b"".join([chunk async for chunk in response.content()])
assert content == b"Cavefish have no sight."

async def test_request_call_raises_client_error(self, aiohttp_request):
with aioresponses() as m:
m.get("http://example.com", exception=aiohttp.ClientError)

with pytest.raises(exceptions.TransportError) as exc:
await aiohttp_request("http://example.com/api")

exc.match("Failed to send request to http://example.com/api.")

async def test_request_call_raises_timeout_error(self, aiohttp_request):
with aioresponses() as m:
m.get("http://example.com", exception=asyncio.TimeoutError)

with pytest.raises(exceptions.TimeoutError) as exc:
await aiohttp_request("http://example.com")

exc.match("Request timed out after 180 seconds.")

0 comments on commit 5f46b60

Please sign in to comment.