Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK: Embedded reader utils #28873

Merged
merged 16 commits into from
Aug 3, 2023
Merged
3 changes: 3 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/embedded/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Generic, Iterable, Optional, TypeVar

from airbyte_cdk.connector import TConfig
from airbyte_cdk.sources.embedded.catalog import create_configured_catalog, get_stream, get_stream_names
from airbyte_cdk.sources.embedded.runner import SourceRunner
from airbyte_cdk.sources.embedded.tools import get_defined_id
from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, SyncMode, Type

TOutput = TypeVar("TOutput")


class BaseEmbeddedIntegration(ABC, Generic[TConfig, TOutput]):
def __init__(self, runner: SourceRunner[TConfig], config: TConfig):
    """
    Keep the runner and config used for later reads.

    :param runner: runner used to discover and read from the source; stored as ``self.source``
    :param config: connector configuration handed to the runner on every call
    """
    # Nothing has been read yet, so there is no state message to remember.
    self.last_state: Optional[AirbyteStateMessage] = None
    self.config = config
    self.source = runner

@abstractmethod
def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Optional[TOutput]:
    """
    Convert a single Airbyte record into the output type of this integration.

    :param record: the record message emitted by the source
    :param id: identifier computed for the record, or None when none is available
    :return: the converted output; ``_load_data`` only yields truthy return values
    """

def _load_data(self, stream_name: str, state: Optional[AirbyteStateMessage] = None) -> Iterable[TOutput]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method probably shouldn't be private since it's used by child classes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood the underscore as meaning protected, not private (https://jellis18.github.io/post/2022-01-15-access-modifiers-python/#:~:text=Private%20vs%20Protected,and%20not%20a%20private%20value.) - ChatGPT agrees with me as well ;) I'm going to leave it like this for now as you shouldn't call it on the integration from the outside.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting. I guess we don't have many private fields in our codebase 🤷

catalog = self.source.discover(self.config)
stream = get_stream(catalog, stream_name)
if not stream:
raise ValueError(f"Stream {stream_name} not found, the following streams are available: {', '.join(get_stream_names(catalog))}")
if not state or SyncMode.incremental not in stream.supported_sync_modes:
configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.full_refresh)
else:
configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.incremental)

for message in self.source.read(self.config, configured_catalog, state):
if message.type == Type.RECORD:
output = self._handle_record(message.record, get_defined_id(stream, message.record.data))
if output:
yield output
elif message.type is Type.STATE and message.state:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it matters for embedded, but we generally need to output state messages to destination so they can checkpoint

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it's just in-memory I think it's OK - if they want the guarantees from checkpointing, they can use hosted Airbyte (not trying to re-build the whole platform here :) )

self.last_state = message.state
50 changes: 50 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import List, Optional

from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
)
from airbyte_cdk.sources.embedded.tools import get_first


def get_stream(catalog: AirbyteCatalog, stream_name: str) -> Optional[AirbyteStream]:
    """Return the first stream in ``catalog`` named ``stream_name``, or None if there is no such stream."""

    def has_requested_name(candidate: AirbyteStream) -> bool:
        return candidate.name == stream_name

    return get_first(catalog.streams, has_requested_name)


def get_configured_stream(catalog: ConfiguredAirbyteCatalog, stream_name: str) -> Optional[ConfiguredAirbyteStream]:
    """
    Return the first configured stream in ``catalog`` whose underlying stream is named ``stream_name``.

    :param catalog: configured catalog to search
    :param stream_name: name of the stream to look up
    :return: the matching configured stream, or None when no stream has that name
    """
    # A configured stream wraps the discovered stream in its `stream` field, so the
    # name has to be read from there; the wrapper itself has no `name` attribute.
    return next((configured for configured in catalog.streams if configured.stream.name == stream_name), None)


def get_stream_names(catalog: AirbyteCatalog) -> List[str]:
    """Collect the name of every stream in ``catalog``, in catalog order."""
    names = []
    for entry in catalog.streams:
        names.append(entry.name)
    return names


def to_configured_stream(
    stream: AirbyteStream,
    sync_mode: SyncMode = SyncMode.full_refresh,
    destination_sync_mode: DestinationSyncMode = DestinationSyncMode.append,
    cursor_field: Optional[List[str]] = None,
    primary_key: Optional[List[List[str]]] = None,
) -> ConfiguredAirbyteStream:
    """
    Wrap ``stream`` in a ``ConfiguredAirbyteStream``.

    All arguments are forwarded unchanged to the ``ConfiguredAirbyteStream`` constructor.
    """
    configured = ConfiguredAirbyteStream(
        stream=stream,
        sync_mode=sync_mode,
        destination_sync_mode=destination_sync_mode,
        cursor_field=cursor_field,
        primary_key=primary_key,
    )
    return configured


