From 7196c22a731da4af0c54d4aca0b225712b5ef8f6 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 7 Nov 2022 18:15:00 -0800 Subject: [PATCH] better log message --- .../sources/declarative/requesters/request.py | 12 ++ .../declarative/requesters/response.py | 12 ++ .../retrievers/simple_retriever.py | 159 +++++++++++------- 3 files changed, 121 insertions(+), 62 deletions(-) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/response.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request.py new file mode 100644 index 000000000000..185af5f93385 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request.py @@ -0,0 +1,12 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass + + +@dataclass +class Request: + url: str + headers: dict + body: dict diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/response.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/response.py new file mode 100644 index 000000000000..4f4b76026fc2 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/response.py @@ -0,0 +1,12 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass + + +@dataclass +class Response: + body: dict + headers: dict + status_code: int diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 5ed3e19f2f94..83804cf26260 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -1,28 +1,41 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # - +import json import logging from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import requests -from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode +from dataclasses_jsonschema import JsonSchemaMixin + +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, \ + SyncMode from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.exceptions import ReadException -from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector -from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction -from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination -from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.extractors.http_selector import \ + HttpSelector +from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import \ + ResponseAction +from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import \ + NoPagination +from airbyte_cdk.sources.declarative.requesters.paginators.paginator import \ + Paginator +from airbyte_cdk.sources.declarative.requesters.request import Request from airbyte_cdk.sources.declarative.requesters.requester import Requester +from airbyte_cdk.sources.declarative.requesters.response import Response from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever -from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice -from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer -from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState +from airbyte_cdk.sources.declarative.stream_slicers.single_slice import \ + SingleSlice +from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import \ + StreamSlicer +from airbyte_cdk.sources.declarative.types import Record, StreamSlice, \ + StreamState from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http import HttpStream -from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message -from dataclasses_jsonschema import JsonSchemaMixin +from airbyte_cdk.sources.utils.record_helper import \ + stream_data_to_airbyte_message +from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets @dataclass @@ -127,12 +140,12 @@ def error_message(self, response: requests.Response) -> str: return self.requester.interpret_response_status(response).error_message def _get_request_options( - self, - stream_slice: Optional[StreamSlice], - next_page_token: Optional[Mapping[str, Any]], - requester_method, - paginator_method, - stream_slicer_method, + self, + stream_slice: Optional[StreamSlice], + next_page_token: Optional[Mapping[str, Any]], + requester_method, + paginator_method, + stream_slicer_method, ): """ Get the request_option from the requester and from the paginator @@ -153,16 +166,16 @@ def _get_request_options( stream_slicer_mapping_keys = set(stream_slicer_mapping.keys()) intersection = ( - (requester_mapping_keys & paginator_mapping_keys) - | (requester_mapping_keys & stream_slicer_mapping_keys) - | (paginator_mapping_keys & stream_slicer_mapping_keys) + (requester_mapping_keys & paginator_mapping_keys) + | (requester_mapping_keys & stream_slicer_mapping_keys) + | (paginator_mapping_keys & stream_slicer_mapping_keys) ) if intersection: raise ValueError(f"Duplicate keys found: {intersection}") return {**requester_mapping, **paginator_mapping, **stream_slicer_mapping} def request_headers( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None + self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None ) -> Mapping[str, Any]: """ Specifies request headers. @@ -178,10 +191,10 @@ def request_headers( return {str(k): str(v) for k, v in headers.items()} def request_params( - self, - stream_state: StreamSlice, - stream_slice: Optional[StreamSlice] = None, - next_page_token: Optional[Mapping[str, Any]] = None, + self, + stream_state: StreamSlice, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: """ Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. @@ -197,10 +210,10 @@ def request_params( ) def request_body_data( - self, - stream_state: StreamState, - stream_slice: Optional[StreamSlice] = None, - next_page_token: Optional[Mapping[str, Any]] = None, + self, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Union[Mapping, str]]: """ Specifies how to populate the body of the request with a non-JSON payload. @@ -232,10 +245,10 @@ def request_body_data( ) def request_body_json( - self, - stream_state: StreamState, - stream_slice: Optional[StreamSlice] = None, - next_page_token: Optional[Mapping[str, Any]] = None, + self, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: """ Specifies how to populate the body of the request with a JSON payload. @@ -252,10 +265,10 @@ def request_body_json( ) def request_kwargs( - self, - stream_state: StreamState, - stream_slice: Optional[StreamSlice] = None, - next_page_token: Optional[Mapping[str, Any]] = None, + self, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: """ Specifies how to configure a mapping of keyword arguments to be used when creating the HTTP request. @@ -266,11 +279,11 @@ def request_kwargs( return self.requester.request_kwargs(stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token) def path( - self, - *, - stream_state: Optional[StreamState] = None, - stream_slice: Optional[StreamSlice] = None, - next_page_token: Optional[Mapping[str, Any]] = None, + self, + *, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> str: """ Return the path the submit the next request to. @@ -302,12 +315,12 @@ def use_cache(self) -> bool: return self.requester.use_cache def parse_response( - self, - response: requests.Response, - *, - stream_state: StreamState, - stream_slice: Optional[StreamSlice] = None, - next_page_token: Optional[Mapping[str, Any]] = None, + self, + response: requests.Response, + *, + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, ) -> Iterable[Record]: # if fail -> raise exception # if ignore -> ignore response and return no records @@ -349,11 +362,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, return self.paginator.next_page_token(response, self._last_records) def read_records( - self, - sync_mode: SyncMode, - cursor_field: Optional[List[str]] = None, - stream_slice: Optional[StreamSlice] = None, - stream_state: Optional[StreamState] = None, + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[StreamSlice] = None, + stream_state: Optional[StreamState] = None, ) -> Iterable[StreamData]: # Warning: use self.state instead of the stream_state passed as argument! stream_slice = stream_slice or {} # None-check @@ -365,8 +378,10 @@ def read_records( stream_slice, stream_state, ) + # FIXME: need to get the actual stream schema before this can use type transformers + stream_schema = {} for r in records_generator: - message = stream_data_to_airbyte_message(self.name, r, self.transformer, self.get_json_schema()) + message = stream_data_to_airbyte_message(self.name, r, self.transformer, stream_schema) if message.type == MessageType.RECORD: self.stream_slicer.update_cursor(stream_slice, last_record=message.record.data) if message.type == MessageType.RECORD or self.logger.isEnabledFor(logging.DEBUG): @@ -377,7 +392,7 @@ def read_records( yield from [] def stream_slices( - self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None + self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None ) -> Iterable[Optional[Mapping[str, Any]]]: """ Specifies the slices for this stream. See the stream slicing section of the docs for more information. @@ -405,18 +420,38 @@ def parse_records_and_emit_request_and_responses(self, request, response, stream # Not great to need to call _read_pages which is a private method # A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester for record_mapping in self._read_pages( - lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state + lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, + stream_state ): # FIXME: need to get the real json schema if we want to support type transforms json_schema = {} yield stream_data_to_airbyte_message(self.name, record_mapping, self.transformer, json_schema) - def _create_trace_message_from_request(self, request): + def _create_trace_message_from_request(self, request: requests.PreparedRequest): # FIXME: this should return some sort of trace message - log_message = f"request:{str(request)}" - return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) + request_object = Request(url=request.url, headers=request.headers, body=self._safe_request_body(request.body)) + log_message = filter_secrets(f"request:{str(request_object)}") + return AirbyteLogMessage(level=Level.INFO, message=log_message) - def _create_trace_message_from_response(self, response): + def _create_trace_message_from_response(self, response: requests.Response): # FIXME: this should return some sort of trace message - log_message = f"response:{str(response)}" - return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) + response_object = Response(body=self._safe_response_json(response), headers=response.headers, status_code=response.status_code) + + log_message = filter_secrets(f"response:{response_object}") + return AirbyteLogMessage(level=Level.INFO, message=log_message) + + @staticmethod + def _safe_request_body(request_body: Union[str, bytes]) -> dict: + if isinstance(request_body, bytes): + request_body = request_body.decode("utf-8") + if isinstance(request_body, str): + return json.loads(request_body) + else: + raise ValueError(f"Unexpected request body type: {type(request_body)}") + + @staticmethod + def _safe_response_json(response: requests.Response) -> dict: + try: + return response.json() + except requests.exceptions.JSONDecodeError: + return {}