Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connector builder server: Add inferred schema to read API response #20942

Merged
merged 16 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ class StreamRead(BaseModel):

logs: The logs of this StreamRead.
slices: The slices of this StreamRead.
inferred_schema: The inferred_schema of this StreamRead [Optional].
"""

logs: List[object]
slices: List[StreamReadSlices]
inferred_schema: Optional[Dict[str, Any]] = None

StreamRead.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
from urllib.parse import parse_qs, urljoin, urlparse

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
from fastapi import Body, HTTPException
from jsonschema import ValidationError

from airbyte_cdk.utils.schema_inferrer import SchemaInferrer

from connector_builder.generated.apis.default_api_interface import DefaultApi
from connector_builder.generated.models.http_request import HttpRequest
from connector_builder.generated.models.http_response import HttpResponse
Expand All @@ -21,8 +26,6 @@
from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
from fastapi import Body, HTTPException
from jsonschema import ValidationError


class DefaultApiImpl(DefaultApi):
Expand Down Expand Up @@ -108,12 +111,14 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
:return: Airbyte record messages produced by the sync grouped by slice and page
"""
adapter = self._create_low_code_adapter(manifest=stream_read_request_body.manifest)
schema_inferrer = SchemaInferrer()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class is stateful right? we'll need to reset it or create a new SchemaInferrer for every read to avoid leaking the type of records produced from a different configuration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this a local variable in the scope of the current read_stream invocation? If that's the case there should be a new schema inferrer for every call of read_stream which is exactly what we want, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

derp. my bad


single_slice = StreamReadSlices(pages=[])
log_messages = []
try:
for message_group in self._get_message_groups(
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config)
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config),
schema_inferrer
):
if isinstance(message_group, AirbyteLogMessage):
log_messages.append({"message": message_group.message})
Expand All @@ -126,9 +131,9 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
detail=f"Could not perform read with with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}",
)

return StreamRead(logs=log_messages, slices=[single_slice])
return StreamRead(logs=log_messages, slices=[single_slice], inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream))

def _get_message_groups(self, messages: Iterable[AirbyteMessage]) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
def _get_message_groups(self, messages: Iterable[AirbyteMessage], schema_inferrer: SchemaInferrer) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message and when we encounter another request, append the latest
Expand Down Expand Up @@ -165,6 +170,7 @@ def _get_message_groups(self, messages: Iterable[AirbyteMessage]) -> Iterable[Un
yield message.log
elif message.type == Type.RECORD:
current_records.append(message.record.data)
schema_inferrer.accumulate(message.record)
else:
if not current_page_request or not current_page_response:
raise ValueError("Every message grouping should have at least one request and response")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ components:
type: object
description: The STATE AirbyteMessage emitted at the end of this slice. This can be omitted if a stream slicer is not configured.
# $ref: "#/components/schemas/AirbyteProtocol/definitions/AirbyteStateMessage"
inferred_schema:
type: object
description: The narrowest JSON Schema against which every AirbyteRecord in the slices can validate successfully. This is inferred from reading every record in the output slices.
StreamReadRequestBody:
type: object
required:
Expand Down