Connector builder: support for test read with message grouping per slices #23925
Changes from all commits
connector_builder/connector_builder_handler.py (path inferred from the import statements below):

@@ -2,19 +2,45 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
+import dataclasses
+from datetime import datetime
+from typing import Any, Mapping
+
-from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type
+from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog
+from airbyte_cdk.models import Type
+from airbyte_cdk.models import Type as MessageType
+from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 from airbyte_cdk.utils.traced_exception import AirbyteTracedException
+from connector_builder.message_grouper import MessageGrouper
 
 
 def list_streams() -> AirbyteMessage:
     raise NotImplementedError

Review comment on list_streams: this should return an AirbyteMessage containing an AirbyteRecord.

-def stream_read() -> AirbyteMessage:
-    raise NotImplementedError
+DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
+DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
+DEFAULT_MAX_RECORDS = 100
 
 
+def read_stream(source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog) -> AirbyteMessage:
+    try:
+        command_config = config.get("__test_read_config", {})
+        max_pages_per_slice = command_config.get("max_pages_per_slice", DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
+        max_slices = command_config.get("max_slices", DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
+        max_records = command_config.get("max_records", DEFAULT_MAX_RECORDS)
+        handler = MessageGrouper(max_pages_per_slice, max_slices)
+        stream_name = configured_catalog.streams[0].stream.name  # The connector builder only supports a single stream
+        stream_read = handler.get_message_groups(source, config, configured_catalog, max_records)
+        return AirbyteMessage(type=MessageType.RECORD, record=AirbyteRecordMessage(
+            data=dataclasses.asdict(stream_read),
+            stream=stream_name,
+            emitted_at=_emitted_at()
+        ))
+    except Exception as exc:
+        error = AirbyteTracedException.from_exception(exc, message=f"Error reading stream with config={config} and catalog={configured_catalog}")
+        return error.as_airbyte_message()
 
 
 def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
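For illustration, here is a hypothetical config that would exercise this path. The only keys the handler reads are `__command`, `__injected_declarative_manifest`, and the optional `__test_read_config` overrides used by read_stream above; all values below are made up:

# Hypothetical test_read config; the manifest body is elided.
config = {
    "__command": "test_read",
    "__injected_declarative_manifest": {"version": "..."},  # low-code manifest under test
    "__test_read_config": {
        "max_pages_per_slice": 2,  # defaults to DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE (5) when omitted
        "max_slices": 3,           # defaults to DEFAULT_MAXIMUM_NUMBER_OF_SLICES (5)
        "max_records": 50,         # defaults to DEFAULT_MAX_RECORDS (100)
    },
    # ...plus whatever connection fields the manifest's spec declares
}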
connector_builder/main.py (path inferred; this is the connector builder entrypoint):

@@ -4,47 +4,53 @@
 
 
 import sys
-from typing import Any, List, Mapping
+from typing import Any, List, Mapping, Tuple
 
 from airbyte_cdk.connector import BaseConnector
 from airbyte_cdk.entrypoint import AirbyteEntrypoint
+from airbyte_cdk.models import ConfiguredAirbyteCatalog
 from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 from airbyte_cdk.utils.traced_exception import AirbyteTracedException
-from connector_builder.connector_builder_handler import resolve_manifest
+from connector_builder.connector_builder_handler import read_stream, resolve_manifest
 
 
 def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource:
     manifest = config.get("__injected_declarative_manifest")
-    return ManifestDeclarativeSource(manifest)
+    return ManifestDeclarativeSource(manifest, True)

Review comment on create_source: set debug to True so the source returns raw requests and responses.

 
 
-def get_config_from_args(args: List[str]) -> Mapping[str, Any]:
+def get_config_and_catalog_from_args(args: List[str]) -> Tuple[Mapping[str, Any], ConfiguredAirbyteCatalog]:
     parsed_args = AirbyteEntrypoint.parse_args(args)
+    config_path, catalog_path = parsed_args.config, parsed_args.catalog
     if parsed_args.command != "read":
         raise ValueError("Only read commands are allowed for Connector Builder requests.")
 
-    config = BaseConnector.read_config(parsed_args.config)
+    config = BaseConnector.read_config(config_path)
+    catalog = ConfiguredAirbyteCatalog.parse_obj(BaseConnector.read_config(catalog_path))
 
     if "__injected_declarative_manifest" not in config:
         raise ValueError(
             f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
         )
 
-    return config
+    return config, catalog
 
 
-def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any]):
+def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog):
     command = config.get("__command")
     if command == "resolve_manifest":
         return resolve_manifest(source)
-    raise ValueError(f"Unrecognized command {command}.")
+    elif command == "test_read":
+        return read_stream(source, config, catalog)
+    else:
+        raise ValueError(f"Unrecognized command {command}.")
 
 
 def handle_request(args: List[str]):
-    config = get_config_from_args(args)
-    source = create_source(config)
+    config, catalog = get_config_and_catalog_from_args(args)
     if "__command" in config:
-        return handle_connector_builder_request(source, config).json()
+        source = create_source(config)
+        return handle_connector_builder_request(source, config, catalog).json(exclude_unset=True)
     else:
         raise ValueError("Missing __command argument in config file.")

@@ -55,4 +61,4 @@ def handle_request(args: List[str]):
     except Exception as exc:
         error = AirbyteTracedException.from_exception(exc, message="Error handling request.")
         m = error.as_airbyte_message()
-        print(error.as_airbyte_message().json())
+        print(error.as_airbyte_message().json(exclude_unset=True))
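To tie the entrypoint together, a rough sketch of a driving call. The module path and file paths are placeholders; the argument shape is the standard Airbyte read invocation parsed by AirbyteEntrypoint.parse_args:

# Minimal sketch; module and file paths are hypothetical.
from connector_builder.main import handle_request

output_json = handle_request([
    "read",
    "--config", "/tmp/config.json",    # must contain __command and __injected_declarative_manifest
    "--catalog", "/tmp/catalog.json",  # a single-stream ConfiguredAirbyteCatalog
])
print(output_json)  # the resulting AirbyteMessage, serialized with exclude_unset=True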
connector_builder/message_grouper.py (new file, @@ -0,0 +1,190 @@; path inferred from the MessageGrouper import above):

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
from copy import deepcopy
from json import JSONDecodeError
from typing import Any, Iterable, Iterator, Mapping, Optional, Union
from urllib.parse import parse_qs, urlparse

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
from airbyte_protocol.models.airbyte_protocol import ConfiguredAirbyteCatalog
from connector_builder.models import HttpRequest, HttpResponse, StreamRead, StreamReadPages, StreamReadSlices


class MessageGrouper:
    logger = logging.getLogger("airbyte.connector-builder")

    def __init__(self, max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000):
        self._max_pages_per_slice = max_pages_per_slice
        self._max_slices = max_slices
        self._max_record_limit = max_record_limit

Review comment on __init__: nit (also I know this may just be copy/pasted): would it be preferable to enforce the maximums in this class, instead of allowing them to be fully configurable?
Reply: the main reason why I didn't enforce maximums here is that we'll need to update the value in two places if we decide to increase it. I think the risk of not enforcing a limit here is negligible because we control the caller.
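The interplay between the constructor's max_record_limit and the per-call record_limit is worth pinning down; a small hypothetical illustration, with the behavior read off get_message_groups just below:

# Illustration only; the instance and numbers are hypothetical.
grouper = MessageGrouper(max_pages_per_slice=5, max_slices=5, max_record_limit=200)
# record_limit=None  -> effective limit 200 (falls back to self._max_record_limit)
# record_limit=50    -> effective limit 50, i.e. min(50, 200)
# record_limit=1500  -> ValueError: record limit must be between 1 and 1000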
    def get_message_groups(self,
                           source: DeclarativeSource,
                           config: Mapping[str, Any],
                           configured_catalog: ConfiguredAirbyteCatalog,
                           record_limit: Optional[int] = None,
                           ) -> StreamRead:
        if record_limit is not None and not (1 <= record_limit <= 1000):
            raise ValueError(f"Record limit must be between 1 and 1000. Got {record_limit}")

Review comment on the range check: moved from a validator because dataclasses don't have builtin validators.

        schema_inferrer = SchemaInferrer()

        if record_limit is None:
            record_limit = self._max_record_limit
        else:
            record_limit = min(record_limit, self._max_record_limit)

        slices = []
        log_messages = []
        state = {}  # No support for incremental sync
        for message_group in self._get_message_groups(
            source.read(self.logger, config, configured_catalog, state),
            schema_inferrer,
            record_limit,
        ):
            if isinstance(message_group, AirbyteLogMessage):
                log_messages.append({"message": message_group.message})
            else:
                slices.append(message_group)

        return StreamRead(
            logs=log_messages,
            slices=slices,
            test_read_limit_reached=self._has_reached_limit(slices),
            inferred_schema=schema_inferrer.get_stream_schema(configured_catalog.streams[0].stream.name)  # The connector builder currently only supports reading from a single stream at a time
        )

    def _get_message_groups(
        self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int
    ) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
        """
        Message groups are partitioned according to when request log messages are received. Subsequent response log messages
        and record messages belong to the prior request log message; when we encounter another request, the current message
        group is closed and appended. This continues until <limit> records have been read.

        Messages received from the CDK read operation will always arrive in the following order:
        {type: LOG, log: {message: "request: ..."}}
        {type: LOG, log: {message: "response: ..."}}
        ... 0 or more record messages
        {type: RECORD, record: {data: ...}}
        {type: RECORD, record: {data: ...}}
        Repeats for each request/response made

        Note: The exception is that normal log messages can be received at any time; these are not incorporated into the grouping.
        """
        records_count = 0
        at_least_one_page_in_group = False
        current_page_records = []
        current_slice_pages = []
        current_page_request: Optional[HttpRequest] = None
        current_page_response: Optional[HttpResponse] = None

        while records_count < limit and (message := next(messages, None)):
            if self._need_to_close_page(at_least_one_page_in_group, message):
                self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
                current_page_request = None
                current_page_response = None

            if at_least_one_page_in_group and message.type == Type.LOG and message.log.message.startswith("slice:"):
                yield StreamReadSlices(pages=current_slice_pages)
                current_slice_pages = []
                at_least_one_page_in_group = False
            elif message.type == Type.LOG and message.log.message.startswith("request:"):
                if not at_least_one_page_in_group:
                    at_least_one_page_in_group = True
                current_page_request = self._create_request_from_log_message(message.log)
            elif message.type == Type.LOG and message.log.message.startswith("response:"):
                current_page_response = self._create_response_from_log_message(message.log)
            elif message.type == Type.LOG:
                yield message.log
            elif message.type == Type.RECORD:
                current_page_records.append(message.record.data)
                records_count += 1
                schema_inferrer.accumulate(message.record)
        else:
            self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
            yield StreamReadSlices(pages=current_slice_pages)

    @staticmethod
    def _need_to_close_page(at_least_one_page_in_group, message):
        return (
            at_least_one_page_in_group
            and message.type == Type.LOG
            and (message.log.message.startswith("request:") or message.log.message.startswith("slice:"))
        )

    @staticmethod
    def _close_page(current_page_request, current_page_response, current_slice_pages, current_page_records):
        if not current_page_request or not current_page_response:
            raise ValueError("Every message grouping should have at least one request and response")

        current_slice_pages.append(
            StreamReadPages(request=current_page_request, response=current_page_response, records=deepcopy(current_page_records))
        )
        current_page_records.clear()

    def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]:
        # TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the
        # form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate term while
        # the protocol change is worked on.
        raw_request = log_message.message.partition("request:")[2]
        try:
            request = json.loads(raw_request)
            url = urlparse(request.get("url", ""))
            full_path = f"{url.scheme}://{url.hostname}{url.path}" if url else ""
            parameters = parse_qs(url.query) or None
            return HttpRequest(
                url=full_path,
                http_method=request.get("http_method", ""),
                headers=request.get("headers"),
                parameters=parameters,
                body=request.get("body"),
            )
        except JSONDecodeError as error:
            self.logger.warning(f"Failed to parse log message into request object with error: {error}")
            return None
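To make the stopgap parsing concrete, here is a hypothetical debug log line and the rough result. The field names mirror the request.get(...) calls above, not a documented CDK format:

# Hypothetical log message emitted by the CDK in debug mode:
log_line = 'request:{"url": "https://api.example.com/v1/items?page=2", "http_method": "GET", "headers": {"Accept": "application/json"}, "body": null}'

# _create_request_from_log_message would decompose it into roughly:
# HttpRequest(
#     url="https://api.example.com/v1/items",   # scheme://hostname/path, query string stripped
#     http_method="GET",
#     headers={"Accept": "application/json"},
#     parameters={"page": ["2"]},               # parse_qs keeps values as lists
#     body=None,
# )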
    def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpResponse]:
        # TODO: As a temporary stopgap, the CDK emits response data as a log message string. Ideally this should come in the
        # form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate term while
        # the protocol change is worked on.
        raw_response = log_message.message.partition("response:")[2]
        try:
            response = json.loads(raw_response)
            body = response.get("body", "{}")
            return HttpResponse(status=response.get("status_code"), body=body, headers=response.get("headers"))
        except JSONDecodeError as error:
            self.logger.warning(f"Failed to parse log message into response object with error: {error}")
            return None

    def _has_reached_limit(self, slices):
        if len(slices) >= self._max_slices:
            return True

        for slice in slices:
            if len(slice.pages) >= self._max_pages_per_slice:
                return True
        return False

    @classmethod
    def _create_configure_catalog(cls, stream_name: str) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog.parse_obj(
            {
                "streams": [
                    {
                        "stream": {
                            "name": stream_name,
                            "json_schema": {},
                            "supported_sync_modes": ["full_refresh", "incremental"],
                        },
                        "sync_mode": "full_refresh",
                        "destination_sync_mode": "overwrite",
                    }
                ]
            }
        )

Review comment on _create_configure_catalog: creating the catalog on the Java side would allow us to keep the same read interface as connectors.
Review comment: make this a classmethod so it can be used before creating the source.
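Taken together, a minimal sketch of how read_stream in connector_builder_handler.py drives this class; source, config, and configured_catalog are assumed to have been built as shown earlier:

# Minimal sketch under the assumptions above; values are illustrative.
grouper = MessageGrouper(max_pages_per_slice=5, max_slices=5)
stream_read = grouper.get_message_groups(source, config, configured_catalog, record_limit=100)

print(stream_read.test_read_limit_reached)   # True once the slice/page caps are hit
for slice_ in stream_read.slices:            # one StreamReadSlices per "slice:" log group
    for page in slice_.pages:                # one StreamReadPages per request/response pair
        print(page.request, page.response, len(page.records))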