Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SimpleRetriever yield request and response as log messages #18644

Merged
merged 139 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 134 commits
Commits
Show all changes
139 commits
Select commit Hold shift + click to select a range
5f88aa6
method yielding airbytemessage
girarda Oct 27, 2022
0de8395
move to Stream
girarda Oct 27, 2022
e5acdcf
update abstract source
girarda Oct 27, 2022
8a63af7
reset
girarda Oct 27, 2022
d0dcb52
missing file
girarda Oct 28, 2022
d021da6
Yield request and response as log messages
girarda Oct 28, 2022
dc9ff7b
only emit request and responses if the debug flag is on
girarda Oct 29, 2022
adf07fa
add test docker image
girarda Oct 31, 2022
c68e01d
script to run acceptance tests with local cdk
girarda Oct 31, 2022
258990d
Update conftest to use a different image
girarda Oct 31, 2022
b0f80a7
extract to method
girarda Oct 31, 2022
68528b2
dont use a different image tag
girarda Oct 31, 2022
b31f9bd
Always install local cdk
girarda Oct 31, 2022
893beab
break the cdk
girarda Oct 31, 2022
c3964c9
get path from current working directory
girarda Oct 31, 2022
c69774a
or
girarda Oct 31, 2022
152f5df
ignore unit test
girarda Oct 31, 2022
8ea5e85
debug log
girarda Oct 31, 2022
1684823
Revert "AMI change: ami-0f23be2f917510c26 -> ami-005924fb76f7477ce (#…
girarda Oct 31, 2022
b08a9c4
build from the top
girarda Oct 31, 2022
4fd9ed0
Update source-acceptance-test
girarda Oct 31, 2022
06b1c70
fix
girarda Oct 31, 2022
3e4b135
copy setup
girarda Oct 31, 2022
4702f4d
some work on the gradle plugin
girarda Oct 31, 2022
ae7b7fb
reset to master
girarda Nov 1, 2022
3d86bce
delete unused file
girarda Nov 1, 2022
9dc4486
delete unused file
girarda Nov 1, 2022
d3852f3
reset to master
girarda Nov 1, 2022
179d689
optional argument
girarda Nov 1, 2022
65c416c
delete dead code
girarda Nov 1, 2022
b13fe7a
use latest cdk with sendgrid
girarda Nov 2, 2022
0fff056
Merge branch 'master' into alex/read_with_logs
girarda Nov 2, 2022
01dcd2a
fix sendgrid dockerfile
girarda Nov 2, 2022
5d46bb0
Merge branch 'alex/always_install_local_cdk' into alex/read_with_logs
girarda Nov 2, 2022
600c195
break the cdk
girarda Nov 2, 2022
0b48fea
use local file
girarda Nov 2, 2022
613e876
Revert "break the cdk"
girarda Nov 2, 2022
ea2089d
dont raise an exception
girarda Nov 3, 2022
e2d11b3
reset to master
girarda Nov 3, 2022
d901e1c
unit tests
girarda Nov 3, 2022
0600e59
missing test
girarda Nov 3, 2022
82e46c5
more unit tests
girarda Nov 3, 2022
235f9d4
Merge branch 'alex/read_with_logs' into alex/read_with_logs_lowcode
girarda Nov 3, 2022
50bb3ed
Merge branch 'master' into alex/read_with_logs
girarda Nov 3, 2022
608c074
remove deprecated comment
girarda Nov 3, 2022
acfcec8
newline
girarda Nov 3, 2022
3fa6a00
reset to master
girarda Nov 3, 2022
048508d
Merge branch 'master' into alex/read_with_logs
girarda Nov 3, 2022
3e85dca
Merge branch 'master' into alex/read_with_logs
girarda Nov 3, 2022
6d9d0aa
remove files
girarda Nov 3, 2022
464b247
reset
girarda Nov 3, 2022
4acd199
Merge branch 'master' into alex/read_with_logs
girarda Nov 4, 2022
0c36dcb
Update abstract source
girarda Nov 4, 2022
46a807e
remove method from stream
girarda Nov 4, 2022
bad305b
convert to airbytemessage
girarda Nov 5, 2022
d78628f
unittests
girarda Nov 5, 2022
7fd5cc9
Update
girarda Nov 5, 2022
0cbd28e
unit test
girarda Nov 5, 2022
a1a139e
remove debug logs
girarda Nov 5, 2022
e12dad8
Merge branch 'alex/read_with_logs' into alex/read_with_logs_lowcode
girarda Nov 5, 2022
b1d62cd
Revert "remove debug logs"
girarda Nov 5, 2022
1da2ed1
Revert "Revert "remove debug logs""
girarda Nov 5, 2022
5dac7c2
Revert "reset to master"
girarda Nov 5, 2022
883b400
fix
girarda Nov 5, 2022
f27919f
slightly better test
girarda Nov 5, 2022
a6f6ebb
typing
girarda Nov 5, 2022
23c3f2c
extract method
girarda Nov 5, 2022
594ad06
Revert "Revert "reset to master""
girarda Nov 5, 2022
2d91e9a
reset to master
girarda Nov 5, 2022
cff30a1
reset to master
girarda Nov 5, 2022
5833c84
Revert "reset to master"
girarda Nov 5, 2022
102640f
Comment
girarda Nov 5, 2022
319fdbe
operate on the message
girarda Nov 5, 2022
610d4ec
Revert "Revert "reset to master""
girarda Nov 5, 2022
bbc7c57
comment
girarda Nov 5, 2022
2f91b80
test
girarda Nov 5, 2022
3fc697c
Revert "test"
girarda Nov 5, 2022
62d95eb
test
girarda Nov 5, 2022
003b860
Revert "test"
girarda Nov 5, 2022
27150ba
test
girarda Nov 5, 2022
26c3289
Revert "test"
girarda Nov 5, 2022
b36842a
format
girarda Nov 5, 2022
8df28d9
format
girarda Nov 5, 2022
a6f9fb8
format
girarda Nov 5, 2022
75f525b
symlink
girarda Nov 7, 2022
ce03775
Update setup
girarda Nov 7, 2022
2ac84c8
update path
girarda Nov 7, 2022
19da01c
reset to master
girarda Nov 7, 2022
a1106cb
update
girarda Nov 7, 2022
42b61d7
Add local files
girarda Nov 7, 2022
f81a093
Merge branch 'master' into alex/always_install_local_cdk
girarda Nov 7, 2022
632ee94
greenhouse
girarda Nov 7, 2022
76a064a
format
girarda Nov 7, 2022
415a0e4
symlink
girarda Nov 7, 2022
076412a
try reordering
girarda Nov 8, 2022
4f1c005
better error message
girarda Nov 8, 2022
7196c22
better log message
girarda Nov 8, 2022
7cec110
reset to master
girarda Nov 8, 2022
ad7128f
merge for qa
girarda Nov 8, 2022
9a1760a
Revert "merge for qa"
girarda Nov 8, 2022
390ad5c
Merge branch 'master' into alex/read_with_logs_lowcode
girarda Nov 8, 2022
85d3998
reset to master
girarda Nov 8, 2022
aec12ca
reset to master
girarda Nov 8, 2022
9f86498
reset to master
girarda Nov 8, 2022
b0f71b8
format
girarda Nov 8, 2022
3459e62
gradlew format
girarda Nov 8, 2022
c0233bb
Merge branch 'master' into alex/read_with_logs_lowcode
girarda Nov 8, 2022
1c142e5
right type hints
girarda Nov 8, 2022
b346d4f
reset to master
girarda Nov 8, 2022
456f8ea
reset to master
girarda Nov 8, 2022
43d6e22
gradlew format
girarda Nov 8, 2022
46c5b5a
a bunch of small fixes
girarda Nov 8, 2022
f8d5664
Update output format
girarda Nov 8, 2022
0e16a90
fixes from feedback
girarda Nov 9, 2022
01a90aa
fixme comment
girarda Nov 9, 2022
73a52dd
streams cannot return AirbyteRecordMessage
girarda Nov 9, 2022
aa03ee8
fix
girarda Nov 9, 2022
132028e
format
girarda Nov 9, 2022
94c3af1
only return logs when running on debug mode
girarda Nov 9, 2022
d8e38aa
Merge branch 'master' into alex/read_with_logs_lowcode
girarda Nov 9, 2022
880e08e
move branching
girarda Nov 9, 2022
7a0f6ae
update typing
girarda Nov 9, 2022
948a372
remove dead code
girarda Nov 9, 2022
26565d4
fix simpleretriever name
girarda Nov 9, 2022
6ca1a5a
i think this is better
girarda Nov 9, 2022
58c05f1
log response.text
girarda Nov 9, 2022
9b42f31
debug flag
girarda Nov 9, 2022
21845ee
comment
girarda Nov 9, 2022
87bcac0
pass config
girarda Nov 9, 2022
8a5e16c
comments
girarda Nov 9, 2022
a8a8a2d
run SATs
girarda Nov 9, 2022
9060419
fix most of the unit tests
girarda Nov 9, 2022
64bb445
fix unit test
girarda Nov 9, 2022
e9a867c
reset to master
girarda Nov 9, 2022
85979a8
runFromPath
girarda Nov 9, 2022
846bed7
Revert "runFromPath"
girarda Nov 9, 2022
b114add
Revert "run SATs"
girarda Nov 9, 2022
adfb495
no need to convert to dict
girarda Nov 9, 2022
3dbeeb3
fix test
girarda Nov 9, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http.http import HttpStream
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
Expand Down Expand Up @@ -240,9 +241,7 @@ def _read_incremental(
)
record_counter = 0
for message_counter, record_data_or_message in enumerate(records, start=1):
message = stream_data_to_airbyte_message(
stream_name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
)
message = self._get_message(record_data_or_message, stream_instance)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract logic to convert to an AirbyteMessage to _get_message to avoid duplication between _read_incremental and _read_full_refresh

