From 35fe28a518c850e5512669eff3e28fae4739d31d Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 31 Jul 2023 13:31:13 +0200 Subject: [PATCH 01/11] relax pydantic dep --- airbyte-cdk/python/setup.py | 2 +- .../connector-acceptance-test/CHANGELOG.md | 3 + .../connector-acceptance-test/Dockerfile | 2 +- .../tests/test_core.py | 8 +-- .../unit_tests/test_spec.py | 70 +++++++++++++++++++ 5 files changed, 79 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 23983cd995df..f7de104d3131 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -58,7 +58,7 @@ "jsonref~=0.2", "pendulum", "genson==1.2.2", - "pydantic~=1.9.2", + "pydantic>=1.9.2,<2.0.0", "python-dateutil", "PyYAML>=6.0.1", "requests", diff --git a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md index 7fe65d7ad06a..a0c5f9891d45 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.11.4 +Relax checking of `oneOf` common property and allow optional `default` keyword additional to `const` keyword. + ## 0.11.3 Refactor test_oauth_flow_parameters to validate advanced_auth instead of the deprecated authSpecification diff --git a/airbyte-integrations/bases/connector-acceptance-test/Dockerfile b/airbyte-integrations/bases/connector-acceptance-test/Dockerfile index ceeb4bf0a596..d0312c8b0840 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/connector-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY connector_acceptance_test ./connector_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.11.3 +LABEL io.airbyte.version=0.11.4 LABEL io.airbyte.name=airbyte/connector-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "-r", "fEsx"] diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py index 88b05c116612..56184e7b0b35 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py @@ -182,11 +182,11 @@ def test_oneof_usage(self, actual_connector_spec: ConnectorSpecification): for n, variant in enumerate(variants): prop_obj = variant["properties"][const_common_prop] assert ( - "default" not in prop_obj - ), f"There should not be 'default' keyword in common property {oneof_path}[{n}].{const_common_prop}. Use `const` instead. {docs_msg}" + "default" not in prop_obj or prop_obj["default"] == prop_obj["const"] + ), f"'default' needs to be identical to const in common property {oneof_path}[{n}].{const_common_prop}. It's recommended to just use `const`. {docs_msg}" assert ( - "enum" not in prop_obj - ), f"There should not be 'enum' keyword in common property {oneof_path}[{n}].{const_common_prop}. Use `const` instead. {docs_msg}" + "enum" not in prop_obj or (len(prop_obj["enum"]) == 1 and prop_obj["enum"][0] == prop_obj["const"]) + ), f"'enum' needs to be an array with a single item identical to const in common property {oneof_path}[{n}].{const_common_prop}. It's recommended to just use `const`. {docs_msg}" def test_required(self): """Check that connector will fail if any required field is missing""" diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_spec.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_spec.py index 99f5a1bfd9b6..757f544a68c5 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_spec.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_spec.py @@ -414,6 +414,76 @@ def parametrize_test_case(*test_cases: Dict[str, Any]) -> Callable: } }, }, + "should_fail": False, + }, + { + "test_id": "different_default_in_common_property", + "connector_spec": { + "type": "object", + "properties": { + "credentials": { + "type": "object", + "oneOf": [ + { + "type": "object", + "properties": { + "common": {"type": "string", "const": "option1", "default": "optionX"}, + "option1": {"type": "string"}, + }, + } + ], + } + }, + }, + "should_fail": True, + }, + { + "test_id": "enum_keyword_in_common_property", + "connector_spec": { + "type": "object", + "properties": { + "credentials": { + "type": "object", + "oneOf": [ + { + "type": "object", + "properties": { + "common": {"type": "string", "const": "option1", "enum": ["option1"]}, + "option1": {"type": "string"}, + }, + }, + { + "type": "object", + "properties": { + "common": {"type": "string", "const": "option2", "enum": ["option2"]}, + "option2": {"type": "string"}, + }, + }, + ], + } + }, + }, + "should_fail": False, + }, + { + "test_id": "different_enum_in_common_property", + "connector_spec": { + "type": "object", + "properties": { + "credentials": { + "type": "object", + "oneOf": [ + { + "type": "object", + "properties": { + "common": {"type": "string", "const": "option1", "enum": ["option1", "option2"]}, + "option1": {"type": "string"}, + }, + } + ], + } + }, + }, "should_fail": True, }, ) From b96bb2900634ea255b9368ecaa9e8e7066d83a84 Mon Sep 17 00:00:00 2001 From: flash1293 Date: Mon, 31 Jul 2023 11:55:26 +0000 Subject: [PATCH 02/11] Automated Commit - Format and Process Resources Changes --- .../connector_acceptance_test/tests/test_core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py index 56184e7b0b35..ff8e8a240a33 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py @@ -184,8 +184,8 @@ def test_oneof_usage(self, actual_connector_spec: ConnectorSpecification): assert ( "default" not in prop_obj or prop_obj["default"] == prop_obj["const"] ), f"'default' needs to be identical to const in common property {oneof_path}[{n}].{const_common_prop}. It's recommended to just use `const`. {docs_msg}" - assert ( - "enum" not in prop_obj or (len(prop_obj["enum"]) == 1 and prop_obj["enum"][0] == prop_obj["const"]) + assert "enum" not in prop_obj or ( + len(prop_obj["enum"]) == 1 and prop_obj["enum"][0] == prop_obj["const"] ), f"'enum' needs to be an array with a single item identical to const in common property {oneof_path}[{n}].{const_common_prop}. It's recommended to just use `const`. {docs_msg}" def test_required(self): From 7bc3fb06875610262293a64f6b958336bd1862c4 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 31 Jul 2023 19:49:59 +0200 Subject: [PATCH 03/11] wip --- .../sources/embedded/base_integration.py | 44 ++++++++++++++++ .../airbyte_cdk/sources/embedded/catalog.py | 51 +++++++++++++++++++ .../airbyte_cdk/sources/embedded/runner.py | 18 +++++++ .../airbyte_cdk/sources/embedded/tools.py | 24 +++++++++ .../embedded/test_embedded_integration.py | 38 ++++++++++++++ 5 files changed, 175 insertions(+) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py create mode 100644 airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py new file mode 100644 index 000000000000..2db77f12814b --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py @@ -0,0 +1,44 @@ +from abc import ABC, abstractmethod +from typing import Generic, Iterable, List, Optional, TypeVar + +from airbyte_cdk.connector import TConfig +from airbyte_cdk.sources.source import TState +from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, Type, SyncMode + +from airbyte_cdk.sources.embedded.catalog import create_configured_catalog, get_stream_names, retrieve_catalog, get_stream +from airbyte_cdk.sources.embedded.tools import get_defined_id +from airbyte_cdk.sources.embedded.runner import SourceRunner + +TOutput = TypeVar("TOutput") + + +class BaseEmbeddedIntegration(ABC, Generic[TConfig, TState, TOutput]): + def __init__(self, source: SourceRunner[TConfig, TState], config: TConfig): + self.source = source + self.config = config + + self.last_state: Optional[AirbyteStateMessage] = None + + @abstractmethod + def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Optional[TOutput]: + """ + Turn an Airbyte record into the appropriate output type for the integration. + """ + pass + + def _load_data(self, stream: str, state: Optional[TState]) -> Iterable[TOutput]: + catalog = self.source.discover(self.config) + if not state: + configured_catalog = create_configured_catalog([stream], catalog, sync_mode=SyncMode.full_refresh) + else: + configured_catalog = create_configured_catalog([stream], catalog, sync_mode=SyncMode.incremental) + + stream = get_stream(catalog, stream) + + for message in self.source.read(self.config, configured_catalog, state): + if message.type == Type.RECORD: + output = self._handle_record(message.record, get_defined_id(stream, message.record.data)) + if output: + yield output + elif message.type is Type.STATE and message.state: + self.last_state = message.state diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py new file mode 100644 index 000000000000..56cc624a1fc7 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py @@ -0,0 +1,51 @@ +from typing import List, Optional + +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + SyncMode, + Type, +) + +from airbyte_cdk.connector import TConfig +from airbyte_cdk.sources.embedded.tools import get_first + + +def get_stream(catalog: AirbyteCatalog, stream_name: str) -> Optional[AirbyteStream]: + return get_first(catalog.streams, lambda s: s.name == stream_name) + +def get_configured_stream(catalog: ConfiguredAirbyteCatalog, stream_name: str) -> Optional[ConfiguredAirbyteStream]: + return get_first(catalog.streams, lambda s: s.name == stream_name) + + +def get_stream_names(catalog: AirbyteCatalog) -> List[str]: + return [stream.name for stream in catalog.streams] + + +def to_configured_stream( + stream: AirbyteStream, + sync_mode: SyncMode = SyncMode.full_refresh, + destination_sync_mode: DestinationSyncMode = DestinationSyncMode.append, + cursor_field: Optional[List[str]] = None, + primary_key: Optional[List[List[str]]] = None, +) -> ConfiguredAirbyteStream: + return ConfiguredAirbyteStream( + stream=stream, sync_mode=sync_mode, destination_sync_mode=destination_sync_mode, cursor_field=cursor_field, primary_key=primary_key + ) + + +def to_configured_catalog(configured_streams: List[ConfiguredAirbyteStream]) -> ConfiguredAirbyteCatalog: + return ConfiguredAirbyteCatalog(streams=configured_streams) + + +def create_configured_catalog(stream_names: List[str], catalog: AirbyteCatalog, sync_mode: SyncMode = SyncMode.full_refresh) -> ConfiguredAirbyteCatalog: + configured_streams = [] + + for stream_name in stream_names: + stream = get_stream(catalog, stream_name) + configured_streams.append(to_configured_stream(stream, sync_mode=sync_mode)) + + return to_configured_catalog(configured_streams) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py new file mode 100644 index 000000000000..0bca4433f3f7 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py @@ -0,0 +1,18 @@ + +from abc import ABC, abstractmethod +from typing import Generic, Iterable, Optional +from logging import Logger + +from airbyte_cdk.sources.source import TState, Source +from airbyte_cdk.models import AirbyteConnectionStatus, ConfiguredAirbyteCatalog, AirbyteCatalog, AirbyteMessage +from airbyte_cdk.connector import TConfig + + +class SourceRunner(ABC, Generic[TConfig, TState]): + @abstractmethod + def discover(self, config: TConfig) -> AirbyteCatalog: + pass + + @abstractmethod + def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Optional[TState]) -> Iterable[AirbyteMessage]: + pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py new file mode 100644 index 000000000000..31f8257f1a60 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py @@ -0,0 +1,24 @@ +import json +from json import JSONDecodeError +from pathlib import Path +from typing import Any, Callable, Dict, Iterable, Optional, Union + +from airbyte_cdk.models import AirbyteMessage, Type, ConfiguredAirbyteStream +from pydantic.main import BaseModel + + +def get_first(iterable: Iterable[Any], predicate: Callable[[Any], bool] = lambda m: True) -> Optional[Any]: + return next(filter(predicate, iterable), None) + +def get_defined_id(stream: ConfiguredAirbyteStream, data: Dict[str, Any]) -> Optional[str]: + import dpath + + if not stream.primary_key: + return None + primary_key = [] + for key in stream.primary_key: + try: + primary_key.append(str(dpath.util.get(data, key))) + except KeyError: + primary_key.append("__not_found__") + return "_".join(primary_key) diff --git a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py new file mode 100644 index 000000000000..72dbacd2676a --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py @@ -0,0 +1,38 @@ + +from typing import Any, Mapping, Optional +import unittest +from unittest.mock import MagicMock +from airbyte_protocol.models import AirbyteRecordMessage, AirbyteLogMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode, DestinationSyncMode +from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration + +class TestIntegration(BaseEmbeddedIntegration): + def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Mapping[str, Any]: + return {"data": record.data, "id": id} + +class EmbeddedIntegrationTestCase(unittest.TestCase): + def test_integration(self): + source_class = MagicMock() + source = MagicMock() + source_class.return_value = source + config = MagicMock() + integration = TestIntegration(source, config) + stream1 = MagicMock(name="test", source_defined_primary_key=[["test"]]) + stream2 = MagicMock(name="test2") + source.discover.return_value = MagicMock(streams=[stream2, stream1]) + source.read.return_value = [ + AirbyteLogMessage(level="info", message="test"), + AirbyteRecordMessage(stream="test", data={"test": 1}), + AirbyteRecordMessage(stream="test", data={"test": 2}), + AirbyteRecordMessage(stream="test", data={"test": 3}), + ] + result = [integration._load_data("test", None)] + self.assertEqual( + result, + [ + {"data": {"test": 1}, "id": "1"}, + {"data": {"test": 2}, "id": "2"}, + {"data": {"test": 3}, "id": "3"}, + ], + ) + source.discover.assert_called_once_with(config) + source.read.assert_called_once_with(config, ConfiguredAirbyteCatalog(streams=ConfiguredAirbyteStream(stream=stream1, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append, primary_key=[["test"]])), None) \ No newline at end of file From abbc62000067db5214dfd4858caaaed8e963822f Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 1 Aug 2023 11:33:28 +0200 Subject: [PATCH 04/11] wrap up base integration --- .../sources/embedded/base_integration.py | 28 ++-- .../airbyte_cdk/sources/embedded/catalog.py | 15 +-- .../airbyte_cdk/sources/embedded/runner.py | 9 +- .../airbyte_cdk/sources/embedded/tools.py | 19 +-- .../embedded/test_embedded_integration.py | 126 +++++++++++++++--- 5 files changed, 147 insertions(+), 50 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py index 2db77f12814b..7cdd60d48a4b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py @@ -1,13 +1,16 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + from abc import ABC, abstractmethod -from typing import Generic, Iterable, List, Optional, TypeVar +from typing import Generic, Iterable, Optional, TypeVar from airbyte_cdk.connector import TConfig -from airbyte_cdk.sources.source import TState -from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, Type, SyncMode - -from airbyte_cdk.sources.embedded.catalog import create_configured_catalog, get_stream_names, retrieve_catalog, get_stream -from airbyte_cdk.sources.embedded.tools import get_defined_id +from airbyte_cdk.sources.embedded.catalog import create_configured_catalog, get_stream from airbyte_cdk.sources.embedded.runner import SourceRunner +from airbyte_cdk.sources.embedded.tools import get_defined_id +from airbyte_cdk.sources.source import TState +from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, SyncMode, Type TOutput = TypeVar("TOutput") @@ -26,14 +29,15 @@ def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Opt """ pass - def _load_data(self, stream: str, state: Optional[TState]) -> Iterable[TOutput]: + def _load_data(self, stream_name: str, state: Optional[TState]) -> Iterable[TOutput]: catalog = self.source.discover(self.config) - if not state: - configured_catalog = create_configured_catalog([stream], catalog, sync_mode=SyncMode.full_refresh) + stream = get_stream(catalog, stream_name) + if not stream: + raise ValueError(f"Stream {stream_name} not found") + if not state or SyncMode.incremental not in stream.supported_sync_modes: + configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.full_refresh) else: - configured_catalog = create_configured_catalog([stream], catalog, sync_mode=SyncMode.incremental) - - stream = get_stream(catalog, stream) + configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.incremental) for message in self.source.read(self.config, configured_catalog, state): if message.type == Type.RECORD: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py index 56cc624a1fc7..cd47efd4d411 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py @@ -1,3 +1,7 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + from typing import List, Optional from airbyte_cdk.models import ( @@ -7,16 +11,14 @@ ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, - Type, ) - -from airbyte_cdk.connector import TConfig from airbyte_cdk.sources.embedded.tools import get_first def get_stream(catalog: AirbyteCatalog, stream_name: str) -> Optional[AirbyteStream]: return get_first(catalog.streams, lambda s: s.name == stream_name) + def get_configured_stream(catalog: ConfiguredAirbyteCatalog, stream_name: str) -> Optional[ConfiguredAirbyteStream]: return get_first(catalog.streams, lambda s: s.name == stream_name) @@ -41,11 +43,8 @@ def to_configured_catalog(configured_streams: List[ConfiguredAirbyteStream]) -> return ConfiguredAirbyteCatalog(streams=configured_streams) -def create_configured_catalog(stream_names: List[str], catalog: AirbyteCatalog, sync_mode: SyncMode = SyncMode.full_refresh) -> ConfiguredAirbyteCatalog: +def create_configured_catalog(stream: AirbyteStream, sync_mode: SyncMode = SyncMode.full_refresh) -> ConfiguredAirbyteCatalog: configured_streams = [] - - for stream_name in stream_names: - stream = get_stream(catalog, stream_name) - configured_streams.append(to_configured_stream(stream, sync_mode=sync_mode)) + configured_streams.append(to_configured_stream(stream, sync_mode=sync_mode, primary_key=stream.source_defined_primary_key)) return to_configured_catalog(configured_streams) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py index 0bca4433f3f7..ffb987325c2f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py @@ -1,11 +1,14 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + from abc import ABC, abstractmethod from typing import Generic, Iterable, Optional -from logging import Logger -from airbyte_cdk.sources.source import TState, Source -from airbyte_cdk.models import AirbyteConnectionStatus, ConfiguredAirbyteCatalog, AirbyteCatalog, AirbyteMessage from airbyte_cdk.connector import TConfig +from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.sources.source import TState class SourceRunner(ABC, Generic[TConfig, TState]): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py index 31f8257f1a60..97ef631169fe 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py @@ -1,22 +1,23 @@ -import json -from json import JSONDecodeError -from pathlib import Path -from typing import Any, Callable, Dict, Iterable, Optional, Union +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# -from airbyte_cdk.models import AirbyteMessage, Type, ConfiguredAirbyteStream -from pydantic.main import BaseModel +from typing import Any, Callable, Dict, Iterable, Optional + +from airbyte_cdk.models import AirbyteStream def get_first(iterable: Iterable[Any], predicate: Callable[[Any], bool] = lambda m: True) -> Optional[Any]: return next(filter(predicate, iterable), None) -def get_defined_id(stream: ConfiguredAirbyteStream, data: Dict[str, Any]) -> Optional[str]: + +def get_defined_id(stream: AirbyteStream, data: Dict[str, Any]) -> Optional[str]: import dpath - if not stream.primary_key: + if not stream.source_defined_primary_key: return None primary_key = [] - for key in stream.primary_key: + for key in stream.source_defined_primary_key: try: primary_key.append(str(dpath.util.get(data, key))) except KeyError: diff --git a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py index 72dbacd2676a..b205818a7e44 100644 --- a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py +++ b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py @@ -1,31 +1,57 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# -from typing import Any, Mapping, Optional import unittest +from typing import Any, Mapping, Optional from unittest.mock import MagicMock -from airbyte_protocol.models import AirbyteRecordMessage, AirbyteLogMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode, DestinationSyncMode + from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration +from airbyte_protocol.models import ( + AirbyteCatalog, + AirbyteLogMessage, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStateMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Level, + SyncMode, + Type, +) + class TestIntegration(BaseEmbeddedIntegration): def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Mapping[str, Any]: return {"data": record.data, "id": id} + class EmbeddedIntegrationTestCase(unittest.TestCase): + def setUp(self): + self.source_class = MagicMock() + self.source = MagicMock() + self.source_class.return_value = self.source + self.config = MagicMock() + self.integration = TestIntegration(self.source, self.config) + self.stream1 = AirbyteStream( + name="test", + source_defined_primary_key=[["test"]], + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], + ) + self.stream2 = AirbyteStream(name="test2", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]) + self.source.discover.return_value = AirbyteCatalog(streams=[self.stream2, self.stream1]) + def test_integration(self): - source_class = MagicMock() - source = MagicMock() - source_class.return_value = source - config = MagicMock() - integration = TestIntegration(source, config) - stream1 = MagicMock(name="test", source_defined_primary_key=[["test"]]) - stream2 = MagicMock(name="test2") - source.discover.return_value = MagicMock(streams=[stream2, stream1]) - source.read.return_value = [ - AirbyteLogMessage(level="info", message="test"), - AirbyteRecordMessage(stream="test", data={"test": 1}), - AirbyteRecordMessage(stream="test", data={"test": 2}), - AirbyteRecordMessage(stream="test", data={"test": 3}), + self.source.read.return_value = [ + AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="test")), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test", data={"test": 1}, emitted_at=1)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test", data={"test": 2}, emitted_at=2)), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test", data={"test": 3}, emitted_at=3)), ] - result = [integration._load_data("test", None)] + result = list(self.integration._load_data("test", None)) self.assertEqual( result, [ @@ -34,5 +60,69 @@ def test_integration(self): {"data": {"test": 3}, "id": "3"}, ], ) - source.discover.assert_called_once_with(config) - source.read.assert_called_once_with(config, ConfiguredAirbyteCatalog(streams=ConfiguredAirbyteStream(stream=stream1, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append, primary_key=[["test"]])), None) \ No newline at end of file + self.source.discover.assert_called_once_with(self.config) + self.source.read.assert_called_once_with( + self.config, + ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=self.stream1, + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + primary_key=[["test"]], + ) + ] + ), + None, + ) + + def test_state(self): + state = AirbyteStateMessage(data={}) + self.source.read.return_value = [ + AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="test")), + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test", data={"test": 1}, emitted_at=1)), + AirbyteMessage(type=Type.STATE, state=state), + ] + result = list(self.integration._load_data("test", None)) + self.assertEqual( + result, + [ + {"data": {"test": 1}, "id": "1"}, + ], + ) + self.integration.last_state = state + + def test_incremental(self): + state = AirbyteStateMessage(data={}) + list(self.integration._load_data("test", state)) + self.source.read.assert_called_once_with( + self.config, + ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=self.stream1, + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + primary_key=[["test"]], + ) + ] + ), + state, + ) + + def test_incremental_unsupported(self): + state = AirbyteStateMessage(data={}) + list(self.integration._load_data("test2", state)) + self.source.read.assert_called_once_with( + self.config, + ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=self.stream2, + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.append, + ) + ] + ), + state, + ) From fd28a42e550992bbf3a9dd48516988bf8eea7a89 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 1 Aug 2023 12:57:33 +0200 Subject: [PATCH 05/11] add init file --- airbyte-cdk/python/airbyte_cdk/sources/embedded/__init__.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/embedded/__init__.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/__init__.py new file mode 100644 index 000000000000..46b7376756ec --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# From e2899721b9cf82f4ff4c497abfbdc49be35d8f2e Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 1 Aug 2023 15:19:33 +0200 Subject: [PATCH 06/11] introduce CDK runner and improve error message --- .../sources/embedded/base_integration.py | 13 ++++++------ .../airbyte_cdk/sources/embedded/runner.py | 21 +++++++++++++++---- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py index 7cdd60d48a4b..912798eb1b43 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py @@ -6,18 +6,17 @@ from typing import Generic, Iterable, Optional, TypeVar from airbyte_cdk.connector import TConfig -from airbyte_cdk.sources.embedded.catalog import create_configured_catalog, get_stream +from airbyte_cdk.sources.embedded.catalog import create_configured_catalog, get_stream, get_stream_names from airbyte_cdk.sources.embedded.runner import SourceRunner from airbyte_cdk.sources.embedded.tools import get_defined_id -from airbyte_cdk.sources.source import TState from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, SyncMode, Type TOutput = TypeVar("TOutput") -class BaseEmbeddedIntegration(ABC, Generic[TConfig, TState, TOutput]): - def __init__(self, source: SourceRunner[TConfig, TState], config: TConfig): - self.source = source +class BaseEmbeddedIntegration(ABC, Generic[TConfig, TOutput]): + def __init__(self, runner: SourceRunner[TConfig], config: TConfig): + self.source = runner self.config = config self.last_state: Optional[AirbyteStateMessage] = None @@ -29,11 +28,11 @@ def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Opt """ pass - def _load_data(self, stream_name: str, state: Optional[TState]) -> Iterable[TOutput]: + def _load_data(self, stream_name: str, state: Optional[AirbyteStateMessage]) -> Iterable[TOutput]: catalog = self.source.discover(self.config) stream = get_stream(catalog, stream_name) if not stream: - raise ValueError(f"Stream {stream_name} not found") + raise ValueError(f"Stream {stream_name} not found, the following streams are available: {', '.join(get_stream_names(catalog))}") if not state or SyncMode.incremental not in stream.supported_sync_modes: configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.full_refresh) else: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py index ffb987325c2f..06d180374538 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py @@ -3,19 +3,32 @@ # +import logging from abc import ABC, abstractmethod from typing import Generic, Iterable, Optional from airbyte_cdk.connector import TConfig -from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog -from airbyte_cdk.sources.source import TState +from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.sources.abstract_source import AbstractSource -class SourceRunner(ABC, Generic[TConfig, TState]): +class SourceRunner(ABC, Generic[TConfig]): @abstractmethod def discover(self, config: TConfig) -> AirbyteCatalog: pass @abstractmethod - def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Optional[TState]) -> Iterable[AirbyteMessage]: + def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Optional[AirbyteStateMessage]) -> Iterable[AirbyteMessage]: pass + + +class CDKRunner(SourceRunner[TConfig]): + def __init__(self, source: AbstractSource, name: str): + self._source = source + self._logger = logging.getLogger(name) + + def discover(self, config: TConfig) -> AirbyteCatalog: + return self._source.discover(self._logger, config) + + def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Optional[AirbyteStateMessage]) -> Iterable[AirbyteMessage]: + return self._source.read(self._logger, config, catalog, state=[state] if state else []) From 343134537be0459e1eed6d8b4bef9878a694b37d Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 1 Aug 2023 15:25:12 +0200 Subject: [PATCH 07/11] make state param optional --- .../python/airbyte_cdk/sources/embedded/base_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py index 912798eb1b43..7ee78edeadbc 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py @@ -28,7 +28,7 @@ def _handle_record(self, record: AirbyteRecordMessage, id: Optional[str]) -> Opt """ pass - def _load_data(self, stream_name: str, state: Optional[AirbyteStateMessage]) -> Iterable[TOutput]: + def _load_data(self, stream_name: str, state: Optional[AirbyteStateMessage] = None) -> Iterable[TOutput]: catalog = self.source.discover(self.config) stream = get_stream(catalog, stream_name) if not stream: From c60f3eebcaaf4adceb4d1d6d890f4d7336c771dd Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Wed, 2 Aug 2023 10:59:07 +0200 Subject: [PATCH 08/11] update protocol models --- airbyte-cdk/python/airbyte_cdk/models/__init__.py | 3 --- airbyte-cdk/python/setup.py | 2 +- airbyte-cdk/python/unit_tests/sources/test_config.py | 4 ++-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/models/__init__.py b/airbyte-cdk/python/airbyte_cdk/models/__init__.py index b0ecf17e6cea..9545af7b044c 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/models/__init__.py @@ -27,8 +27,6 @@ AirbyteStreamStatusTraceMessage, AirbyteTraceMessage, AuthFlowType, - AuthSpecification, - AuthType, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, ConnectorSpecification, @@ -36,7 +34,6 @@ EstimateType, FailureType, Level, - OAuth2Specification, OAuthConfigSpecification, OrchestratorType, Status, diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index e5f08ac90026..dc815bc0f2ee 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -50,7 +50,7 @@ packages=find_packages(exclude=("unit_tests",)), package_data={"airbyte_cdk": ["py.typed", "sources/declarative/declarative_component_schema.yaml"]}, install_requires=[ - "airbyte-protocol-models==0.3.6", + "airbyte-protocol-models==0.4.0", "backoff", "dpath~=2.0.1", "isodate~=0.6.1", diff --git a/airbyte-cdk/python/unit_tests/sources/test_config.py b/airbyte-cdk/python/unit_tests/sources/test_config.py index c988e60df406..e9617da3684a 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_config.py +++ b/airbyte-cdk/python/unit_tests/sources/test_config.py @@ -43,7 +43,7 @@ class TestBaseConfig: "properties": { "count": {"title": "Count", "type": "integer"}, "name": {"title": "Name", "type": "string"}, - "selected_strategy": {"const": "option1", "title": "Selected " "Strategy", "type": "string"}, + "selected_strategy": {"const": "option1", "title": "Selected " "Strategy", "type": "string", "default": "option1"}, }, "required": ["name", "count"], "title": "Choice1", @@ -51,7 +51,7 @@ class TestBaseConfig: }, { "properties": { - "selected_strategy": {"const": "option2", "title": "Selected " "Strategy", "type": "string"}, + "selected_strategy": {"const": "option2", "title": "Selected " "Strategy", "type": "string", "default": "option2"}, "sequence": {"items": {"type": "string"}, "title": "Sequence", "type": "array"}, }, "required": ["sequence"], From c27fd9b06c4cbbbab8294cf59ee3ccd9284651dc Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Wed, 2 Aug 2023 12:43:35 +0200 Subject: [PATCH 09/11] review comments --- airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py | 7 +------ airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py | 4 ++-- airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py | 3 +-- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py index cd47efd4d411..765e9b260233 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/catalog.py @@ -19,10 +19,6 @@ def get_stream(catalog: AirbyteCatalog, stream_name: str) -> Optional[AirbyteStr return get_first(catalog.streams, lambda s: s.name == stream_name) -def get_configured_stream(catalog: ConfiguredAirbyteCatalog, stream_name: str) -> Optional[ConfiguredAirbyteStream]: - return get_first(catalog.streams, lambda s: s.name == stream_name) - - def get_stream_names(catalog: AirbyteCatalog) -> List[str]: return [stream.name for stream in catalog.streams] @@ -44,7 +40,6 @@ def to_configured_catalog(configured_streams: List[ConfiguredAirbyteStream]) -> def create_configured_catalog(stream: AirbyteStream, sync_mode: SyncMode = SyncMode.full_refresh) -> ConfiguredAirbyteCatalog: - configured_streams = [] - configured_streams.append(to_configured_stream(stream, sync_mode=sync_mode, primary_key=stream.source_defined_primary_key)) + configured_streams = [to_configured_stream(stream, sync_mode=sync_mode, primary_key=stream.source_defined_primary_key)] return to_configured_catalog(configured_streams) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py index 06d180374538..47f185a6e4c3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py @@ -9,7 +9,7 @@ from airbyte_cdk.connector import TConfig from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog -from airbyte_cdk.sources.abstract_source import AbstractSource +from airbyte_cdk.sources.source import Source class SourceRunner(ABC, Generic[TConfig]): @@ -23,7 +23,7 @@ def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Option class CDKRunner(SourceRunner[TConfig]): - def __init__(self, source: AbstractSource, name: str): + def __init__(self, source: Source, name: str): self._source = source self._logger = logging.getLogger(name) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py index 97ef631169fe..5777e567dd4c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/tools.py @@ -4,6 +4,7 @@ from typing import Any, Callable, Dict, Iterable, Optional +import dpath from airbyte_cdk.models import AirbyteStream @@ -12,8 +13,6 @@ def get_first(iterable: Iterable[Any], predicate: Callable[[Any], bool] = lambda def get_defined_id(stream: AirbyteStream, data: Dict[str, Any]) -> Optional[str]: - import dpath - if not stream.source_defined_primary_key: return None primary_key = [] From b23abc41454f4cacf56a14b1bc8b3d14ac7e31f0 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Wed, 2 Aug 2023 15:26:23 +0200 Subject: [PATCH 10/11] always run incremental if possible --- .../sources/embedded/base_integration.py | 2 +- .../embedded/test_embedded_integration.py | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py index 7ee78edeadbc..d5f96d024a00 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py @@ -33,7 +33,7 @@ def _load_data(self, stream_name: str, state: Optional[AirbyteStateMessage] = No stream = get_stream(catalog, stream_name) if not stream: raise ValueError(f"Stream {stream_name} not found, the following streams are available: {', '.join(get_stream_names(catalog))}") - if not state or SyncMode.incremental not in stream.supported_sync_modes: + if SyncMode.incremental not in stream.supported_sync_modes: configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.full_refresh) else: configured_catalog = create_configured_catalog(stream, sync_mode=SyncMode.incremental) diff --git a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py index b205818a7e44..b49992c57363 100644 --- a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py +++ b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py @@ -67,7 +67,7 @@ def test_integration(self): streams=[ ConfiguredAirbyteStream( stream=self.stream1, - sync_mode=SyncMode.full_refresh, + sync_mode=SyncMode.incremental, destination_sync_mode=DestinationSyncMode.append, primary_key=[["test"]], ) @@ -110,6 +110,24 @@ def test_incremental(self): state, ) + def test_incremental_without_state(self): + state = AirbyteStateMessage(data={}) + list(self.integration._load_data("test")) + self.source.read.assert_called_once_with( + self.config, + ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=self.stream1, + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + primary_key=[["test"]], + ) + ] + ), + None, + ) + def test_incremental_unsupported(self): state = AirbyteStateMessage(data={}) list(self.integration._load_data("test2", state)) From eeced02c979c4ec66306972e43e11e0a1174d989 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Wed, 2 Aug 2023 15:27:30 +0200 Subject: [PATCH 11/11] fix --- .../unit_tests/sources/embedded/test_embedded_integration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py index b49992c57363..db05a8d38335 100644 --- a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py +++ b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py @@ -111,7 +111,6 @@ def test_incremental(self): ) def test_incremental_without_state(self): - state = AirbyteStateMessage(data={}) list(self.integration._load_data("test")) self.source.read.assert_called_once_with( self.config,