Skip to content

Commit

Permalink
[low-code] convert request.body to a dict when converting to AirbyteL…
Browse files Browse the repository at this point in the history
…ogMessage (#20557)

* convert request body

* fix tests

* test body data

* more tests

* more tests

* _

* return stacktrace

* pretty print

* Revert "pretty print"

This reverts commit 0912538.

* Revert "Revert "pretty print""

This reverts commit b6f62d6.

* replace \n

* missing type hint
  • Loading branch information
girarda authored Jan 3, 2023
1 parent 74be2f8 commit 2f2e530
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
from dataclasses import InitVar, dataclass, field
from json import JSONDecodeError
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import requests
Expand Down Expand Up @@ -367,7 +368,7 @@ def read_records(
stream_slice = stream_slice or {} # None-check
self.paginator.reset()
records_generator = self._read_pages(
self.parse_records_and_emit_request_and_responses,
self._parse_records_and_emit_request_and_responses,
stream_slice,
stream_state,
)
Expand Down Expand Up @@ -405,23 +406,42 @@ def state(self, value: StreamState):
"""State setter, accept state serialized by state getter."""
self.stream_slicer.update_cursor(value)

def parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]:
def _parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]:
# Only emit requests and responses when running in debug mode
if self.logger.isEnabledFor(logging.DEBUG):
yield self._create_trace_message_from_request(request)
yield self._create_trace_message_from_response(response)
yield _prepared_request_to_airbyte_message(request)
yield _response_to_airbyte_message(response)
# Not great to need to call _read_pages which is a private method
# A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester
yield from self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state)

def _create_trace_message_from_request(self, request: requests.PreparedRequest):
    """Wrap an outgoing HTTP request in a LOG-type AirbyteMessage.

    The request's URL, method, headers and raw body are JSON-serialized,
    prefixed with "request:", passed through filter_secrets and attached to
    an INFO-level AirbyteLogMessage.

    :param request: the prepared request to describe
    :return: an AirbyteMessage of type LOG carrying the serialized request
    """
    # FIXME: this should return some sort of trace message
    serialized_request = json.dumps(
        {
            "url": request.url,
            "http_method": request.method,
            "headers": dict(request.headers),
            "body": request.body,
        }
    )
    sanitized = filter_secrets(f"request:{serialized_request}")
    log_entry = AirbyteLogMessage(level=Level.INFO, message=sanitized)
    return AirbyteMessage(type=MessageType.LOG, log=log_entry)

def _create_trace_message_from_response(self, response: requests.Response):
    """Wrap an HTTP response in a LOG-type AirbyteMessage.

    The response text, headers and status code are JSON-serialized, prefixed
    with "response:", passed through filter_secrets and attached to an
    INFO-level AirbyteLogMessage.

    :param response: the response to describe
    :return: an AirbyteMessage of type LOG carrying the serialized response
    """
    # FIXME: this should return some sort of trace message
    serialized_response = json.dumps(
        {
            "body": response.text,
            "headers": dict(response.headers),
            "status_code": response.status_code,
        }
    )
    sanitized = filter_secrets(f"response:{serialized_response}")
    log_entry = AirbyteLogMessage(level=Level.INFO, message=sanitized)
    return AirbyteMessage(type=MessageType.LOG, log=log_entry)

def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage:
    """Describe a prepared HTTP request as a LOG-type AirbyteMessage.

    The message text is "request:" followed by a JSON object with the keys
    "url", "http_method", "headers" and "body". The body is first converted
    by _body_binary_string_to_dict, and the whole text is passed through
    filter_secrets before being logged at INFO level.

    :param request: the prepared request to describe
    :return: an AirbyteMessage of type LOG carrying the serialized request
    """
    # FIXME: this should return some sort of trace message
    payload = json.dumps(
        {
            "url": request.url,
            "http_method": request.method,
            "headers": dict(request.headers),
            "body": _body_binary_string_to_dict(request.body),
        }
    )
    sanitized = filter_secrets(f"request:{payload}")
    log_entry = AirbyteLogMessage(level=Level.INFO, message=sanitized)
    return AirbyteMessage(type=MessageType.LOG, log=log_entry)