def to_configured_catalog(configured_streams: List[ConfiguredAirbyteStream]) -> ConfiguredAirbyteCatalog:
    """Build a ``ConfiguredAirbyteCatalog`` whose streams are ``configured_streams``."""
    catalog = ConfiguredAirbyteCatalog(streams=configured_streams)
    return catalog


def create_configured_catalog(stream: AirbyteStream, sync_mode: SyncMode = SyncMode.full_refresh) -> ConfiguredAirbyteCatalog:
    """
    Build a configured catalog containing only ``stream``.

    The stream is configured with the given ``sync_mode`` and with its own
    ``source_defined_primary_key`` as the primary key.
    """
    only_stream = to_configured_stream(stream, sync_mode=sync_mode, primary_key=stream.source_defined_primary_key)
    return to_configured_catalog([only_stream])
34 changes: 34 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import logging
from abc import ABC, abstractmethod
from typing import Generic, Iterable, Optional

from airbyte_cdk.connector import TConfig
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.abstract_source import AbstractSource


class SourceRunner(ABC, Generic[TConfig]):
    """Abstract interface for running a source's discover and read operations with a given config."""

    @abstractmethod
    def discover(self, config: TConfig) -> AirbyteCatalog:
        """Return the catalog of the source for ``config``."""

    @abstractmethod
    def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Optional[AirbyteStateMessage]) -> Iterable[AirbyteMessage]:
        """Return the messages produced by reading ``catalog`` with ``config``, optionally given a previous ``state``."""


class CDKRunner(SourceRunner[TConfig]):
def __init__(self, source: AbstractSource, name: str):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this have to be an AbstractSource / why not accept any Source?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be any source, fixed this

self._source = source
self._logger = logging.getLogger(name)

def discover(self, config: TConfig) -> AirbyteCatalog:
    """Delegate discovery to the wrapped source, passing this runner's logger along with ``config``."""
    wrapped_source, logger = self._source, self._logger
    return wrapped_source.discover(logger, config)

def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Optional[AirbyteStateMessage]) -> Iterable[AirbyteMessage]:
    """Delegate reading to the wrapped source, passing this runner's logger, ``config`` and ``catalog``."""
    # The wrapped source receives state as a list: one entry when a (truthy)
    # state was given, an empty list otherwise.
    if state:
        state_messages = [state]
    else:
        state_messages = []
    return self._source.read(self._logger, config, catalog, state=state_messages)
25 changes: 25 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Callable, Dict, Iterable, Optional

from airbyte_cdk.models import AirbyteStream


def get_first(iterable: Iterable[Any], predicate: Callable[[Any], bool] = lambda m: True) -> Optional[Any]:
    """
    Return the first item of ``iterable`` for which ``predicate`` is truthy.

    With the default predicate every item matches, so the first item is returned.
    Returns None when nothing matches or the iterable is empty.
    """
    matches = filter(predicate, iterable)
    try:
        return next(matches)
    except StopIteration:
        return None


def get_defined_id(stream: AirbyteStream, data: Dict[str, Any]) -> Optional[str]:
import dpath
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is dpath imported from the method instead of at the top of the file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


if not stream.source_defined_primary_key:
return None
primary_key = []
for key in stream.source_defined_primary_key:
try:
primary_key.append(str(dpath.util.get(data, key)))
except KeyError:
primary_key.append("__not_found__")
return "_".join(primary_key)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"jsonref~=0.2",
"pendulum",
"genson==1.2.2",
"pydantic~=1.9.2",
"pydantic>=1.9.2,<2.0.0",
"python-dateutil",
"PyYAML>=6.0.1",
"requests",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import unittest
from typing import Any, Mapping, Optional
from unittest.mock import MagicMock

from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration
from airbyte_protocol.models import (
AirbyteCatalog,
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
Level,
SyncMode,
Type,
)


class TestIntegration(BaseEmbeddedIntegration):
    """Concrete integration used by the tests: turns each record into a dict of its data and id."""

    def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Mapping[str, Any]:
        output = {"data": record.data, "id": id}
        return output


