spike source emit request and response #17839

Closed · wants to merge 1 commit
@@ -2,9 +2,12 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from abc import abstractmethod
from typing import Tuple
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple, Union

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, Level
from airbyte_cdk.models.airbyte_protocol import Type as MessageType
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker

@@ -31,3 +34,23 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
        The error object will be cast to string to display the problem to the user.
        """
        return self.connection_checker.check_connection(self, logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
    ) -> Iterator[AirbyteMessage]:
        last_request = None
        for airbyte_message in super().read(logger, config, catalog, state):
            if airbyte_message.record and airbyte_message.record.data["_ab_request"] != last_request:
                yield AirbyteMessage(
                    type=MessageType.LOG, log=AirbyteLogMessage(message=str(airbyte_message.record.data["_ab_request"]), level=Level.INFO)
                )
                yield AirbyteMessage(
                    type=MessageType.LOG, log=AirbyteLogMessage(message=str(airbyte_message.record.data["_ab_response"]), level=Level.INFO)
                )
                last_request = airbyte_message.record.data.pop("_ab_request")
                airbyte_message.record.data.pop("_ab_response")
            yield airbyte_message

Review comment (Contributor Author) on the condition above: complexity: also need to check if we're processing a new stream slice
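One way that check could look, sketched below as an assumption rather than as part of this PR: have the retriever stamp a hypothetical _ab_slice field on each record (next to _ab_request and _ab_response) and key the emitted log messages on the (request, slice) pair. The DeclarativeSource base class and the _ab_slice field are assumptions for illustration; the diff above does not name the enclosing class.

# Hypothetical sketch only, not part of the PR. Assumes (1) the class in the diff above is the
# CDK's DeclarativeSource and (2) the retriever also stamps an "_ab_slice" field on every record.
import logging
from typing import Any, Iterator, List, Mapping, MutableMapping, Union

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, Level
from airbyte_cdk.models.airbyte_protocol import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource


class SliceAwareSpikeSource(DeclarativeSource):
    # Abstract members (connection_checker, streams) are omitted for brevity.

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
    ) -> Iterator[AirbyteMessage]:
        last_request, last_slice = None, None
        for airbyte_message in super().read(logger, config, catalog, state):
            if airbyte_message.record:
                data = airbyte_message.record.data
                # Emit the request/response once per (request, slice) pair instead of once per request.
                if (data.get("_ab_request"), data.get("_ab_slice")) != (last_request, last_slice):
                    yield AirbyteMessage(
                        type=MessageType.LOG, log=AirbyteLogMessage(message=str(data.get("_ab_request")), level=Level.INFO)
                    )
                    yield AirbyteMessage(
                        type=MessageType.LOG, log=AirbyteLogMessage(message=str(data.get("_ab_response")), level=Level.INFO)
                    )
                last_request = data.pop("_ab_request", None)
                last_slice = data.pop("_ab_slice", None)
                data.pop("_ab_response", None)
            yield airbyte_message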
@@ -1,7 +1,6 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

@@ -59,6 +58,7 @@ def __post_init__(self, options: Mapping[str, Any]):
        HttpStream.__init__(self, self.requester.get_authenticator())
        self._last_response = None
        self._last_records = None
        self._request = None

    @property
    def name(self) -> str:
@@ -312,6 +312,9 @@ def parse_response(
            response=response, stream_state=self.state, stream_slice=stream_slice, next_page_token=next_page_token
        )
        self._last_records = records
        for r in records:
            r["_ab_response"] = response.content
            r["_ab_request"] = self._request
        return records

    @property
@@ -353,6 +356,12 @@ def read_records(
            self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
            yield from []

    def log_request(self, request: requests.PreparedRequest):
        self._request = {"url": request.url, "body": request.body, "headers": dict(request.headers)}

    def log_response(self, response):
        pass

    def stream_slices(
        self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Optional[StreamState] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
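For orientation, this is roughly the shape a record takes after parse_response above, before the source-level read() strips the two helper fields again. The key names and their types (a url/body/headers dict for _ab_request, raw response.content bytes for _ab_response) come from the diff; every value below is an invented example.

# Illustrative example only; all values here are made up.
record_data = {
    "id": 123,  # the connector's own fields stay untouched
    "_ab_request": {
        "url": "https://api.example.com/items?page=2",
        "body": None,
        "headers": {"Accept": "application/json"},
    },
    "_ab_response": b'{"items": [{"id": 123}]}',  # requests' Response.content is bytes
}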
28 changes: 18 additions & 10 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
@@ -48,6 +48,7 @@ def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None):
        self.cache_file = self.request_cache()
        # we need this attr to get metadata about cassettes, such as record play count, all records played, etc.
        self.cassete = None
        self.current_page = 0

    @property
    def cache_filename(self):
@@ -286,17 +287,10 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
        Unexpected transient exceptions use the default backoff parameters.
        Unexpected persistent exceptions are not handled and will cause the sync to fail.
        """
        self.logger.debug(
            "Making outbound API request", extra={"headers": request.headers, "url": request.url, "request_body": request.body}
        )
        self.log_request(request)
        response: requests.Response = self._session.send(request, **request_kwargs)

        # Evaluation of response.text can be heavy, for example, if streaming a large response
        # Do it only in debug mode
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(
                "Receiving response", extra={"headers": response.headers, "status": response.status_code, "body": response.text}
            )
        self.log_response(response)
        if self.should_retry(response):
            custom_backoff_time = self.backoff_time(response)
            if custom_backoff_time:
@@ -404,6 +398,7 @@ def read_records(
        pagination_complete = False

        next_page_token = None
        self.current_page = 0
        while not pagination_complete:
            request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
            request = self._create_prepared_request(
@@ -426,14 +421,27 @@ def read_records(
            else:
                response = self._send_request(request, request_kwargs)
            yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

            self.current_page += 1
            next_page_token = self.next_page_token(response)
            if not next_page_token:
                pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def log_request(self, request):
        self.logger.debug(
            "Making outbound API request", extra={"headers": request.headers, "url": request.url, "request_body": request.body}
        )

    def log_response(self, response):
        # Evaluation of response.text can be heavy, for example, if streaming a large response
        # Do it only in debug mode
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(
                "Receiving response", extra={"headers": response.headers, "status": response.status_code, "body": response.text}
            )


class HttpSubStream(HttpStream, ABC):
    def __init__(self, parent: HttpStream, **kwargs):
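Because _send now routes through the overridable log_request/log_response hooks, a regular (non-declarative) stream could plug into the same mechanism. The sketch below is an assumption about how that might look, not code from this PR; CapturingStream is an invented name and the abstract members of HttpStream are omitted, so the class is illustrative rather than instantiable.

# Hypothetical sketch, not part of the PR: keep the default DEBUG logging but also capture
# the last request/response pair on the stream instance.
import requests

from airbyte_cdk.sources.streams.http import HttpStream


class CapturingStream(HttpStream):
    # url_base, path, parse_response, next_page_token, primary_key omitted for brevity.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.last_request = None
        self.last_response = None

    def log_request(self, request: requests.PreparedRequest):
        super().log_request(request)  # preserve the base class DEBUG log
        self.last_request = {"url": request.url, "body": request.body, "headers": dict(request.headers)}

    def log_response(self, response: requests.Response):
        super().log_response(response)  # preserve the conditional DEBUG log
        self.last_response = response.content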