def _body_binary_string_to_dict(body_str: Union[str, bytes, bytearray, None]) -> Optional[Mapping[str, Any]]:
    """Convert a raw request body into a JSON-serializable mapping.

    The body is parsed as JSON when possible; otherwise it is treated as
    "&"-separated "key=value" form data.

    :param body_str: request body as text or bytes; may be None or empty
    :return: None when the body is empty, the decoded JSON value when the body
        is valid JSON, otherwise a dict of form fields
    """
    if not body_str:
        return None
    if isinstance(body_str, (bytes, bytearray)):
        # This helper only feeds debug logging, so an undecodable byte must not
        # abort the read: replace it instead of raising UnicodeDecodeError.
        body_str = body_str.decode(errors="replace")
    try:
        return json.loads(body_str)
    except JSONDecodeError:
        form_fields = {}
        for pair in body_str.split("&"):
            if not pair:
                # Tolerate leading/trailing/doubled "&" separators.
                continue
            # Split on the first "=" only, so values that contain "=" (e.g.
            # base64 padding) stay intact and a bare key maps to "".
            key, _, value = pair.partition("=")
            form_fields[key] = value
        return form_fields


def _response_to_airbyte_message(response: requests.Response) -> AirbyteMessage:
    """Describe an HTTP response as a LOG-type AirbyteMessage.

    The message text is "response:" followed by a JSON object with the keys
    "body" (the response text), "headers" and "status_code". The text is
    passed through filter_secrets before being logged at INFO level.

    :param response: the response to describe
    :return: an AirbyteMessage of type LOG carrying the serialized response
    """
    # FIXME: this should return some sort of trace message
    payload = json.dumps(
        {
            "body": response.text,
            "headers": dict(response.headers),
            "status_code": response.status_code,
        }
    )
    sanitized = filter_secrets(f"response:{payload}")
    log_entry = AirbyteLogMessage(level=Level.INFO, message=sanitized)
    return AirbyteMessage(type=MessageType.LOG, log=log_entry)
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
import pytest
import requests
from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
SimpleRetriever,
_prepared_request_to_airbyte_message,
_response_to_airbyte_message,
)
from airbyte_cdk.sources.declarative.stream_slicers import DatetimeStreamSlicer
from airbyte_cdk.sources.streams.http.auth import NoAuth
from airbyte_cdk.sources.streams.http.http import HttpStream
Expand Down Expand Up @@ -433,3 +437,195 @@ def test_path(test_name, requester_path, paginator_path, expected_path):

actual_path = retriever.path(stream_state=None, stream_slice=None, next_page_token=None)
assert expected_path == actual_path


@pytest.mark.parametrize(
"test_name, http_method, url, headers, params, body_json, body_data, expected_airbyte_message",
[
(
"test_basic_get_request",
HttpMethod.GET,
"https://airbyte.io",
{},
{},
{},
{},
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO, message='request:{"url": "https://airbyte.io/", "http_method": "GET", "headers": {}, "body": null}'
),
),
),
(
"test_get_request_with_headers",
HttpMethod.GET,
"https://airbyte.io",
{"h1": "v1", "h2": "v2"},
{},
{},
{},
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message='request:{"url": "https://airbyte.io/", "http_method": "GET", "headers": {"h1": "v1", "h2": "v2"}, "body": null}',
),
),
),
(
"test_get_request_with_request_params",
HttpMethod.GET,
"https://airbyte.io",
{},
{"p1": "v1", "p2": "v2"},
{},
{},
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message='request:{"url": "https://airbyte.io/?p1=v1&p2=v2", "http_method": "GET", "headers": {}, "body": null}',
),
),
),
(
"test_get_request_with_request_body_json",
HttpMethod.GET,
"https://airbyte.io",
{"Content-Type": "application/json"},
{},
{"b1": "v1", "b2": "v2"},
{},
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message='request:{"url": "https://airbyte.io/", "http_method": "GET", "headers": {"Content-Type": "application/json", "Content-Length": "24"}, "body": {"b1": "v1", "b2": "v2"}}',
),
),
),
(
"test_get_request_with_headers_params_and_body",
HttpMethod.GET,
"https://airbyte.io",
{"Content-Type": "application/json", "h1": "v1"},
{"p1": "v1", "p2": "v2"},
{"b1": "v1", "b2": "v2"},
{},
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message='request:{"url": "https://airbyte.io/?p1=v1&p2=v2", "http_method": "GET", "headers": {"Content-Type": "application/json", "h1": "v1", "Content-Length": "24"}, "body": {"b1": "v1", "b2": "v2"}}',
),
),
),
(
"test_get_request_with_request_body_data",
HttpMethod.GET,
"https://airbyte.io",
{"Content-Type": "application/json"},
{},
{},
{"b1": "v1", "b2": "v2"},
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message='request:{"url": "https://airbyte.io/", "http_method": "GET", "headers": {"Content-Type": "application/json", "Content-Length": "11"}, "body": {"b1": "v1", "b2": "v2"}}',
),
),
),
(
"test_basic_post_request",
HttpMethod.POST,
"https://airbyte.io",
{},
{},
{},
{},
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message='request:{"url": "https://airbyte.io/", "http_method": "POST", "headers": {"Content-Length": "0"}, "body": null}',
),
),
),
],
)
def test_prepared_request_to_airbyte_message(test_name, http_method, url, headers, params, body_json, body_data, expected_airbyte_message):
    """Check that a prepared request is rendered as the expected LOG message.

    A request is built from the parametrized method, URL, headers and params;
    a JSON and/or form payload is attached only when the case supplies a
    non-empty one. The prepared request is then converted and compared with
    the expected AirbyteMessage.
    """
    unprepared = requests.Request(method=http_method.name, url=url, headers=headers, params=params)
    # Empty payloads are left unset so the request keeps its default (no body).
    for attribute, payload in (("json", body_json), ("data", body_data)):
        if payload:
            setattr(unprepared, attribute, payload)

    actual_airbyte_message = _prepared_request_to_airbyte_message(unprepared.prepare())

    assert expected_airbyte_message == actual_airbyte_message


