Skip to content

Commit

Permalink
better log message
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Nov 8, 2022
1 parent 4f1c005 commit 7196c22
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass


@dataclass
class Request:
    """Plain data container describing an HTTP request: its URL, headers and body."""

    # URL the request is sent to.
    url: str
    # Headers attached to the request.
    headers: dict
    # Request payload, represented as a dict.
    body: dict
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass


@dataclass
class Response:
    """Plain data container describing an HTTP response: its body, headers and status code."""

    # Response payload, represented as a dict.
    body: dict
    # Headers returned with the response.
    headers: dict
    # Numeric HTTP status code of the response.
    status_code: int
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import logging
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import requests
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
from dataclasses_jsonschema import JsonSchemaMixin

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, \
SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.extractors.http_selector import \
HttpSelector
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import \
ResponseAction
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import \
NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import \
Paginator
from airbyte_cdk.sources.declarative.requesters.request import Request
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.requesters.response import Response
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import \
SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import \
StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, \
StreamState
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from dataclasses_jsonschema import JsonSchemaMixin
from airbyte_cdk.sources.utils.record_helper import \
stream_data_to_airbyte_message
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets


@dataclass
Expand Down Expand Up @@ -127,12 +140,12 @@ def error_message(self, response: requests.Response) -> str:
return self.requester.interpret_response_status(response).error_message

