From c8666c1e3d1fba5e54c8e904fbfbc57a5f145231 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Fri, 13 Jan 2023 11:06:55 -0600 Subject: [PATCH 1/7] Add test for failure case --- .../sources/declarative/checks/test_check_stream.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 10e43ea7c4d1..f30ea10227ac 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -49,3 +49,16 @@ def test_check_stream_with_slices_as_list(test_name, record, streams_to_check, s def mock_read_records(responses, default_response=None, **kwargs): return lambda stream_slice, sync_mode: responses[frozenset(stream_slice)] if frozenset(stream_slice) in responses else default_response + + +def test_check_empty_stream(): + stream = MagicMock() + stream.name = "s1" + stream.read_records.return_value = iter([]) + + source = MagicMock() + source.streams.return_value = [stream] + + check_stream = CheckStream(["s1"], options={}) + stream_is_available, reason = check_stream.check_connection(source, logger, config) + assert stream_is_available From 7c7d5b61210a95dea80f3fe45f83b1f447653383 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Fri, 13 Jan 2023 11:08:08 -0600 Subject: [PATCH 2/7] Except StopIteration - make test pass --- .../airbyte_cdk/sources/declarative/checks/check_stream.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index c982f354f3b4..2f7d3ec9e3ec 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -40,6 +40,9 @@ def 
check_connection(self, source: Source, logger: logging.Logger, config: Mappi stream_slice = self._get_stream_slice(stream) records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) next(records) + except StopIteration: + # StopIteration is raised if we successfully connect to an empty stream + return True, None except Exception as error: return False, f"Unable to connect to stream {stream_name} - {error}" else: From fbb34ec0d51f30b25c95ccf69b87aa79e2acc616 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Fri, 13 Jan 2023 13:25:53 -0600 Subject: [PATCH 3/7] Don't attempt to connect to a stream if we get no stream slices --- .../declarative/checks/check_stream.py | 14 +++++++++----- .../declarative/checks/test_check_stream.py | 19 +++++++++++++++++-- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index 2f7d3ec9e3ec..ec5d0d108487 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -3,6 +3,7 @@ # import logging +import traceback from dataclasses import InitVar, dataclass from typing import Any, List, Mapping, Tuple @@ -38,10 +39,16 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi try: # Some streams need a stream slice to read records (eg if they have a SubstreamSlicer) stream_slice = self._get_stream_slice(stream) + except StopIteration: + return ( + False, + f"Cannot attempt to connect to stream {stream_name} - no stream slices were found, likely because the parent stream is empty.", + ) + try: records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) next(records) except StopIteration: - # StopIteration is raised if we successfully connect to an empty stream + logger.info(f"Successfully 
connected to stream {stream.name}, but got 0 records.") return True, None except Exception as error: return False, f"Unable to connect to stream {stream_name} - {error}" @@ -58,7 +65,4 @@ def _get_stream_slice(self, stream): sync_mode=SyncMode.full_refresh, ) ) - try: - return next(slices) - except StopIteration: - return {} + return next(slices) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index f30ea10227ac..97cc4f522ad6 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -1,13 +1,13 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # - +import logging from unittest.mock import MagicMock import pytest from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream -logger = None +logger = logging.getLogger("test") config = dict() stream_names = ["s1"] @@ -55,6 +55,7 @@ def test_check_empty_stream(): stream = MagicMock() stream.name = "s1" stream.read_records.return_value = iter([]) + stream.stream_slices.return_value = iter([None]) source = MagicMock() source.streams.return_value = [stream] @@ -62,3 +63,17 @@ def test_check_empty_stream(): check_stream = CheckStream(["s1"], options={}) stream_is_available, reason = check_stream.check_connection(source, logger, config) assert stream_is_available + + +def test_check_stream_with_no_stream_slices_aborts(): + stream = MagicMock() + stream.name = "s1" + stream.stream_slices.return_value = iter([]) + + source = MagicMock() + source.streams.return_value = [stream] + + check_stream = CheckStream(["s1"], options={}) + stream_is_available, reason = check_stream.check_connection(source, logger, config) + assert not stream_is_available + assert "no stream slices were found, likely because the parent stream is empty" in reason From 
a799eb7afb4aab4f07093c68eaf39d76c1136232 Mon Sep 17 00:00:00 2001 From: erohmensing Date: Fri, 13 Jan 2023 13:40:26 -0600 Subject: [PATCH 4/7] Make helper method for getting first record for a slice --- .../declarative/checks/check_stream.py | 22 ++++++++++--------- .../declarative/checks/test_check_stream.py | 1 + 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index ec5d0d108487..066784494141 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -38,25 +38,23 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi stream = stream_name_to_stream[stream_name] try: # Some streams need a stream slice to read records (eg if they have a SubstreamSlicer) - stream_slice = self._get_stream_slice(stream) + stream_slice = self._get_first_stream_slice(stream) except StopIteration: - return ( - False, - f"Cannot attempt to connect to stream {stream_name} - no stream slices were found, likely because the parent stream is empty.", - ) + reason = f"Cannot attempt to connect to stream {stream_name} - no stream slices were found, likely because the parent stream is empty." + return False, reason try: - records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) - next(records) + self._get_first_record_for_slice(stream, stream_slice) + return True, None except StopIteration: logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") return True, None except Exception as error: + logger.error(f"Encountered an error trying to connect to stream {stream.name}. 
Error: \n {traceback.format_exc()}") return False, f"Unable to connect to stream {stream_name} - {error}" else: - raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}") - return True, None + raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.") - def _get_stream_slice(self, stream): + def _get_first_stream_slice(self, stream): # We wrap the return output of stream_slices() because some implementations return types that are iterable, # but not iterators such as lists or tuples slices = iter( @@ -66,3 +64,7 @@ def _get_stream_slice(self, stream): ) ) return next(slices) + + def _get_first_record_for_slice(self, stream, stream_slice): + records_for_slice = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) + return next(records_for_slice) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 97cc4f522ad6..1ea86bf88937 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -1,6 +1,7 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# + import logging from unittest.mock import MagicMock From 54558dce70401bea30e1982d9ab01cb3385ae02c Mon Sep 17 00:00:00 2001 From: erohmensing Date: Fri, 13 Jan 2023 13:52:57 -0600 Subject: [PATCH 5/7] Add comments and exit early if stream to check isn't in list of source streams --- .../declarative/checks/check_stream.py | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index 066784494141..a372a7caa684 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -29,30 +29,36 @@ def __post_init__(self, options: Mapping[str, Any]): self._options = options def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]: + """Check configuration parameters for a source by attempting to get the first record for each stream in the CheckStream's `stream_names` list.""" streams = source.streams(config) stream_name_to_stream = {s.name: s for s in streams} if len(streams) == 0: return False, f"No streams to connect to from source {source}" for stream_name in self.stream_names: - if stream_name in stream_name_to_stream.keys(): - stream = stream_name_to_stream[stream_name] - try: - # Some streams need a stream slice to read records (eg if they have a SubstreamSlicer) - stream_slice = self._get_first_stream_slice(stream) - except StopIteration: - reason = f"Cannot attempt to connect to stream {stream_name} - no stream slices were found, likely because the parent stream is empty."
- return False, reason - try: - self._get_first_record_for_slice(stream, stream_slice) - return True, None - except StopIteration: - logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") - return True, None - except Exception as error: - logger.error(f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}") - return False, f"Unable to connect to stream {stream_name} - {error}" - else: + if stream_name not in stream_name_to_stream.keys(): raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.") + stream = stream_name_to_stream[stream_name] + + try: + # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) + # Streams that don't need a stream slice will return `None` as their first stream slice. + stream_slice = self._get_first_stream_slice(stream) + except StopIteration: + # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!) + # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>` + # without accounting for the case in which the parent stream is empty. + reason = f"Cannot attempt to connect to stream {stream_name} - no stream slices were found, likely because the parent stream is empty." + return False, reason + + try: + self._get_first_record_for_slice(stream, stream_slice) + return True, None + except StopIteration: + logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") + return True, None + except Exception as error: + logger.error(f"Encountered an error trying to connect to stream {stream.name}.
Error: \n {traceback.format_exc()}") + return False, f"Unable to connect to stream {stream_name} - {error}" def _get_first_stream_slice(self, stream): # We wrap the return output of stream_slices() because some implementations return types that are iterable, From b00da3ed3f805072d1aa987d05ed34e843e994de Mon Sep 17 00:00:00 2001 From: erohmensing Date: Fri, 13 Jan 2023 14:08:18 -0600 Subject: [PATCH 6/7] move helpers to helper module --- .../declarative/checks/check_stream.py | 21 ++--------- .../sources/streams/utils/stream_helper.py | 36 +++++++++++++++++++ 2 files changed, 39 insertions(+), 18 deletions(-) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index a372a7caa684..f71e5ab47ce6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -7,9 +7,9 @@ from dataclasses import InitVar, dataclass from typing import Any, List, Mapping, Tuple -from airbyte_cdk.models.airbyte_protocol import SyncMode from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.source import Source +from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice, get_first_stream_slice from dataclasses_jsonschema import JsonSchemaMixin @@ -42,7 +42,7 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi try: # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer) # Streams that don't need a stream slice will return `None` as their first stream slice. 
- stream_slice = self._get_first_stream_slice(stream) + stream_slice = get_first_stream_slice(stream) except StopIteration: # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!) # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>` @@ -51,7 +51,7 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi return False, reason try: - self._get_first_record_for_slice(stream, stream_slice) + get_first_record_for_slice(stream, stream_slice) return True, None except StopIteration: logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") @@ -59,18 +59,3 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi except Exception as error: logger.error(f"Encountered an error trying to connect to stream {stream.name}. Error: \n {traceback.format_exc()}") return False, f"Unable to connect to stream {stream_name} - {error}" - - def _get_first_stream_slice(self, stream): - # We wrap the return output of stream_slices() because some implementations return types that are iterable, - # but not iterators such as lists or tuples - slices = iter( - stream.stream_slices( - cursor_field=stream.cursor_field, - sync_mode=SyncMode.full_refresh, - ) - ) - return next(slices) - - def _get_first_record_for_slice(self, stream, stream_slice): - records_for_slice = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) - return next(records_for_slice) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py new file mode 100644 index 000000000000..9046301dea2d --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py @@ -0,0 +1,36 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+# + +from typing import Any, Mapping, Optional + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.core import Stream, StreamData + + +def get_first_stream_slice(stream) -> Optional[Mapping[str, Any]]: + """ + Gets the first stream_slice from a given stream's stream_slices. + :param stream: stream + :return: First stream slice from 'stream_slices' generator + """ + # We wrap the return output of stream_slices() because some implementations return types that are iterable, + # but not iterators such as lists or tuples + slices = iter( + stream.stream_slices( + cursor_field=stream.cursor_field, + sync_mode=SyncMode.full_refresh, + ) + ) + return next(slices) + + +def get_first_record_for_slice(stream: Stream, stream_slice: Optional[Mapping[str, Any]]) -> StreamData: + """ + Gets the first record for a stream_slice of a stream. + :param stream: stream + :param stream_slice: stream_slice + :return: StreamData containing the first record in the slice. + """ + records_for_slice = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) + return next(records_for_slice) From 60d8bc4613587552859b0095f6fcd7622656938c Mon Sep 17 00:00:00 2001 From: erohmensing Date: Fri, 13 Jan 2023 15:08:00 -0600 Subject: [PATCH 7/7] Clarify what it means when StopIteration is returned by helper methods --- .../airbyte_cdk/sources/streams/utils/stream_helper.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py index 9046301dea2d..9b09e9d12268 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py @@ -12,7 +12,8 @@ def get_first_stream_slice(stream) -> Optional[Mapping[str, Any]]: """ Gets the first stream_slice from a given stream's stream_slices. 
:param stream: stream - :return: First stream slice from 'stream_slices' generator + :raises StopIteration: if there is no first slice to return (the stream_slices generator is empty) + :return: first stream slice from 'stream_slices' generator (`None` is a valid stream slice) """ # We wrap the return output of stream_slices() because some implementations return types that are iterable, # but not iterators such as lists or tuples @@ -30,7 +31,8 @@ def get_first_record_for_slice(stream: Stream, stream_slice: Optional[Mapping[st Gets the first record for a stream_slice of a stream. :param stream: stream :param stream_slice: stream_slice - :return: StreamData containing the first record in the slice. + :raises StopIteration: if there is no first record to return (the read_records generator is empty) + :return: StreamData containing the first record in the slice """ records_for_slice = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice) return next(records_for_slice)