Skip to content

Commit

Permalink
Connector builder server: Add inferred schema to read API response (#20942)
Browse files Browse the repository at this point in the history

* fix stuff

* add inferred schema to API

* fix yaml changes

* fix yaml formatting

* add whitespace back

* reorder imports
  • Loading branch information
Joe Reuter authored and jbfbell committed Jan 13, 2023
1 parent 00780e9 commit 9a9ca55
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ class StreamRead(BaseModel):
logs: The logs of this StreamRead.
slices: The slices of this StreamRead.
inferred_schema: The inferred_schema of this StreamRead [Optional].
"""

logs: List[object]
slices: List[StreamReadSlices]
inferred_schema: Optional[Dict[str, Any]] = None

StreamRead.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from urllib.parse import parse_qs, urljoin, urlparse

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer

from connector_builder.generated.apis.default_api_interface import DefaultApi
from connector_builder.generated.models.http_request import HttpRequest
from connector_builder.generated.models.http_response import HttpResponse
Expand Down Expand Up @@ -108,12 +110,14 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
:return: Airbyte record messages produced by the sync grouped by slice and page
"""
adapter = self._create_low_code_adapter(manifest=stream_read_request_body.manifest)
schema_inferrer = SchemaInferrer()

single_slice = StreamReadSlices(pages=[])
log_messages = []
try:
for message_group in self._get_message_groups(
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config)
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config),
schema_inferrer
):
if isinstance(message_group, AirbyteLogMessage):
log_messages.append({"message": message_group.message})
Expand All @@ -126,9 +130,9 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
detail=f"Could not perform read with with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}",
)

return StreamRead(logs=log_messages, slices=[single_slice])
return StreamRead(logs=log_messages, slices=[single_slice], inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream))

def _get_message_groups(self, messages: Iterable[AirbyteMessage]) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
def _get_message_groups(self, messages: Iterable[AirbyteMessage], schema_inferrer: SchemaInferrer) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message and when we encounter another request, append the latest
Expand Down Expand Up @@ -165,6 +169,7 @@ def _get_message_groups(self, messages: Iterable[AirbyteMessage]) -> Iterable[Un
yield message.log
elif message.type == Type.RECORD:
current_records.append(message.record.data)
schema_inferrer.accumulate(message.record)
else:
if not current_page_request or not current_page_response:
raise ValueError("Every message grouping should have at least one request and response")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ components:
type: object
description: The STATE AirbyteMessage emitted at the end of this slice. This can be omitted if a stream slicer is not configured.
# $ref: "#/components/schemas/AirbyteProtocol/definitions/AirbyteStateMessage"
inferred_schema:
type: object
description: The narrowest JSON Schema against which every AirbyteRecord in the slices can validate successfully. This is inferred from reading every record in the output slices.
StreamReadRequestBody:
type: object
required:
Expand Down

0 comments on commit 9a9ca55

Please sign in to comment.