Connector builder: support for test read with message grouping per slices #23925
Merged
Commits (81):
f829ac5 New connector_builder module for handling requests from the Connector… (clnoll)
13a9a14 Automated Commit - Formatting Changes (clnoll)
1c85330 Rename ConnectorBuilderSource to ConnectorBuilderHandler (clnoll)
0dccb4e Update source_declarative_manifest README (clnoll)
f7a475a Reorganize (clnoll)
bd71e91 read records (girarda)
c6ac119 paste unit tests from connector builder server (girarda)
f949521 compiles but tests fail (girarda)
45741cd first test passes (girarda)
0e7d6f4 Second test passes (girarda)
81ff6e9 3rd test passes (girarda)
14aa8ca one more test (girarda)
b3764ba another test (girarda)
c6040cc one more test (girarda)
5f0ead1 test (girarda)
1614bad return StreamRead (girarda)
b323578 test (girarda)
9902dd5 test (girarda)
3b3255e rename (girarda)
6242e3a test (girarda)
c8631f1 test (girarda)
fe1da29 test (girarda)
5b0750c main seems to work (girarda)
7dd5ada Update (girarda)
2f048e9 Update (girarda)
dd778ed Update (girarda)
e79adf8 Update (girarda)
ab009a6 update (girarda)
71f94c1 error message (girarda)
c4d8b84 rename (girarda)
4c26009 update (girarda)
31425f0 Update (girarda)
a7911f5 CR improvements (clnoll)
11f35cb merge (girarda)
59fea95 fix test_source_declarative_manifest (girarda)
b190176 fix tests (girarda)
8c51bb1 Update (girarda)
ad3a9c4 Update (girarda)
e1e2598 Update (girarda)
639accd Update (girarda)
1fbbc8f rename (girarda)
bf0175d rename (girarda)
1e6b3d2 rename (girarda)
032c44c format (girarda)
aea625e Give connector_builder its own main.py (clnoll)
70a9052 merge (girarda)
0b15013 Update (girarda)
5ad0fba reset (girarda)
f782328 delete dead code (girarda)
1e9c159 remove debug print (girarda)
691e957 update test (girarda)
2280924 Update (girarda)
754c61c set right stream (girarda)
d38a760 Add --catalog argument (clnoll)
d64cf5d Remove unneeded preparse (clnoll)
8732639 Update README (clnoll)
a32898a merge (girarda)
0616250 handle error (girarda)
fa491f7 tests pass (girarda)
1e04904 more explicit test (girarda)
aa2839c reset (girarda)
5d3163f format (girarda)
fc9c28c merge master (girarda)
9cd9105 fix merge (girarda)
e37851e raise exception (girarda)
0b809f9 fix (girarda)
33022c9 black format (girarda)
cccb968 raise with config (girarda)
a79b43b update (girarda)
8ca4d64 Merge branch 'master' into alex/test_read (girarda)
4ace71b fix flake (girarda)
e74c040 Merge branch 'alex/test_read' of github.com:airbytehq/airbyte into al… (girarda)
fa198be Merge branch 'master' into alex/test_read (girarda)
455e65a __test_read_config is optional (girarda)
4059751 Merge branch 'master' into alex/test_read (girarda)
f78637c merge (girarda)
36152a1 fix (girarda)
b308f04 Automated Commit - Formatting Changes (girarda)
40550b3 fix (girarda)
ae66445 Merge branch 'alex/test_read' of github.com:airbytehq/airbyte into al… (girarda)
b1beeb3 exclude_unset (girarda)
New file: README for the Connector Builder backend (@@ -0,0 +1,32 @@)

# Connector Builder Backend

This is the backend for requests from the [Connector Builder](https://docs.airbyte.com/connector-development/config-based/connector-builder-ui/).

## Local development

### Locally running the Connector Builder backend

```
python main.py read --config secrets/config.json
```

Note: Requires the keys `__injected_declarative_manifest` and `__command` in its config, where `__injected_declarative_manifest` is a JSON manifest and `__command` is one of the commands handled by the ConnectorBuilderHandler (`stream_read`, `list_streams`, or `resolve_manifest`).
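For illustration, such a config might look like the following sketch. The manifest body here is a skeletal placeholder (a real one must conform to the declarative component schema), and `api_key` stands in for whatever credentials the underlying API needs:

```
{
  "__command": "resolve_manifest",
  "__injected_declarative_manifest": {
    "version": "0.1.0",
    "definitions": {},
    "streams": [],
    "check": {"stream_names": []}
  },
  "api_key": "<your-api-key>"
}
```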
### Locally running the docker image

#### Build

First, make sure you build the latest Docker image:
```
./gradlew airbyte-cdk:python:airbyteDocker
```

The docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in the Dockerfile.
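In the Dockerfile those labels take roughly this form (the version value shown is illustrative, not a specific release):

```
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-declarative-manifest
```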
#### Run

Then run any of the connector commands as follows:

```
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-declarative-manifest:dev read --config /secrets/config.json
```
New file (@@ -0,0 +1,3 @@): a module header containing only the license comment.

```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
```
299 changes: 299 additions & 0 deletions
299
airbyte-cdk/python/connector_builder/connector_builder_handler.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,299 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from datetime import datetime | ||
from typing import List | ||
|
||
from airbyte_cdk.models import AirbyteRecordMessage | ||
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource | ||
from airbyte_cdk.utils.traced_exception import AirbyteTracedException | ||
|
||
from dataclasses import asdict, dataclass | ||
from copy import deepcopy | ||
import json | ||
from json import JSONDecodeError | ||
from typing import Any, Dict, Iterable, Iterator, Optional, Union | ||
from urllib.parse import parse_qs, urlparse | ||
|
||
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type | ||
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer | ||
import logging | ||
from airbyte_protocol.models.airbyte_protocol import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode, DestinationSyncMode | ||
|
||
|
||
@dataclass | ||
class HttpResponse: | ||
status: int | ||
body: Optional[str] = None | ||
headers: Optional[Dict[str, Any]] = None | ||
|
||
@dataclass | ||
class HttpRequest: | ||
url: str | ||
parameters: Optional[Dict[str, Any]] | ||
body: Optional[Dict[str, Any]] | ||
headers: Optional[Dict[str, Any]] | ||
http_method: str | ||
@dataclass | ||
class StreamReadPages: | ||
records: List[object] | ||
request: Optional[HttpRequest] = None | ||
response: Optional[HttpResponse] = None | ||
|
||
@dataclass | ||
class StreamReadSlicesInnerPagesInner: | ||
|
||
records: List[object] | ||
request: Optional[HttpRequest] | ||
response: Optional[HttpResponse] | ||
|
||
@dataclass | ||
class StreamReadSlicesInnerSliceDescriptor: | ||
start_datetime: Optional[datetime] | ||
list_item: Optional[str] | ||
|
||
@dataclass | ||
class StreamReadSlicesInner: | ||
pages: List[StreamReadSlicesInnerPagesInner] | ||
slice_descriptor: Optional[StreamReadSlicesInnerSliceDescriptor] | ||
state: Optional[Dict[str, Any]] | ||
|
||
@dataclass | ||
class StreamRead(object): | ||
logs: List[object] | ||
slices: List[StreamReadSlicesInner] | ||
test_read_limit_reached: bool | ||
inferred_schema: Optional[Dict[str, Any]] | ||
|
||
@dataclass | ||
class StreamReadRequestBody: | ||
manifest: Dict[str, Any] | ||
stream: str | ||
config: Dict[str, Any] | ||
state: Optional[Dict[str, Any]] | ||
record_limit: Optional[int] | ||
|
||
def __post_init__(self): | ||
print(self.record_limit) | ||
raise ValueError("here") | ||
if not (1 <= self.record_limit <= 1000): | ||
raise ValueError("") #FIXME | ||
|
||
#FIXME: can dataclasses also have validators? | ||
""" | ||
@validator("record_limit") | ||
def record_limit_max(cls, value): | ||
assert value <= 1000 | ||
return value | ||
|
||
@validator("record_limit") | ||
def record_limit_min(cls, value): | ||
assert value >= 1 | ||
return value | ||
""" | ||
|
||
@dataclass | ||
class StreamReadSliceDescriptor: | ||
start_datetime: Optional[datetime] = None | ||
list_item: Optional[str] = None | ||
|
||
@dataclass | ||
class StreamReadSlices: | ||
pages: List[StreamReadPages] | ||
slice_descriptor: Optional[StreamReadSliceDescriptor] = None | ||
state: Optional[Dict[str, Any]] = None | ||
|
||
def list_streams() -> AirbyteRecordMessage: | ||
raise NotImplementedError | ||
|
||
|
||
|
||
class ConnectorBuilderHandler: | ||
logger = logging.getLogger("airbyte.connector-builder") | ||
def __init__(self, max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000): | ||
self._max_pages_per_slice = max_pages_per_slice | ||
self._max_slices = max_slices | ||
self.max_record_limit = max_record_limit | ||
def read_stream( | ||
self, | ||
source: DeclarativeSource, | ||
config: Dict[str, Any], | ||
stream: str, | ||
record_limit: Optional[int] = None, | ||
) -> StreamRead: | ||
if record_limit is not None and not (1 <= record_limit <= 1000): | ||
raise ValueError("") | ||
schema_inferrer = SchemaInferrer() | ||
|
||
if record_limit is None: | ||
record_limit = self.max_record_limit | ||
else: | ||
record_limit = min(record_limit, self.max_record_limit) | ||
|
||
slices = [] | ||
log_messages = [] | ||
state = {} # No support for incremental sync | ||
catalog = _create_configure_catalog(stream) | ||
for message_group in self._get_message_groups( | ||
source.read(self.logger, config, catalog, state), | ||
schema_inferrer, | ||
record_limit, | ||
): | ||
if isinstance(message_group, AirbyteLogMessage): | ||
log_messages.append({"message": message_group.message}) | ||
else: | ||
slices.append(message_group) | ||
|
||
return StreamRead( | ||
logs=log_messages, | ||
slices=slices, | ||
test_read_limit_reached=self._has_reached_limit(slices), | ||
inferred_schema=schema_inferrer.get_stream_schema(stream) | ||
) | ||
|
||
def _get_message_groups( | ||
self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int | ||
) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]: | ||
""" | ||
Message groups are partitioned according to when request log messages are received. Subsequent response log messages | ||
and record messages belong to the prior request log message and when we encounter another request, append the latest | ||
message group, until <limit> records have been read. | ||
|
||
Messages received from the CDK read operation will always arrive in the following order: | ||
{type: LOG, log: {message: "request: ..."}} | ||
{type: LOG, log: {message: "response: ..."}} | ||
... 0 or more record messages | ||
{type: RECORD, record: {data: ...}} | ||
{type: RECORD, record: {data: ...}} | ||
Repeats for each request/response made | ||
|
||
Note: The exception is that normal log messages can be received at any time which are not incorporated into grouping | ||
""" | ||
records_count = 0 | ||
at_least_one_page_in_group = False | ||
current_page_records = [] | ||
current_slice_pages = [] | ||
current_page_request: Optional[HttpRequest] = None | ||
current_page_response: Optional[HttpResponse] = None | ||
|
||
while records_count < limit and (message := next(messages, None)): | ||
if self._need_to_close_page(at_least_one_page_in_group, message): | ||
self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records) | ||
current_page_request = None | ||
current_page_response = None | ||
|
||
if at_least_one_page_in_group and message.type == Type.LOG and message.log.message.startswith("slice:"): | ||
yield StreamReadSlices(pages=current_slice_pages) | ||
current_slice_pages = [] | ||
at_least_one_page_in_group = False | ||
elif message.type == Type.LOG and message.log.message.startswith("request:"): | ||
if not at_least_one_page_in_group: | ||
at_least_one_page_in_group = True | ||
current_page_request = self._create_request_from_log_message(message.log) | ||
elif message.type == Type.LOG and message.log.message.startswith("response:"): | ||
current_page_response = self._create_response_from_log_message(message.log) | ||
elif message.type == Type.LOG: | ||
yield message.log | ||
elif message.type == Type.RECORD: | ||
print(f"record! {message.record.data}") | ||
current_page_records.append(message.record.data) | ||
records_count += 1 | ||
schema_inferrer.accumulate(message.record) | ||
else: | ||
self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records) | ||
yield StreamReadSlices(pages=current_slice_pages) | ||
|
||
@staticmethod | ||
def _need_to_close_page(at_least_one_page_in_group, message): | ||
return ( | ||
at_least_one_page_in_group | ||
and message.type == Type.LOG | ||
and (message.log.message.startswith("request:") or message.log.message.startswith("slice:")) | ||
) | ||
|
||
@staticmethod | ||
def _close_page(current_page_request, current_page_response, current_slice_pages, current_page_records): | ||
if not current_page_request or not current_page_response: | ||
raise ValueError("Every message grouping should have at least one request and response") | ||
|
||
current_slice_pages.append( | ||
StreamReadPages(request=current_page_request, response=current_page_response, records=deepcopy(current_page_records)) | ||
) | ||
current_page_records.clear() | ||
|
||
def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]: | ||
# TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the | ||
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the | ||
# protocol change is worked on. | ||
raw_request = log_message.message.partition("request:")[2] | ||
try: | ||
request = json.loads(raw_request) | ||
url = urlparse(request.get("url", "")) | ||
full_path = f"{url.scheme}://{url.hostname}{url.path}" if url else "" | ||
parameters = parse_qs(url.query) or None | ||
return HttpRequest( | ||
url=full_path, | ||
http_method=request.get("http_method", ""), | ||
headers=request.get("headers"), | ||
parameters=parameters, | ||
body=request.get("body"), | ||
) | ||
except JSONDecodeError as error: | ||
self.logger.warning(f"Failed to parse log message into request object with error: {error}") | ||
return None | ||
|
||
def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpResponse]: | ||
# TODO: As a temporary stopgap, the CDK emits response data as a log message string. Ideally this should come in the | ||
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the | ||
# protocol change is worked on. | ||
raw_response = log_message.message.partition("response:")[2] | ||
try: | ||
response = json.loads(raw_response) | ||
body = response.get("body", "{}") | ||
return HttpResponse(status=response.get("status_code"), body=body, headers=response.get("headers")) | ||
except JSONDecodeError as error: | ||
self.logger.warning(f"Failed to parse log message into response object with error: {error}") | ||
return None | ||
|
||
def _has_reached_limit(self, slices): | ||
if len(slices) >= self._max_slices: | ||
return True | ||
|
||
for slice in slices: | ||
if len(slice.pages) >= self._max_pages_per_slice: | ||
return True | ||
return False | ||
|
||
|
||
def resolve_manifest(source) -> Union[AirbyteMessage, AirbyteRecordMessage]: | ||
try: | ||
return AirbyteRecordMessage( | ||
data={"manifest": source.resolved_manifest}, | ||
emitted_at=_emitted_at(), | ||
stream="", | ||
) | ||
except Exception as exc: | ||
error = AirbyteTracedException.from_exception(exc, message="Error resolving manifest.") | ||
return error.as_airbyte_message() | ||
|
||
|
||
def _emitted_at(): | ||
return int(datetime.now().timestamp()) * 1000 | ||
|
||
def _create_configure_catalog(stream_name: str) -> ConfiguredAirbyteCatalog: | ||
return ConfiguredAirbyteCatalog.parse_obj( | ||
{ | ||
"streams": [ | ||
{ | ||
"stream": { | ||
"name": stream_name, | ||
"json_schema": {}, | ||
"supported_sync_modes": ["full_refresh", "incremental"], | ||
}, | ||
"sync_mode": "full_refresh", | ||
"destination_sync_mode": "overwrite", | ||
} | ||
] | ||
} | ||
) |
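To make the handler's surface concrete, here is a minimal sketch of driving `read_stream` and `resolve_manifest` directly, assuming the definitions above are in scope and that `ManifestDeclarativeSource` accepts the manifest dict as `source_config` (check the CDK version in use). The config path and stream name are hypothetical.

```python
import json

from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource

# Load a config shaped like the README example above (hypothetical path).
with open("secrets/config.json") as f:
    config = json.load(f)
manifest = config["__injected_declarative_manifest"]

source = ManifestDeclarativeSource(source_config=manifest)

# resolve_manifest wraps source.resolved_manifest in an AirbyteRecordMessage.
resolved = resolve_manifest(source)

# read_stream runs a test read and groups the emitted messages into slices and pages.
handler = ConnectorBuilderHandler(max_pages_per_slice=5, max_slices=5, max_record_limit=1000)
stream_read = handler.read_stream(source, config=config, stream="items", record_limit=100)
for stream_slice in stream_read.slices:
    for page in stream_slice.pages:
        url = page.request.url if page.request else None
        print(url, len(page.records))
```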
New file: a MessageGrouper stub (@@ -0,0 +1,10 @@)

```python
from typing import Iterable, Iterator

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer


class MessageGrouper:

    def get_message_groups(self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int) -> Iterable:  # FIXME: set right return type
        pass
```
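For intuition about the grouping contract (whether it ends up on this stub or stays on ConnectorBuilderHandler), the sketch below feeds a hand-built message sequence, in the order the docstring above describes, through the private `_get_message_groups`. The URL and record data are made up, and the assertions reflect the expected grouping, not a real test from this PR.

```python
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, Level, Type
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer

# One request/response pair followed by one record, as the CDK would emit them.
messages = iter(
    [
        AirbyteMessage(
            type=Type.LOG,
            log=AirbyteLogMessage(level=Level.INFO, message='request:{"url": "https://api.example.com/items", "http_method": "GET"}'),
        ),
        AirbyteMessage(
            type=Type.LOG,
            log=AirbyteLogMessage(level=Level.INFO, message='response:{"status_code": 200, "body": "{}"}'),
        ),
        AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="items", data={"id": 1}, emitted_at=0)),
    ]
)

handler = ConnectorBuilderHandler(max_pages_per_slice=5, max_slices=5)
groups = list(handler._get_message_groups(messages, SchemaInferrer(), limit=100))

# One slice containing one page: the request/response pair plus its record.
assert len(groups) == 1 and len(groups[0].pages) == 1
assert groups[0].pages[0].records == [{"id": 1}]
```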
Review comment: creating the catalog on the Java side would allow us to keep the same read interface as connectors.