From 4e9b0142775776a45db79a90c4eac2f798da295e Mon Sep 17 00:00:00 2001 From: Baz Date: Tue, 13 Dec 2022 17:54:49 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Python=20CDK:=20fix=20`StopItera?= =?UTF-8?q?tion`=20error=20for=20`check=5Favailability`=20(#20429)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airbyte-cdk/python/CHANGELOG.md | 3 +++ .../sources/utils/stream_helpers.py | 11 +++++--- airbyte-cdk/python/setup.py | 2 +- .../http/test_availability_strategy.py | 27 +++++++++++++++++++ 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 1a7357cc95d3..76bb44ebfb9b 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.13.3 +Fixed `StopIteration` exception for empty streams while `check_availability` runs. + ## 0.13.2 Low-code: Enable low-code CDK users to specify schema inline in the manifest diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py index 6f972246693b..bd922f3d80aa 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py @@ -17,10 +17,13 @@ def get_first_record(self, stream: Stream) -> StreamData: :param stream: stream :return: StreamData containing the first record in the stream """ - # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) - stream_slice = self.get_stream_slice(stream) - records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) - next(records) + try: + # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) + stream_slice = self.get_stream_slice(stream) + records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) + return next(records) + except StopIteration: + return {} @staticmethod def get_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]: diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 91bf0d7b69ee..ef72c6ae3735 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.13.2", + version="0.13.3", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py index 1168dc609ca0..68c0b7e04dbf 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py @@ -139,3 +139,30 @@ def test_send_handles_retries_when_checking_availability(mocker, caplog): assert mock_send.call_count == 3 for message in ["Caught retryable error", "Response Code: 429", "Response Code: 503"]: assert message in caplog.text + + +def test_http_availability_strategy_on_empty_stream(mocker): + mocker.patch.multiple(HttpStream, __abstractmethods__=set()) + mocker.patch.multiple(Stream, __abstractmethods__=set()) + + class MockEmptyStream(mocker.MagicMock, HttpStream): + page_size = None + get_json_schema = mocker.MagicMock() + + def __init__(self, *args, **kvargs): + mocker.MagicMock.__init__(self) + self.read_records = mocker.MagicMock() + + empty_stream = MockEmptyStream() + assert isinstance(empty_stream, HttpStream) + + assert isinstance(empty_stream.availability_strategy, HttpAvailabilityStrategy) + + # Generator should have no values to generate + empty_stream.read_records.return_value = iter([]) + + logger = logging.getLogger("airbyte.test-source") + stream_is_available, _ = empty_stream.check_availability(logger) + + assert stream_is_available + assert empty_stream.read_records.called