def _get_request_options(
self,
stream_slice: Optional[StreamSlice],
next_page_token: Optional[Mapping[str, Any]],
requester_method,
paginator_method,
stream_slicer_method,
self,
stream_slice: Optional[StreamSlice],
next_page_token: Optional[Mapping[str, Any]],
requester_method,
paginator_method,
stream_slicer_method,
):
"""
Get the request_option from the requester and from the paginator
Expand All @@ -153,16 +166,16 @@ def _get_request_options(
stream_slicer_mapping_keys = set(stream_slicer_mapping.keys())

intersection = (
(requester_mapping_keys & paginator_mapping_keys)
| (requester_mapping_keys & stream_slicer_mapping_keys)
| (paginator_mapping_keys & stream_slicer_mapping_keys)
(requester_mapping_keys & paginator_mapping_keys)
| (requester_mapping_keys & stream_slicer_mapping_keys)
| (paginator_mapping_keys & stream_slicer_mapping_keys)
)
if intersection:
raise ValueError(f"Duplicate keys found: {intersection}")
return {**requester_mapping, **paginator_mapping, **stream_slicer_mapping}

def request_headers(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
) -> Mapping[str, Any]:
"""
Specifies request headers.
Expand All @@ -178,10 +191,10 @@ def request_headers(
return {str(k): str(v) for k, v in headers.items()}

def request_params(
self,
stream_state: StreamSlice,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
self,
stream_state: StreamSlice,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
"""
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
Expand All @@ -197,10 +210,10 @@ def request_params(
)

def request_body_data(
self,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
self,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
"""
Specifies how to populate the body of the request with a non-JSON payload.
Expand Down Expand Up @@ -232,10 +245,10 @@ def request_body_data(
)

def request_body_json(
self,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
self,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
"""
Specifies how to populate the body of the request with a JSON payload.
Expand All @@ -252,10 +265,10 @@ def request_body_json(
)

def request_kwargs(
self,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
self,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
"""
Specifies how to configure a mapping of keyword arguments to be used when creating the HTTP request.
Expand All @@ -266,11 +279,11 @@ def request_kwargs(
return self.requester.request_kwargs(stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token)

def path(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
"""
Return the path to submit the next request to.
Expand Down Expand Up @@ -302,12 +315,12 @@ def use_cache(self) -> bool:
return self.requester.use_cache

def parse_response(
self,
response: requests.Response,
*,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
self,
response: requests.Response,
*,
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Iterable[Record]:
# if fail -> raise exception
# if ignore -> ignore response and return no records
Expand Down Expand Up @@ -349,11 +362,11 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return self.paginator.next_page_token(response, self._last_records)

def read_records(
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[StreamSlice] = None,
stream_state: Optional[StreamState] = None,
self,
sync_mode: SyncMode,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[StreamSlice] = None,
stream_state: Optional[StreamState] = None,
) -> Iterable[StreamData]:
# Warning: use self.state instead of the stream_state passed as argument!
stream_slice = stream_slice or {} # None-check
Expand All @@ -365,8 +378,10 @@ def read_records(
stream_slice,
stream_state,
)
# FIXME: need to get the actual stream schema before this can use type transformers
stream_schema = {}
for r in records_generator:
message = stream_data_to_airbyte_message(self.name, r, self.transformer, self.get_json_schema())
message = stream_data_to_airbyte_message(self.name, r, self.transformer, stream_schema)
if message.type == MessageType.RECORD:
self.stream_slicer.update_cursor(stream_slice, last_record=message.record.data)
if message.type == MessageType.RECORD or self.logger.isEnabledFor(logging.DEBUG):
Expand All @@ -377,7 +392,7 @@ def read_records(
yield from []

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
"""
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
Expand Down Expand Up @@ -405,18 +420,38 @@ def parse_records_and_emit_request_and_responses(self, request, response, stream
# Not great to need to call _read_pages which is a private method
# A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester
for record_mapping in self._read_pages(
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice,
stream_state
):
# FIXME: need to get the real json schema if we want to support type transforms
json_schema = {}
yield stream_data_to_airbyte_message(self.name, record_mapping, self.transformer, json_schema)

def _create_trace_message_from_request(self, request):
def _create_trace_message_from_request(self, request: requests.PreparedRequest):
# FIXME: this should return some sort of trace message
log_message = f"request:{str(request)}"
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message))
request_object = Request(url=request.url, headers=request.headers, body=self._safe_request_body(request.body))
log_message = filter_secrets(f"request:{str(request_object)}")
return AirbyteLogMessage(level=Level.INFO, message=log_message)

def _create_trace_message_from_response(self, response):
def _create_trace_message_from_response(self, response: requests.Response):
# FIXME: this should return some sort of trace message
log_message = f"response:{str(response)}"
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message))
response_object = Response(body=self._safe_response_json(response), headers=response.headers, status_code=response.status_code)

log_message = filter_secrets(f"response:{response_object}")
return AirbyteLogMessage(level=Level.INFO, message=log_message)

@staticmethod
def _safe_request_body(request_body: Optional[Union[str, bytes]]) -> dict:
    """
    Convert a raw request body into a dict without failing on common non-JSON bodies.

    :param request_body: the body of the outgoing request; may be None when the request carries no body
    :return: the JSON-decoded body, or an empty dict when there is no body or it cannot be decoded as UTF-8 JSON
    :raises ValueError: if the body is neither None, str nor bytes
    """
    # Requests without a payload (e.g. a plain GET) have no body at all.
    if request_body is None:
        return {}
    if isinstance(request_body, bytes):
        try:
            request_body = request_body.decode("utf-8")
        except UnicodeDecodeError:
            # Binary payloads cannot be represented as a JSON dict.
            return {}
    if not isinstance(request_body, str):
        raise ValueError(f"Unexpected request body type: {type(request_body)}")
    if not request_body:
        return {}
    try:
        return json.loads(request_body)
    except json.JSONDecodeError:
        # Non-JSON payloads (e.g. form-encoded data) must not abort the caller.
        return {}

@staticmethod
def _safe_response_json(response: "requests.Response") -> dict:
    """
    Return the JSON-decoded body of the response, falling back to an empty dict.

    :param response: the HTTP response whose body should be decoded
    :return: whatever response.json() returns, or an empty dict if the body is not valid JSON
    """
    try:
        return response.json()
    except ValueError:
        # JSON decoding errors are ValueError subclasses; catching ValueError avoids
        # depending on requests.exceptions.JSONDecodeError, which older requests releases lack.
        return {}

0 comments on commit 7196c22

Please sign in to comment.