From 15563aab8ab4a7c771748ee49feedecd49205dc7 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Fri, 4 Aug 2023 13:23:34 +0200 Subject: [PATCH 1/2] run a check before starting to load --- .../airbyte_cdk/sources/embedded/base_integration.py | 6 +++++- .../python/airbyte_cdk/sources/embedded/runner.py | 9 ++++++++- .../sources/embedded/test_embedded_integration.py | 9 +++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py index d5f96d024a00..0df49431ab14 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py @@ -9,13 +9,17 @@ from airbyte_cdk.sources.embedded.catalog import create_configured_catalog, get_stream, get_stream_names from airbyte_cdk.sources.embedded.runner import SourceRunner from airbyte_cdk.sources.embedded.tools import get_defined_id -from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, SyncMode, Type +from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, Status, SyncMode, Type TOutput = TypeVar("TOutput") class BaseEmbeddedIntegration(ABC, Generic[TConfig, TOutput]): def __init__(self, runner: SourceRunner[TConfig], config: TConfig): + check_result = runner.check(config) + if check_result.status == Status.FAILED: + raise ValueError(f"Configuration is not valid: {check_result.message}") + self.source = runner self.config = config diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py index 47f185a6e4c3..2053ba45722b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py @@ -8,11 +8,15 @@ from typing import Generic, Iterable, Optional from airbyte_cdk.connector import TConfig -from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog from airbyte_cdk.sources.source import Source class SourceRunner(ABC, Generic[TConfig]): + @abstractmethod + def check(self, config: TConfig) -> AirbyteConnectionStatus: + pass + @abstractmethod def discover(self, config: TConfig) -> AirbyteCatalog: pass @@ -27,6 +31,9 @@ def __init__(self, source: Source, name: str): self._source = source self._logger = logging.getLogger(name) + def check(self, config: TConfig) -> AirbyteConnectionStatus: + return self._source.check(self._logger, config) + def discover(self, config: TConfig) -> AirbyteCatalog: return self._source.discover(self._logger, config) diff --git a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py index db05a8d38335..70a8525b5134 100644 --- a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py +++ b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py @@ -9,6 +9,7 @@ from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration from airbyte_protocol.models import ( AirbyteCatalog, + AirbyteConnectionStatus, AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, @@ -18,6 +19,7 @@ ConfiguredAirbyteStream, DestinationSyncMode, Level, + Status, SyncMode, Type, ) @@ -43,6 +45,7 @@ def setUp(self): ) self.stream2 = AirbyteStream(name="test2", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]) self.source.discover.return_value = AirbyteCatalog(streams=[self.stream2, self.stream1]) + self.source.check.return_value = AirbyteConnectionStatus(status=Status.SUCCEEDED) def test_integration(self): self.source.read.return_value = [ @@ -76,6 +79,12 @@ def test_integration(self): None, ) + def test_failed_check(self): + self.source.check.return_value = AirbyteConnectionStatus(status=Status.FAILED, message="my error") + with self.assertRaises(ValueError) as error: + TestIntegration(self.source, self.config) + assert str(error.exception) == "Configuration is not valid: my error" + def test_state(self): state = AirbyteStateMessage(data={}) self.source.read.return_value = [ From 958274bd52fb90db72c1331333c68e20830fc5b3 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Fri, 18 Aug 2023 17:24:54 +0200 Subject: [PATCH 2/2] check config schema compatibility --- .../sources/embedded/base_integration.py | 7 +++---- .../airbyte_cdk/sources/embedded/runner.py | 8 ++++---- .../embedded/test_embedded_integration.py | 20 ++++++++++++------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py index 0df49431ab14..158dea4d135a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/base_integration.py @@ -9,16 +9,15 @@ from airbyte_cdk.sources.embedded.catalog import create_configured_catalog, get_stream, get_stream_names from airbyte_cdk.sources.embedded.runner import SourceRunner from airbyte_cdk.sources.embedded.tools import get_defined_id -from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, Status, SyncMode, Type +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit +from airbyte_protocol.models import AirbyteRecordMessage, AirbyteStateMessage, SyncMode, Type TOutput = TypeVar("TOutput") class BaseEmbeddedIntegration(ABC, Generic[TConfig, TOutput]): def __init__(self, runner: SourceRunner[TConfig], config: TConfig): - check_result = runner.check(config) - if check_result.status == Status.FAILED: - raise ValueError(f"Configuration is not valid: {check_result.message}") + check_config_against_spec_or_exit(config, runner.spec()) self.source = runner self.config = config diff --git a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py index 2053ba45722b..c64e66ed581e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py @@ -8,13 +8,13 @@ from typing import Generic, Iterable, Optional from airbyte_cdk.connector import TConfig -from airbyte_cdk.models import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog +from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, ConnectorSpecification from airbyte_cdk.sources.source import Source class SourceRunner(ABC, Generic[TConfig]): @abstractmethod - def check(self, config: TConfig) -> AirbyteConnectionStatus: + def spec(self) -> ConnectorSpecification: pass @abstractmethod @@ -31,8 +31,8 @@ def __init__(self, source: Source, name: str): self._source = source self._logger = logging.getLogger(name) - def check(self, config: TConfig) -> AirbyteConnectionStatus: - return self._source.check(self._logger, config) + def spec(self) -> ConnectorSpecification: + return self._source.spec(self._logger) def discover(self, config: TConfig) -> AirbyteCatalog: return self._source.discover(self._logger, config) diff --git a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py index 70a8525b5134..28ff10bc8660 100644 --- a/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py +++ b/airbyte-cdk/python/unit_tests/sources/embedded/test_embedded_integration.py @@ -7,9 +7,9 @@ from unittest.mock import MagicMock from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration +from airbyte_cdk.utils import AirbyteTracedException from airbyte_protocol.models import ( AirbyteCatalog, - AirbyteConnectionStatus, AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, @@ -17,9 +17,9 @@ AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, + ConnectorSpecification, DestinationSyncMode, Level, - Status, SyncMode, Type, ) @@ -35,7 +35,14 @@ def setUp(self): self.source_class = MagicMock() self.source = MagicMock() self.source_class.return_value = self.source - self.config = MagicMock() + self.source.spec.return_value = ConnectorSpecification(connectionSpecification={ + "properties": { + "test": { + "type": "string", + } + } + }) + self.config = {"test": "abc"} self.integration = TestIntegration(self.source, self.config) self.stream1 = AirbyteStream( name="test", @@ -45,7 +52,6 @@ def setUp(self): ) self.stream2 = AirbyteStream(name="test2", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]) self.source.discover.return_value = AirbyteCatalog(streams=[self.stream2, self.stream1]) - self.source.check.return_value = AirbyteConnectionStatus(status=Status.SUCCEEDED) def test_integration(self): self.source.read.return_value = [ @@ -80,10 +86,10 @@ def test_integration(self): ) def test_failed_check(self): - self.source.check.return_value = AirbyteConnectionStatus(status=Status.FAILED, message="my error") - with self.assertRaises(ValueError) as error: + self.config = {"test": 123} + with self.assertRaises(AirbyteTracedException) as error: TestIntegration(self.source, self.config) - assert str(error.exception) == "Configuration is not valid: my error" + assert str(error.exception) == "123 is not of type 'string'" def test_state(self): state = AirbyteStateMessage(data={})