-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SimpleRetriever yield request and response as log messages #18644
Changes from 137 commits
5f88aa6
0de8395
e5acdcf
8a63af7
d0dcb52
d021da6
dc9ff7b
adf07fa
c68e01d
258990d
b0f80a7
68528b2
b31f9bd
893beab
c3964c9
c69774a
152f5df
8ea5e85
1684823
b08a9c4
4fd9ed0
06b1c70
3e4b135
4702f4d
ae7b7fb
3d86bce
9dc4486
d3852f3
179d689
65c416c
b13fe7a
0fff056
01dcd2a
5d46bb0
600c195
0b48fea
613e876
ea2089d
e2d11b3
d901e1c
0600e59
82e46c5
235f9d4
50bb3ed
608c074
acfcec8
3fa6a00
048508d
3e85dca
6d9d0aa
464b247
4acd199
0c36dcb
46a807e
bad305b
d78628f
7fd5cc9
0cbd28e
a1a139e
e12dad8
b1d62cd
1da2ed1
5dac7c2
883b400
f27919f
a6f6ebb
23c3f2c
594ad06
2d91e9a
cff30a1
5833c84
102640f
319fdbe
610d4ec
bbc7c57
2f91b80
3fc697c
62d95eb
003b860
27150ba
26c3289
b36842a
8df28d9
a6f9fb8
75f525b
ce03775
2ac84c8
19da01c
a1106cb
42b61d7
f81a093
632ee94
76a064a
415a0e4
076412a
4f1c005
7196c22
7cec110
ad7128f
9a1760a
390ad5c
85d3998
aec12ca
9f86498
b0f71b8
3459e62
c0233bb
1c142e5
b346d4f
456f8ea
43d6e22
46c5b5a
f8d5664
0e16a90
01a90aa
73a52dd
aa03ee8
132028e
94c3af1
d8e38aa
880e08e
7a0f6ae
948a372
26565d4
6ca1a5a
58c05f1
9b42f31
21845ee
87bcac0
8a5e16c
a8a8a2d
9060419
64bb445
e9a867c
85979a8
846bed7
b114add
adfb495
3dbeeb3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,9 +8,15 @@ | |
import typing | ||
from dataclasses import dataclass, fields | ||
from enum import Enum, EnumMeta | ||
from typing import Any, List, Mapping, Union | ||
|
||
from airbyte_cdk.models import ConnectorSpecification | ||
from typing import Any, Iterator, List, Mapping, MutableMapping, Union | ||
|
||
from airbyte_cdk.models import ( | ||
AirbyteConnectionStatus, | ||
AirbyteMessage, | ||
AirbyteStateMessage, | ||
ConfiguredAirbyteCatalog, | ||
ConnectorSpecification, | ||
) | ||
from airbyte_cdk.sources.declarative.checks import CheckStream | ||
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker | ||
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource | ||
|
@@ -35,12 +41,14 @@ class ManifestDeclarativeSource(DeclarativeSource): | |
|
||
VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"} | ||
|
||
def __init__(self, source_config: ConnectionDefinition): | ||
def __init__(self, source_config: ConnectionDefinition, debug: bool = False): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @brianjlai the connector-builder server can set this field to true |
||
""" | ||
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector | ||
:param debug(bool): True if debug mode is enabled | ||
""" | ||
self.logger = logging.getLogger(f"airbyte.{self.name}") | ||
self._source_config = source_config | ||
self._debug = debug | ||
self._factory = DeclarativeComponentFactory() | ||
|
||
self._validate_source() | ||
|
@@ -73,6 +81,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: | |
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" | ||
in the project root. | ||
""" | ||
self._configure_logger_level(logger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. set log level to debug if debug mode is enabled from the constructor |
||
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}) | ||
|
||
spec = self._source_config.get("spec") | ||
|
@@ -84,6 +93,27 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: | |
else: | ||
return super().spec(logger) | ||
|
||
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: | ||
self._configure_logger_level(logger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. set log level to debug if debug mode is enabled from the constructor |
||
return super().check(logger, config) | ||
|
||
def read( | ||
self, | ||
logger: logging.Logger, | ||
config: Mapping[str, Any], | ||
catalog: ConfiguredAirbyteCatalog, | ||
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None, | ||
) -> Iterator[AirbyteMessage]: | ||
self._configure_logger_level(logger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. set log level to debug if debug mode is enabled from the constructor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for the next breaking change, i feel like we should remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created a CDK v2 wishlist #19239 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. set log level to debug if debug mode is enabled |
||
yield from super().read(logger, config, catalog, state) | ||
|
||
def _configure_logger_level(self, logger: logging.Logger): | ||
""" | ||
Set the log level to logging.DEBUG if debug mode is enabled | ||
""" | ||
if self._debug: | ||
logger.setLevel(logging.DEBUG) | ||
|
||
def _validate_source(self): | ||
full_config = {} | ||
if "version" in self._source_config: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
from dataclasses import InitVar, dataclass | ||
from typing import Any, Iterable, List, Mapping, Optional | ||
|
||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type | ||
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType | ||
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer | ||
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState | ||
|
@@ -138,6 +138,12 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera | |
for parent_record in parent_stream.read_records( | ||
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None | ||
): | ||
# Skip non-records (eg AirbyteLogMessage) | ||
if isinstance(parent_record, AirbyteMessage): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to check the type of the output of |
||
if parent_record.type == Type.RECORD: | ||
parent_record = parent_record.record.data | ||
else: | ||
continue | ||
empty_parent_slice = False | ||
stream_state_value = parent_record.get(parent_field) | ||
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,13 +7,13 @@ | |
import os | ||
from abc import ABC, abstractmethod | ||
from contextlib import suppress | ||
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union | ||
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union | ||
from urllib.parse import urljoin | ||
|
||
import requests | ||
import requests_cache | ||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources.streams.core import Stream | ||
from airbyte_cdk.sources.streams.core import Stream, StreamData | ||
from requests.auth import AuthBase | ||
from requests_cache.session import CachedSession | ||
|
||
|
@@ -408,24 +408,25 @@ def read_records( | |
cursor_field: List[str] = None, | ||
stream_slice: Mapping[str, Any] = None, | ||
stream_state: Mapping[str, Any] = None, | ||
) -> Iterable[Mapping[str, Any]]: | ||
) -> Iterable[StreamData]: | ||
yield from self._read_pages( | ||
sherifnada marked this conversation as resolved.
Show resolved
Hide resolved
|
||
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state | ||
) | ||
|
||
def _read_pages( | ||
sherifnada marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self, | ||
records_generator_fn: Callable[ | ||
[requests.PreparedRequest, requests.Response, Mapping[str, Any], Mapping[str, Any]], Iterable[StreamData] | ||
], | ||
stream_slice: Mapping[str, Any] = None, | ||
stream_state: Mapping[str, Any] = None, | ||
) -> Iterable[StreamData]: | ||
stream_state = stream_state or {} | ||
pagination_complete = False | ||
|
||
next_page_token = None | ||
while not pagination_complete: | ||
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) | ||
request = self._create_prepared_request( | ||
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
headers=dict(request_headers, **self.authenticator.get_auth_header()), | ||
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
) | ||
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) | ||
|
||
response = self._send_request(request, request_kwargs) | ||
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) | ||
request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
yield from records_generator_fn(request, response, stream_state, stream_slice) | ||
|
||
next_page_token = self.next_page_token(response) | ||
if not next_page_token: | ||
|
@@ -434,6 +435,22 @@ def read_records( | |
# Always return an empty generator just in case no records were ever yielded | ||
yield from [] | ||
|
||
def _fetch_next_page( | ||
self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None | ||
) -> Tuple[requests.PreparedRequest, requests.Response]: | ||
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
request = self._create_prepared_request( | ||
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
headers=dict(request_headers, **self.authenticator.get_auth_header()), | ||
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), | ||
) | ||
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) | ||
|
||
response = self._send_request(request, request_kwargs) | ||
return request, response | ||
|
||
|
||
class HttpSubStream(HttpStream, ABC): | ||
def __init__(self, parent: HttpStream, **kwargs): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extract logic to convert to an AirbyteMessage to
_get_message
to avoid duplication between_read_incremental
and_read_full_refresh