yield message
if message.type == MessageType.RECORD:
record = message.record
Expand Down Expand Up @@ -287,9 +286,7 @@ def _read_full_refresh(
cursor_field=configured_stream.cursor_field,
)
for record_data_or_message in record_data_or_messages:
message = stream_data_to_airbyte_message(
stream_instance.name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
)
message = self._get_message(record_data_or_message, stream_instance)
yield message
if message.type == MessageType.RECORD:
total_records_counter += 1
Expand All @@ -315,3 +312,12 @@ def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: S
"""
if hasattr(logger, "level"):
stream_instance.logger.setLevel(logger.level)

def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream):
"""
Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
"""
if isinstance(record_data_or_message, AirbyteMessage):
return record_data_or_message
else:
return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema())
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@
import typing
from dataclasses import dataclass, fields
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union

from airbyte_cdk.models import ConnectorSpecification
from typing import Any, Iterator, List, Mapping, MutableMapping, Union

from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
)
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand All @@ -35,12 +41,14 @@ class ManifestDeclarativeSource(DeclarativeSource):

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

def __init__(self, source_config: ConnectionDefinition):
def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brianjlai the connector-builder server can set this field to true

"""
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
:param debug(bool): True if debug mode is enabled
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
self._source_config = source_config
self._debug = debug
self._factory = DeclarativeComponentFactory()

