Skip to content

Commit

Permalink
Connector builder: support for test read with message grouping per sl…
Browse files Browse the repository at this point in the history
…ices (#23925)

* New connector_builder module for handling requests from the Connector Builder.

Also implements `resolve_manifest` handler

* Automated Commit - Formatting Changes

* Rename ConnectorBuilderSource to ConnectorBuilderHandler

* Update source_declarative_manifest README

* Reorganize

* read records

* paste unit tests from connector builder server

* compiles but tests fail

* first test passes

* Second test passes

* 3rd test passes

* one more test

* another test

* one more test

* test

* return StreamRead

* test

* test

* rename

* test

* test

* test

* main seems to work

* Update

* Update

* Update

* Update

* update

* error message

* rename

* update

* Update

* CR improvements

* fix test_source_declarative_manifest

* fix tests

* Update

* Update

* Update

* Update

* rename

* rename

* rename

* format

* Give connector_builder its own main.py

* Update

* reset

* delete dead code

* remove debug print

* update test

* Update

* set right stream

* Add --catalog argument

* Remove unneeded preparse

* Update README

* handle error

* tests pass

* more explicit test

* reset

* format

* fix merge

* raise exception

* fix

* black format

* raise with config

* update

* fix flake

* __test_read_config is optional

* fix

* Automated Commit - Formatting Changes

* fix

* exclude_unset

---------

Co-authored-by: Catherine Noll <noll.catherine@gmail.com>
Co-authored-by: clnoll <clnoll@users.noreply.github.com>
Co-authored-by: girarda <girarda@users.noreply.github.com>
  • Loading branch information
4 people authored and erohmensing committed Mar 22, 2023
1 parent 14389bb commit ab58901
Show file tree
Hide file tree
Showing 8 changed files with 1,015 additions and 39 deletions.
5 changes: 3 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,6 @@ def _emit_legacy_state_format(self, state_obj) -> Union[List[AirbyteStateMessage
return []

# can be overridden to change an input catalog
def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog:
return ConfiguredAirbyteCatalog.parse_obj(self._read_json_file(catalog_path))
@classmethod
def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
return ConfiguredAirbyteCatalog.parse_obj(cls._read_json_file(catalog_path))
32 changes: 29 additions & 3 deletions airbyte-cdk/python/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,45 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import dataclasses
from datetime import datetime
from typing import Any, Mapping

from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import Type
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from connector_builder.message_grouper import MessageGrouper


def list_streams() -> AirbyteMessage:
raise NotImplementedError


def stream_read() -> AirbyteMessage:
raise NotImplementedError
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAX_RECORDS = 100


def read_stream(source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog) -> AirbyteMessage:
try:
command_config = config.get("__test_read_config", {})
max_pages_per_slice = command_config.get("max_pages_per_slice", DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
max_slices = command_config.get("max_slices", DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
max_records = command_config.get("max_records", DEFAULT_MAX_RECORDS)
handler = MessageGrouper(max_pages_per_slice, max_slices)
stream_name = configured_catalog.streams[0].stream.name # The connector builder only supports a single stream
stream_read = handler.get_message_groups(source, config, configured_catalog, max_records)
return AirbyteMessage(type=MessageType.RECORD, record=AirbyteRecordMessage(
data=dataclasses.asdict(stream_read),
stream=stream_name,
emitted_at=_emitted_at()
))
except Exception as exc:
error = AirbyteTracedException.from_exception(exc, message=f"Error reading stream with config={config} and catalog={configured_catalog}")
return error.as_airbyte_message()


def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
Expand Down
30 changes: 18 additions & 12 deletions airbyte-cdk/python/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,53 @@


import sys
from typing import Any, List, Mapping
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from connector_builder.connector_builder_handler import resolve_manifest
from connector_builder.connector_builder_handler import read_stream, resolve_manifest


def create_source(config: Mapping[str, Any]) -> ManifestDeclarativeSource:
manifest = config.get("__injected_declarative_manifest")
return ManifestDeclarativeSource(manifest)
return ManifestDeclarativeSource(manifest, True)


def get_config_from_args(args: List[str]) -> Mapping[str, Any]:
def get_config_and_catalog_from_args(args: List[str]) -> Tuple[Mapping[str, Any], ConfiguredAirbyteCatalog]:
parsed_args = AirbyteEntrypoint.parse_args(args)
config_path, catalog_path = parsed_args.config, parsed_args.catalog
if parsed_args.command != "read":
raise ValueError("Only read commands are allowed for Connector Builder requests.")

config = BaseConnector.read_config(parsed_args.config)
config = BaseConnector.read_config(config_path)
catalog = ConfiguredAirbyteCatalog.parse_obj(BaseConnector.read_config(catalog_path))

if "__injected_declarative_manifest" not in config:
raise ValueError(
f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
)

return config
return config, catalog


def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any]):
def handle_connector_builder_request(source: ManifestDeclarativeSource, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog):
command = config.get("__command")
if command == "resolve_manifest":
return resolve_manifest(source)
raise ValueError(f"Unrecognized command {command}.")
elif command == "test_read":
return read_stream(source, config, catalog)
else:
raise ValueError(f"Unrecognized command {command}.")


def handle_request(args: List[str]):
config = get_config_from_args(args)
source = create_source(config)
config, catalog = get_config_and_catalog_from_args(args)
if "__command" in config:
return handle_connector_builder_request(source, config).json()
source = create_source(config)
return handle_connector_builder_request(source, config, catalog).json(exclude_unset=True)
else:
raise ValueError("Missing __command argument in config file.")

Expand All @@ -55,4 +61,4 @@ def handle_request(args: List[str]):
except Exception as exc:
error = AirbyteTracedException.from_exception(exc, message="Error handling request.")
m = error.as_airbyte_message()
print(error.as_airbyte_message().json())
print(error.as_airbyte_message().json(exclude_unset=True))
190 changes: 190 additions & 0 deletions airbyte-cdk/python/connector_builder/message_grouper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
from copy import deepcopy
from json import JSONDecodeError
from typing import Any, Iterable, Iterator, Mapping, Optional, Union
from urllib.parse import parse_qs, urlparse

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
from airbyte_protocol.models.airbyte_protocol import ConfiguredAirbyteCatalog
from connector_builder.models import HttpRequest, HttpResponse, StreamRead, StreamReadPages, StreamReadSlices


class MessageGrouper:
logger = logging.getLogger("airbyte.connector-builder")

def __init__(self, max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000):
self._max_pages_per_slice = max_pages_per_slice
self._max_slices = max_slices
self._max_record_limit = max_record_limit

def get_message_groups(self,
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
record_limit: Optional[int] = None,
) -> StreamRead:
if record_limit is not None and not (1 <= record_limit <= 1000):
raise ValueError(f"Record limit must be between 1 and 1000. Got {record_limit}")
schema_inferrer = SchemaInferrer()

if record_limit is None:
record_limit = self._max_record_limit
else:
record_limit = min(record_limit, self._max_record_limit)

slices = []
log_messages = []
state = {} # No support for incremental sync
for message_group in self._get_message_groups(
source.read(self.logger, config, configured_catalog, state),
schema_inferrer,
record_limit,
):
if isinstance(message_group, AirbyteLogMessage):
log_messages.append({"message": message_group.message})
else:
slices.append(message_group)

return StreamRead(
logs=log_messages,
slices=slices,
test_read_limit_reached=self._has_reached_limit(slices),
inferred_schema=schema_inferrer.get_stream_schema(configured_catalog.streams[0].stream.name) # The connector builder currently only supports reading from a single stream at a time
)

def _get_message_groups(
self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int
) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message and when we encounter another request, append the latest
message group, until <limit> records have been read.
Messages received from the CDK read operation will always arrive in the following order:
{type: LOG, log: {message: "request: ..."}}
{type: LOG, log: {message: "response: ..."}}
... 0 or more record messages
{type: RECORD, record: {data: ...}}
{type: RECORD, record: {data: ...}}
Repeats for each request/response made
Note: The exception is that normal log messages can be received at any time which are not incorporated into grouping
"""
records_count = 0
at_least_one_page_in_group = False
current_page_records = []
current_slice_pages = []
current_page_request: Optional[HttpRequest] = None
current_page_response: Optional[HttpResponse] = None

while records_count < limit and (message := next(messages, None)):
if self._need_to_close_page(at_least_one_page_in_group, message):
self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
current_page_request = None
current_page_response = None

if at_least_one_page_in_group and message.type == Type.LOG and message.log.message.startswith("slice:"):
yield StreamReadSlices(pages=current_slice_pages)
current_slice_pages = []
at_least_one_page_in_group = False
elif message.type == Type.LOG and message.log.message.startswith("request:"):
if not at_least_one_page_in_group:
at_least_one_page_in_group = True
current_page_request = self._create_request_from_log_message(message.log)
elif message.type == Type.LOG and message.log.message.startswith("response:"):
current_page_response = self._create_response_from_log_message(message.log)
elif message.type == Type.LOG:
yield message.log
elif message.type == Type.RECORD:
current_page_records.append(message.record.data)
records_count += 1
schema_inferrer.accumulate(message.record)
else:
self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
yield StreamReadSlices(pages=current_slice_pages)

@staticmethod
def _need_to_close_page(at_least_one_page_in_group, message):
return (
at_least_one_page_in_group
and message.type == Type.LOG
and (message.log.message.startswith("request:") or message.log.message.startswith("slice:"))
)

@staticmethod
def _close_page(current_page_request, current_page_response, current_slice_pages, current_page_records):
if not current_page_request or not current_page_response:
raise ValueError("Every message grouping should have at least one request and response")

current_slice_pages.append(
StreamReadPages(request=current_page_request, response=current_page_response, records=deepcopy(current_page_records))
)
current_page_records.clear()

def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]:
# TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the
# protocol change is worked on.
raw_request = log_message.message.partition("request:")[2]
try:
request = json.loads(raw_request)
url = urlparse(request.get("url", ""))
full_path = f"{url.scheme}://{url.hostname}{url.path}" if url else ""
parameters = parse_qs(url.query) or None
return HttpRequest(
url=full_path,
http_method=request.get("http_method", ""),
headers=request.get("headers"),
parameters=parameters,
body=request.get("body"),
)
except JSONDecodeError as error:
self.logger.warning(f"Failed to parse log message into request object with error: {error}")
return None

def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpResponse]:
# TODO: As a temporary stopgap, the CDK emits response data as a log message string. Ideally this should come in the
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the
# protocol change is worked on.
raw_response = log_message.message.partition("response:")[2]
try:
response = json.loads(raw_response)
body = response.get("body", "{}")
return HttpResponse(status=response.get("status_code"), body=body, headers=response.get("headers"))
except JSONDecodeError as error:
self.logger.warning(f"Failed to parse log message into response object with error: {error}")
return None

def _has_reached_limit(self, slices):
if len(slices) >= self._max_slices:
return True

for slice in slices:
if len(slice.pages) >= self._max_pages_per_slice:
return True
return False

@classmethod
def _create_configure_catalog(cls, stream_name: str) -> ConfiguredAirbyteCatalog:
return ConfiguredAirbyteCatalog.parse_obj(
{
"streams": [
{
"stream": {
"name": stream_name,
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
}
)
Loading

0 comments on commit ab58901

Please sign in to comment.