diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py index 6e79356ee93b..6062b8005bfa 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_source.py @@ -2,9 +2,12 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import logging from abc import abstractmethod -from typing import Tuple +from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple, Union +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, Level +from airbyte_cdk.models.airbyte_protocol import Type as MessageType from airbyte_cdk.sources.abstract_source import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker @@ -31,3 +34,23 @@ def check_connection(self, logger, config) -> Tuple[bool, any]: The error object will be cast to string to display the problem to the user. 
""" return self.connection_checker.check_connection(self, logger, config) + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None, + ) -> Iterator[AirbyteMessage]: + last_request = None + for airbyte_message in super().read(logger, config, catalog, state): + if airbyte_message.record and airbyte_message.record.data["_ab_request"] != last_request: + yield AirbyteMessage( + type=MessageType.LOG, log=AirbyteLogMessage(message=str(airbyte_message.record.data["_ab_request"]), level=Level.INFO) + ) + yield AirbyteMessage( + type=MessageType.LOG, log=AirbyteLogMessage(message=str(airbyte_message.record.data["_ab_response"]), level=Level.INFO) + ) + last_request = airbyte_message.record.data.pop("_ab_request") + airbyte_message.record.data.pop("_ab_response") + yield airbyte_message diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 2eea3237daf2..deb802eced37 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -1,7 +1,6 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
def log_request(self, request: requests.PreparedRequest):
    """
    Log the outgoing request and remember it for later use.

    The request's url, body and headers are stored on ``self._request`` as a plain dict.

    :param request: the prepared request that is about to be sent
    """
    # Delegate first so the parent's request logging is not silently dropped by this override.
    super().log_request(request)
    # NOTE(review): headers are copied verbatim and may contain credentials
    # (e.g. an Authorization header) - confirm this is safe wherever self._request is surfaced.
    self._request = {"url": request.url, "body": request.body, "headers": dict(request.headers)}


def log_response(self, response):
    """
    Log the received response.

    :param response: the response returned for the last request
    """
    # A bare `pass` here would swallow the parent's response logging; delegate instead.
    super().log_response(response)
def log_request(self, request):
    """
    Emit a DEBUG log entry describing an outbound API request.

    :param request: the request about to be sent; its headers, url and body are attached
        to the log record through ``extra``
    """
    request_details = {
        "headers": request.headers,
        "url": request.url,
        "request_body": request.body,
    }
    self.logger.debug("Making outbound API request", extra=request_details)


def log_response(self, response):
    """
    Emit a DEBUG log entry describing a received response.

    :param response: the response to describe; its headers, status code and text are
        attached to the log record through ``extra``
    """
    # Evaluating response.text can be heavy (for example when streaming a large response),
    # so bail out before touching it unless debug logging is actually enabled.
    if not self.logger.isEnabledFor(logging.DEBUG):
        return
    response_details = {
        "headers": response.headers,
        "status": response.status_code,
        "body": response.text,
    }
    self.logger.debug("Receiving response", extra=response_details)