self._validate_source()
Expand Down Expand Up @@ -73,6 +81,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
in the project root.
"""
self._configure_logger_level(logger)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set log level to debug if debug mode is enabled from the constructor

self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

spec = self._source_config.get("spec")
Expand All @@ -84,6 +93,27 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
else:
return super().spec(logger)

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
self._configure_logger_level(logger)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set log level to debug if debug mode is enabled from the constructor

return super().check(logger, config)

def read(
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Iterator[AirbyteMessage]:
self._configure_logger_level(logger)
Copy link
Contributor Author

@girarda girarda Nov 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set log level to debug if debug mode is enabled from the constructor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the next breaking change, i feel like we should remove the logger from these parameters. It should be bound to the instance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a CDK v2 wishlist #19239

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set log level to debug if debug mode is enabled

yield from super().read(logger, config, catalog, state)

def _configure_logger_level(self, logger: logging.Logger):
"""
Set the log level to logging.DEBUG if debug mode is enabled
"""
if self._debug:
logger.setLevel(logging.DEBUG)

def _validate_source(self):
full_config = {}
if "version" in self._source_config:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from typing import Iterable, List, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import StreamData
from dataclasses_jsonschema import JsonSchemaMixin


Expand All @@ -24,7 +25,7 @@ def read_records(
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[StreamSlice] = None,
stream_state: Optional[StreamState] = None,
) -> Iterable[Record]:
) -> Iterable[StreamData]:
"""
Fetch a stream's records from an HTTP API source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import logging
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from dataclasses_jsonschema import JsonSchemaMixin


Expand Down Expand Up @@ -46,9 +52,10 @@ class SimpleRetriever(Retriever, HttpStream, JsonSchemaMixin):

requester: Requester
record_selector: HttpSelector
config: Config
options: InitVar[Mapping[str, Any]]
name: str
_name: str = field(init=False, repr=False, default="")
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
primary_key: Optional[Union[str, List[str], List[List[str]]]]
_primary_key: str = field(init=False, repr=False, default="")
paginator: Optional[Paginator] = None
Expand All @@ -59,13 +66,15 @@ def __post_init__(self, options: Mapping[str, Any]):
HttpStream.__init__(self, self.requester.get_authenticator())
self._last_response = None
self._last_records = None
self._options = options
self.name = InterpolatedString(self._name, options=options)

@property
def name(self) -> str:
"""
:return: Stream name
"""
return self._name
return self._name.eval(self.config)

