Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to edge cases of CheckStream #21404

Merged
merged 9 commits into from
Jan 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi
stream_slice = self._get_stream_slice(stream)
records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
next(records)
except StopIteration:
# StopIteration is raised if we successfully connect to an empty stream
return True, None
except Exception as error:
return False, f"Unable to connect to stream {stream_name} - {error}"
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,16 @@ def test_check_stream_with_slices_as_list(test_name, record, streams_to_check, s

def mock_read_records(responses, default_response=None, **kwargs):
return lambda stream_slice, sync_mode: responses[frozenset(stream_slice)] if frozenset(stream_slice) in responses else default_response


def test_check_empty_stream():
stream = MagicMock()
stream.name = "s1"
stream.read_records.return_value = iter([])

source = MagicMock()
source.streams.return_value = [stream]

check_stream = CheckStream(["s1"], options={})
stream_is_available, reason = check_stream.check_connection(source, logger, config)
assert stream_is_available