Skip to content

Commit

Permalink
estuary-cdk: refactor HTTPSesion to use request_stream
Browse files Browse the repository at this point in the history
request_stream() is an AsyncGenerator over arbitrary stream chunks.
Then, request() and a new request_lines() are implemented in terms of
request_stream().

This allows callers to efficiently process unbounded responses.
  • Loading branch information
jgraettinger committed Feb 29, 2024
1 parent d5f0c41 commit 97be18e
Showing 1 changed file with 87 additions and 43 deletions.
130 changes: 87 additions & 43 deletions estuary-cdk/estuary_cdk/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pydantic import BaseModel
from dataclasses import dataclass
from typing import AsyncGenerator
import abc
import aiohttp
import asyncio
Expand All @@ -15,6 +16,16 @@
# Connectors should use this type for typing constraints.
class HTTPSession(abc.ABC):
@abc.abstractmethod
def request_stream(
self,
url: str,
method: str = "GET",
params=None,
json=None,
data=None,
with_token=True,
) -> AsyncGenerator[bytes, None]: ...

async def request(
self,
url: str,
Expand All @@ -23,7 +34,43 @@ async def request(
json=None,
data=None,
with_token=True,
) -> bytes: ...
) -> bytes:
chunks: list[bytes] = []
async for chunk in self.request_stream(
url, method, params=params, json=json, data=data, with_token=with_token
):
chunks.append(chunk)

if len(chunks) == 0:
return b""
elif len(chunks) == 1:
return chunks[0]
else:
return b"".join(chunks)

async def request_lines(
self,
url: str,
method: str = "GET",
params=None,
json=None,
data=None,
with_token=True,
delim=b'\n',
) -> AsyncGenerator[bytes, None]:
buffer = b''
async for chunk in self.request_stream(
url, method, params=params, json=json, data=data, with_token=with_token
):
buffer += chunk
while delim in buffer:
line, buffer = buffer.split(delim, 1)
yield line

if buffer:
yield buffer

return


@dataclass
Expand Down Expand Up @@ -158,57 +205,54 @@ async def _mixin_exit(self, logger: logging.Logger):
await self.inner.close()
return self

async def _send_raw_request(
self, url: str, method: str, params, json, data, with_token
) -> tuple[aiohttp.ClientResponse, bytes]:

headers = {}
if with_token and self.token_source is not None:
token_type, token = await self.token_source.fetch_token(self)
headers["Authorization"] = f"{token_type} {token}"

async with self.inner.request(
headers=headers,
json=json,
data=data,
method=method,
params=params,
url=url,
) as resp:

return (resp, await resp.read())

async def request(
async def request_stream(
self,
url: str,
method: str = "GET",
params=None,
json=None,
data=None,
with_token=True,
) -> bytes:
) -> AsyncGenerator[bytes, None]:
while True:

cur_delay = self.rate_limiter.delay
await asyncio.sleep(cur_delay)

resp, body = await self._send_raw_request(
url, method, params, json, data, with_token
)
self.rate_limiter.update(cur_delay, resp.status == 429)

if resp.status == 429:
pass

elif resp.status >= 500 and resp.status < 600:
logger.warning(
"server internal error (will retry)",
{"body": body.decode("utf-8")},
)
elif resp.status >= 400 and resp.status < 500:
raise RuntimeError(
f"Encountered HTTP error status {resp.status} which cannot be retried.\nURL: {url}\nResponse:\n{body.decode('utf-8')}"
)
else:
resp.raise_for_status()
return body
headers = {}
if with_token and self.token_source is not None:
token_type, token = await self.token_source.fetch_token(self)
headers["Authorization"] = f"{token_type} {token}"

async with self.inner.request(
headers=headers,
json=json,
data=data,
method=method,
params=params,
url=url,
) as resp:

self.rate_limiter.update(cur_delay, resp.status == 429)

if resp.status == 429:
pass

elif resp.status >= 500 and resp.status < 600:
body = await resp.read()
logger.warning(
"server internal error (will retry)",
{"body": body.decode("utf-8")},
)
elif resp.status >= 400 and resp.status < 500:
body = await resp.read()
raise RuntimeError(
f"Encountered HTTP error status {resp.status} which cannot be retried.\nURL: {url}\nResponse:\n{body.decode('utf-8')}"
)
else:
resp.raise_for_status()

async for chunk in resp.content.iter_any():
yield chunk

return

0 comments on commit 97be18e

Please sign in to comment.