@name.setter
def name(self, value: str) -> None:
Expand Down Expand Up @@ -347,17 +356,23 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
cursor_field: Optional[List[str]] = None,
stream_slice: Optional[StreamSlice] = None,
stream_state: Optional[StreamState] = None,
) -> Iterable[Mapping[str, Any]]:
) -> Iterable[StreamData]:
# Warning: use self.state instead of the stream_state passed as argument!
stream_slice = stream_slice or {} # None-check
self.paginator.reset()
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state)
for r in records_generator:
self.stream_slicer.update_cursor(stream_slice, last_record=r)
yield r
records_generator = self._read_pages(
lambda req, res, state, _slice: self.parse_records_and_emit_request_and_responses(
req, res, stream_slice=_slice, stream_state=state
),
stream_slice,
stream_state,
)
for record in records_generator:
self.stream_slicer.update_cursor(stream_slice, last_record=record)
yield record
else:
last_record = self._last_records[-1] if self._last_records else None
self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
Expand Down Expand Up @@ -385,3 +400,37 @@ def state(self) -> MutableMapping[str, Any]:
def state(self, value: StreamState):
"""State setter, accept state serialized by state getter."""
self.stream_slicer.update_cursor(value)

def parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]:
# Only emit requests and responses when running in debug mode
if self.logger.isEnabledFor(logging.DEBUG):
yield self._create_trace_message_from_request(request)
yield self._create_trace_message_from_response(response)
# Not great to need to call _read_pages which is a private method
# A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester
yield from self._read_pages(
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
)

def _create_trace_message_from_request(self, request: requests.PreparedRequest):
# FIXME: this should return some sort of trace message
request_dict = {"url": request.url, "headers": dict(request.headers), "body": self._safe_request_body(request.body)}
log_message = filter_secrets(f"request:{json.dumps(request_dict)}")
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message))

def _create_trace_message_from_response(self, response: requests.Response):
# FIXME: this should return some sort of trace message
response_dict = {"body": response.text, "headers": dict(response.headers), "status_code": response.status_code}
log_message = filter_secrets(f"response:{json.dumps(response_dict)}")
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message))

@staticmethod
def _safe_request_body(request_body: Union[str, bytes]) -> dict:
if not request_body:
return {}
if isinstance(request_body, bytes):
request_body = request_body.decode("utf-8")
if isinstance(request_body, str):
return json.loads(request_body)
else:
raise ValueError(f"Unexpected request body type: {type(request_body)}")
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
Expand Down Expand Up @@ -138,6 +138,12 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera
for parent_record in parent_stream.read_records(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
):
# Skip non-records (eg AirbyteLogMessage)
if isinstance(parent_record, AirbyteMessage):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to check the type of the output of read_records now

if parent_record.type == Type.RECORD:
parent_record = parent_record.record.data
else:
continue
empty_parent_slice = False
stream_state_value = parent_record.get(parent_field)
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
class YamlDeclarativeSource(ManifestDeclarativeSource):
"""Declarative source defined by a yaml file"""

def __init__(self, path_to_yaml):
def __init__(self, path_to_yaml, debug: bool = False):
"""
:param path_to_yaml: Path to the yaml file describing the source
"""
self._path_to_yaml = path_to_yaml
source_config = self._read_and_parse_yaml_file(path_to_yaml)
super().__init__(source_config)
super().__init__(source_config, debug)

def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:
package = self.__class__.__module__.split(".")[0]
Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteLogMessage, AirbyteRecordMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode

# list of all possible HTTP methods which can be used for sending of request bodies
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
Expand All @@ -22,7 +22,7 @@
# AirbyteRecordMessage: An AirbyteRecordMessage
# AirbyteLogMessage: A log message
# AirbyteTraceMessage: A trace message
StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage]
StreamData = Union[Mapping[str, Any], AirbyteLogMessage, AirbyteTraceMessage]


def package_name_from_class(cls: object) -> str:
Expand Down
49 changes: 33 additions & 16 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import os
from abc import ABC, abstractmethod
from contextlib import suppress
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from urllib.parse import urljoin

import requests
import requests_cache
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.core import Stream, StreamData
from requests.auth import AuthBase
from requests_cache.session import CachedSession

Expand Down Expand Up @@ -408,24 +408,25 @@ def read_records(
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
) -> Iterable[StreamData]:
yield from self._read_pages(
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
)

def _read_pages(
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
self,
records_generator_fn: Callable[
[requests.PreparedRequest, requests.Response, Mapping[str, Any], Mapping[str, Any]], Iterable[StreamData]
],
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[StreamData]:
stream_state = stream_state or {}
pagination_complete = False

next_page_token = None
while not pagination_complete:
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

response = self._send_request(request, request_kwargs)
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yield from records_generator_fn(request, response, stream_state, stream_slice)

next_page_token = self.next_page_token(response)
if not next_page_token:
Expand All @@ -434,6 +435,22 @@ def read_records(
# Always return an empty generator just in case no records were ever yielded
yield from []

def _fetch_next_page(
self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Tuple[requests.PreparedRequest, requests.Response]:
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

response = self._send_request(request, request_kwargs)
return request, response


class HttpSubStream(HttpStream, ABC):
def __init__(self, parent: HttpStream, **kwargs):
Expand Down
Loading