@pytest.mark.parametrize(
"test_name, response_body, response_headers, status_code, expected_airbyte_message",
[
(
"test_response_no_body_no_headers",
b"",
{},
200,
AirbyteMessage(
type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message='response:{"body": "", "headers": {}, "status_code": 200}')
),
),
(
"test_response_no_body_with_headers",
b"",
{"h1": "v1", "h2": "v2"},
200,
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO, message='response:{"body": "", "headers": {"h1": "v1", "h2": "v2"}, "status_code": 200}'
),
),
),
(
"test_response_with_body_no_headers",
b'{"b1": "v1", "b2": "v2"}',
{},
200,
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message='response:{"body": "{\\"b1\\": \\"v1\\", \\"b2\\": \\"v2\\"}", "headers": {}, "status_code": 200}',
),
),
),
(
"test_response_with_body_and_headers",
b'{"b1": "v1", "b2": "v2"}',
{"h1": "v1", "h2": "v2"},
200,
AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message='response:{"body": "{\\"b1\\": \\"v1\\", \\"b2\\": \\"v2\\"}", "headers": {"h1": "v1", "h2": "v2"}, "status_code": 200}',
),
),
),
],
)
def test_response_to_airbyte_message(test_name, response_body, response_headers, status_code, expected_airbyte_message):
    """Check that a response is rendered as the expected LOG message.

    A bare requests.Response is populated by hand with the parametrized
    content, headers and status code, converted, and compared with the
    expected AirbyteMessage.
    """
    fake_response = requests.Response()
    # Set the private _content directly: no real HTTP exchange takes place.
    fake_response._content = response_body
    fake_response.headers = response_headers
    fake_response.status_code = status_code

    assert expected_airbyte_message == _response_to_airbyte_message(fake_response)
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

import json
import logging
import traceback
from json import JSONDecodeError
from typing import Any, Dict, Iterable, Optional, Union
from urllib.parse import parse_qs, urljoin, urlparse

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
from fastapi import Body, HTTPException
from jsonschema import ValidationError

from connector_builder.generated.apis.default_api_interface import DefaultApi
from connector_builder.generated.models.http_request import HttpRequest
from connector_builder.generated.models.http_response import HttpResponse
Expand All @@ -23,6 +21,8 @@
from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
from fastapi import Body, HTTPException
from jsonschema import ValidationError


class DefaultApiImpl(DefaultApi):
Expand Down Expand Up @@ -113,15 +113,18 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
log_messages = []
try:
for message_group in self._get_message_groups(
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config)
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config)
):
if isinstance(message_group, AirbyteLogMessage):
log_messages.append({"message": message_group.message})
else:
single_slice.pages.append(message_group)
except Exception as error:
# TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
raise HTTPException(status_code=400, detail=f"Could not perform read with with error: {error.args[0]}")
raise HTTPException(
status_code=400,
detail=f"Could not perform read with with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}",
)

return StreamRead(logs=log_messages, slices=[single_slice])

Expand Down Expand Up @@ -207,4 +210,11 @@ def _create_low_code_adapter(manifest: Dict[str, Any]) -> LowCodeSourceAdapter:
return LowCodeSourceAdapter(manifest=manifest)
except ValidationError as error:
# TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
raise HTTPException(status_code=400, detail=f"Invalid connector manifest with error: {error.message}")
raise HTTPException(
status_code=400,
detail=f"Invalid connector manifest with error: {error.message} - {DefaultApiImpl._get_stacktrace_as_string(error)}",
)

@staticmethod
def _get_stacktrace_as_string(error) -> str:
    """Render an exception, including its traceback, as a single string.

    :param error: the exception instance to format
    :return: the concatenated lines produced by traceback.TracebackException
    """
    captured = traceback.TracebackException.from_exception(error)
    formatted_lines = captured.format()
    return "".join(formatted_lines)
Loading

0 comments on commit 2f2e530

Please sign in to comment.