class EmbeddedIntegrationTestCase(unittest.TestCase):
    """Exercise ``BaseEmbeddedIntegration._load_data`` against a mocked source runner."""

    def setUp(self):
        """Create a mocked runner that discovers two streams, "test" and "test2"."""
        self.source_class = MagicMock()
        self.source = MagicMock()
        self.source_class.return_value = self.source
        self.config = MagicMock()
        self.integration = TestIntegration(self.source, self.config)
        # "test" declares a primary key and supports both sync modes.
        self.stream1 = AirbyteStream(
            name="test",
            source_defined_primary_key=[["test"]],
            json_schema={},
            supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
        )
        # "test2" declares no primary key and only supports full refresh.
        self.stream2 = AirbyteStream(name="test2", json_schema={}, supported_sync_modes=[SyncMode.full_refresh])
        # stream2 comes first, so looking up "test" cannot succeed by taking the first catalog entry.
        self.source.discover.return_value = AirbyteCatalog(streams=[self.stream2, self.stream1])

    def test_integration(self):
        """Records are converted and yielded, the log message is dropped, and a full-refresh catalog is used without state."""
        self.source.read.return_value = [
            AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="test")),
            AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test", data={"test": 1}, emitted_at=1)),
            AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test", data={"test": 2}, emitted_at=2)),
            AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test", data={"test": 3}, emitted_at=3)),
        ]
        result = list(self.integration._load_data("test", None))
        self.assertEqual(
            result,
            [
                {"data": {"test": 1}, "id": "1"},
                {"data": {"test": 2}, "id": "2"},
                {"data": {"test": 3}, "id": "3"},
            ],
        )
        self.source.discover.assert_called_once_with(self.config)
        self.source.read.assert_called_once_with(
            self.config,
            ConfiguredAirbyteCatalog(
                streams=[
                    ConfiguredAirbyteStream(
                        stream=self.stream1,
                        sync_mode=SyncMode.full_refresh,
                        destination_sync_mode=DestinationSyncMode.append,
                        primary_key=[["test"]],
                    )
                ]
            ),
            None,
        )

    def test_state(self):
        """A state message is not yielded as output but is remembered in ``last_state``."""
        state = AirbyteStateMessage(data={})
        self.source.read.return_value = [
            AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="test")),
            AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test", data={"test": 1}, emitted_at=1)),
            AirbyteMessage(type=Type.STATE, state=state),
        ]
        result = list(self.integration._load_data("test", None))
        self.assertEqual(
            result,
            [
                {"data": {"test": 1}, "id": "1"},
            ],
        )
        # Assert (rather than assign) so the test fails if the state message was not captured.
        self.assertEqual(self.integration.last_state, state)

    def test_incremental(self):
        """Passing state for a stream that supports incremental requests an incremental catalog."""
        state = AirbyteStateMessage(data={})
        list(self.integration._load_data("test", state))
        self.source.read.assert_called_once_with(
            self.config,
            ConfiguredAirbyteCatalog(
                streams=[
                    ConfiguredAirbyteStream(
                        stream=self.stream1,
                        sync_mode=SyncMode.incremental,
                        destination_sync_mode=DestinationSyncMode.append,
                        primary_key=[["test"]],
                    )
                ]
            ),
            state,
        )

    def test_incremental_unsupported(self):
        """Passing state for a full-refresh-only stream falls back to full refresh but still forwards the state."""
        state = AirbyteStateMessage(data={})
        list(self.integration._load_data("test2", state))
        self.source.read.assert_called_once_with(
            self.config,
            ConfiguredAirbyteCatalog(
                streams=[
                    ConfiguredAirbyteStream(
                        stream=self.stream2,
                        sync_mode=SyncMode.full_refresh,
                        destination_sync_mode=DestinationSyncMode.append,
                    )
                ]
            ),
            state,
        )
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.11.4
Relax checking of `oneOf` common property and allow an optional `default` keyword in addition to the `const` keyword.

## 0.11.3
Refactor test_oauth_flow_parameters to validate advanced_auth instead of the deprecated authSpecification

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY connector_acceptance_test ./connector_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.11.3
LABEL io.airbyte.version=0.11.4
LABEL io.airbyte.name=airbyte/connector-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,11 @@ def test_oneof_usage(self, actual_connector_spec: ConnectorSpecification):
for n, variant in enumerate(variants):
prop_obj = variant["properties"][const_common_prop]
assert (
"default" not in prop_obj
), f"There should not be 'default' keyword in common property {oneof_path}[{n}].{const_common_prop}. Use `const` instead. {docs_msg}"
"default" not in prop_obj or prop_obj["default"] == prop_obj["const"]
), f"'default' needs to be identical to const in common property {oneof_path}[{n}].{const_common_prop}. It's recommended to just use `const`. {docs_msg}"
assert (
"enum" not in prop_obj
), f"There should not be 'enum' keyword in common property {oneof_path}[{n}].{const_common_prop}. Use `const` instead. {docs_msg}"
"enum" not in prop_obj or (len(prop_obj["enum"]) == 1 and prop_obj["enum"][0] == prop_obj["const"])
), f"'enum' needs to be an array with a single item identical to const in common property {oneof_path}[{n}].{const_common_prop}. It's recommended to just use `const`. {docs_msg}"

def test_required(self):
"""Check that connector will fail if any required field is missing"""
Expand Down
Loading