diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py index 338d591b62e8..f2e6ad5461c5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -37,9 +37,9 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.") stream = stream_name_to_stream[stream_name] - availability_strategy = stream.availability_strategy or HttpAvailabilityStrategy() + availability_strategy = HttpAvailabilityStrategy() try: - stream_is_available, reason = availability_strategy.check_availability(stream, logger, source) + stream_is_available, reason = availability_strategy.check_availability(stream, logger) if not stream_is_available: return False, reason except Exception as error: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py index e7a61bca4a4e..f2042bc1cb92 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/availability_strategy.py @@ -5,9 +5,10 @@ import logging import typing from abc import ABC, abstractmethod -from typing import Optional, Tuple +from typing import Any, Mapping, Optional, Tuple -from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.core import Stream, StreamData if typing.TYPE_CHECKING: from airbyte_cdk.sources import Source @@ -19,7 +20,7 @@ class AvailabilityStrategy(ABC): """ @abstractmethod - def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]: + def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]: """ Checks stream availability. @@ -31,3 +32,47 @@ def check_availability(self, stream: Stream, logger: logging.Logger, source: Opt for some reason and the str should describe what went wrong and how to resolve the unavailability, if possible. """ + + @staticmethod + def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]: + """ + Gets the first stream_slice from a given stream's stream_slices. + :param stream: stream + :raises StopIteration: if there is no first slice to return (the stream_slices generator is empty) + :return: first stream slice from 'stream_slices' generator (`None` is a valid stream slice) + """ + # We wrap the return output of stream_slices() because some implementations return types that are iterable, + # but not iterators such as lists or tuples + slices = iter( + stream.stream_slices( + cursor_field=stream.cursor_field, # type: ignore[arg-type] + sync_mode=SyncMode.full_refresh, + ) + ) + return next(slices) + + @staticmethod + def get_first_record_for_slice(stream: Stream, stream_slice: Optional[Mapping[str, Any]]) -> StreamData: + """ + Gets the first record for a stream_slice of a stream. + + :param stream: stream instance from which to read records + :param stream_slice: stream_slice parameters for slicing the stream + :raises StopIteration: if there is no first record to return (the read_records generator is empty) + :return: StreamData containing the first record in the slice + """ + # Store the original value of exit_on_rate_limit + original_exit_on_rate_limit = stream.exit_on_rate_limit + + try: + # Ensure exit_on_rate_limit is safely set to True if possible + stream.exit_on_rate_limit = True + + # We wrap the return output of read_records() because some implementations return types that are iterable, + # but not iterators such as lists or tuples + records_for_slice = iter(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)) + + return next(records_for_slice) + finally: + # Restore the original exit_on_rate_limit value + stream.exit_on_rate_limit = original_exit_on_rate_limit diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 3567892b9e84..3c91021da5f1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -5,11 +5,10 @@ import inspect import itertools import logging -import typing from abc import ABC, abstractmethod from dataclasses import dataclass from functools import lru_cache -from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union +from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union import airbyte_cdk.sources.utils.casing as casing from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode @@ -32,10 +31,6 @@ from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from deprecated import deprecated -if typing.TYPE_CHECKING: - from airbyte_cdk.sources import Source - from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy - # A stream's read method can return one of the following types: # Mapping[str, Any]: The content of an AirbyteRecordMessage # AirbyteMessage: An AirbyteMessage. Could be of any type @@ -366,30 +361,6 @@ def exit_on_rate_limit(self, value: bool) -> None: """Exit on rate limit setter, accept bool value.""" self._exit_on_rate_limit = value - @deprecated(version="3.7.0") - def check_availability(self, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]: - """ - Checks whether this stream is available. - - :param logger: source logger - :param source: (optional) source - :return: A tuple of (boolean, str). If boolean is true, then this stream - is available, and no str is required. Otherwise, this stream is unavailable - for some reason and the str should describe what went wrong and how to - resolve the unavailability, if possible. - """ - if self.availability_strategy: - return self.availability_strategy.check_availability(self, logger, source) - return True, None - - @property - @deprecated(version="3.7.0") - def availability_strategy(self) -> Optional["AvailabilityStrategy"]: - """ - :return: The AvailabilityStrategy used to check whether this stream is available. - """ - return None - @property @abstractmethod def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py index e5da2c3d5ba5..4b3dba106c6e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/availability_strategy.py @@ -4,21 +4,18 @@ import logging import typing -from typing import Dict, Optional, Tuple +from typing import Optional, Tuple -import requests from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy -from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice, get_first_stream_slice from airbyte_cdk.utils.traced_exception import AirbyteTracedException -from requests import HTTPError if typing.TYPE_CHECKING: from airbyte_cdk.sources import Source class HttpAvailabilityStrategy(AvailabilityStrategy): - def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]: + def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]: """ Check stream availability by attempting to read the first record of the stream. @@ -31,119 +28,25 @@ def check_availability(self, stream: Stream, logger: logging.Logger, source: Opt for some reason and the str should describe what went wrong and how to resolve the unavailability, if possible. """ - reason: Optional[str] = None + reason: Optional[str] try: # Some streams need a stream slice to read records (e.g. if they have a SubstreamPartitionRouter) # Streams that don't need a stream slice will return `None` as their first stream slice. - stream_slice = get_first_stream_slice(stream) + stream_slice = self.get_first_stream_slice(stream) except StopIteration: # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!) # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield ` # without accounting for the case in which the parent stream is empty. reason = f"Cannot attempt to connect to stream {stream.name} - no stream slices were found, likely because the parent stream is empty." return False, reason - except HTTPError as error: - is_available, reason = self.handle_http_error(stream, logger, source, error) - if not is_available: - reason = f"Unable to get slices for {stream.name} stream, because of error in parent stream. {reason}" - return is_available, reason except AirbyteTracedException as error: return False, error.message try: - get_first_record_for_slice(stream, stream_slice) + self.get_first_record_for_slice(stream, stream_slice) return True, None except StopIteration: logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") return True, None - except HTTPError as error: - is_available, reason = self.handle_http_error(stream, logger, source, error) - if not is_available: - reason = f"Unable to read {stream.name} stream. {reason}" - return is_available, reason except AirbyteTracedException as error: return False, error.message - - def handle_http_error( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError - ) -> Tuple[bool, Optional[str]]: - """ - Override this method to define error handling for various `HTTPError`s - that are raised while attempting to check a stream's availability. - - Checks whether an error's status_code is in a list of unavailable_error_codes, - and gets the associated reason for that error. - - :param stream: stream - :param logger: source logger - :param source: optional (source) - :param error: HTTPError raised while checking stream's availability. - :return: A tuple of (boolean, str). If boolean is true, then the stream - is available, and no str is required. Otherwise, the stream is unavailable - for some reason and the str should describe what went wrong and how to - resolve the unavailability, if possible. - """ - status_code = error.response.status_code - known_status_codes = self.reasons_for_unavailable_status_codes(stream, logger, source, error) - known_reason = known_status_codes.get(status_code) - if not known_reason: - # If the HTTPError is not in the dictionary of errors we know how to handle, don't except - raise error - - doc_ref = self._visit_docs_message(logger, source) - reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {known_reason}. {doc_ref} " - response_error_message = stream.parse_response_error_message(error.response) # type: ignore # noqa ; method will be deprecated in https://github.com/airbytehq/airbyte-internal-issues/issues/8521 - if response_error_message: - reason += response_error_message - return False, reason - - def reasons_for_unavailable_status_codes( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError - ) -> Dict[int, str]: - """ - Returns a dictionary of HTTP status codes that indicate stream - unavailability and reasons explaining why a given status code may - have occurred and how the user can resolve that error, if applicable. - - :param stream: stream - :param logger: source logger - :param source: optional (source) - :return: A dictionary of (status code, reason) where the 'reason' explains - why 'status code' may have occurred and how the user can resolve that - error, if applicable. - """ - reasons_for_codes: Dict[int, str] = { - requests.codes.FORBIDDEN: "This is most likely due to insufficient permissions on the credentials in use. " - "Try to grant required permissions/scopes or re-authenticate" - } - return reasons_for_codes - - @staticmethod - def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str: - """ - Creates a message indicating where to look in the documentation for - more information on a given source by checking the spec of that source - (if provided) for a 'documentationUrl'. - - :param logger: source logger - :param source: optional (source) - :return: A message telling the user where to go to learn more about the source. - """ - if not source: - return "Please visit the connector's documentation to learn more. " - - try: - connector_spec = source.spec(logger) - docs_url = connector_spec.documentationUrl - if docs_url: - return f"Please visit {docs_url} to learn more. " - else: - return "Please visit the connector's documentation to learn more. " - - except FileNotFoundError: # If we are unit testing without implementing spec() method in source - if source: - docs_url = f"https://docs.airbyte.com/integrations/sources/{source.name}" - else: - docs_url = "https://docs.airbyte.com/integrations/sources/test" - - return f"Please visit {docs_url} to learn more." diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index b5fda7377d3a..026846fb6ac2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -12,12 +12,10 @@ from airbyte_cdk.models import AirbyteMessage, FailureType, SyncMode from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message.repository import InMemoryMessageRepository -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.call_rate import APIBudget from airbyte_cdk.sources.streams.checkpoint.cursor import Cursor from airbyte_cdk.sources.streams.checkpoint.resumable_full_refresh_cursor import ResumableFullRefreshCursor from airbyte_cdk.sources.streams.core import CheckpointMixin, Stream, StreamData -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, HttpStatusErrorHandler from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction from airbyte_cdk.sources.streams.http.http_client import HttpClient @@ -132,11 +130,6 @@ def retry_factor(self) -> float: """ return 5 - @property - @deprecated(version="3.7.0", reason="This functionality is handled by combination of HttpClient.ErrorHandler and AirbyteStreamStatus") - def availability_strategy(self) -> Optional[AvailabilityStrategy]: - return HttpAvailabilityStrategy() - @abstractmethod def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py deleted file mode 100644 index a460f8811e9f..000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/utils/stream_helper.py +++ /dev/null @@ -1,52 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from typing import Any, Mapping, Optional - -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.core import Stream, StreamData - - -def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]: - """ - Gets the first stream_slice from a given stream's stream_slices. - :param stream: stream - :raises StopIteration: if there is no first slice to return (the stream_slices generator is empty) - :return: first stream slice from 'stream_slices' generator (`None` is a valid stream slice) - """ - # We wrap the return output of stream_slices() because some implementations return types that are iterable, - # but not iterators such as lists or tuples - slices = iter( - stream.stream_slices( - cursor_field=stream.cursor_field, # type: ignore[arg-type] - sync_mode=SyncMode.full_refresh, - ) - ) - return next(slices) - - -def get_first_record_for_slice(stream: Stream, stream_slice: Optional[Mapping[str, Any]]) -> StreamData: - """ - Gets the first record for a stream_slice of a stream. - - :param stream: stream instance from which to read records - :param stream_slice: stream_slice parameters for slicing the stream - :raises StopIteration: if there is no first record to return (the read_records generator is empty) - :return: StreamData containing the first record in the slice - """ - # Store the original value of exit_on_rate_limit - original_exit_on_rate_limit = stream.exit_on_rate_limit - - try: - # Ensure exit_on_rate_limit is safely set to True if possible - stream.exit_on_rate_limit = True - - # We wrap the return output of read_records() because some implementations return types that are iterable, - # but not iterators such as lists or tuples - records_for_slice = iter(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)) - - return next(records_for_slice) - finally: - # Restore the original exit_on_rate_limit value - stream.exit_on_rate_limit = original_exit_on_rate_limit diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py index 8057bf588918..8ccf70b4e7a9 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/checks/test_check_stream.py @@ -10,7 +10,6 @@ import requests from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream from airbyte_cdk.sources.streams.http import HttpStream -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy logger = logging.getLogger("test") config = dict() @@ -60,7 +59,6 @@ def mock_read_records(responses, default_response=None, **kwargs): def test_check_empty_stream(): stream = MagicMock() stream.name = "s1" - stream.availability_strategy = None stream.read_records.return_value = iter([]) stream.stream_slices.return_value = iter([None]) @@ -75,7 +73,6 @@ def test_check_empty_stream(): def test_check_stream_with_no_stream_slices_aborts(): stream = MagicMock() stream.name = "s1" - stream.availability_strategy = None stream.stream_slices.return_value = iter([]) source = MagicMock() @@ -126,7 +123,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp http_stream = MockHttpStream() assert isinstance(http_stream, HttpStream) - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) source = MagicMock() source.streams.return_value = [http_stream] diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py index 53229dd238b7..a474498ab7f8 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_availability_strategy.py @@ -3,12 +3,10 @@ # import logging -from typing import Any, Iterable, List, Mapping, Optional, Tuple +from typing import Any, Iterable, Mapping, Optional import pytest import requests -from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from airbyte_cdk.sources.streams.http.http import HttpStream @@ -81,30 +79,12 @@ def read_records(self, *args, **kvargs): return super().read_records(*args, **kvargs) http_stream = MockListHttpStream() - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) - - class MockSource(AbstractSource): - def __init__(self, streams: List[Stream] = None): - self._streams = streams - - def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: - return True, "" - - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - if not self._streams: - raise Exception("Stream is not set") - return self._streams - response = requests.Response() response.status_code = status_code response.raw = json_contents mocker.patch.object(requests.Session, "send", return_value=response) - if include_source: - source = MockSource(streams=[http_stream]) - actual_is_available, reason = http_stream.check_availability(logger, source) - else: - actual_is_available, reason = http_stream.check_availability(logger) + actual_is_available, reason = HttpAvailabilityStrategy().check_availability(http_stream, logger) assert actual_is_available == expected_is_available if expected_is_available: @@ -117,19 +97,17 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: def test_http_availability_raises_unhandled_error(mocker): http_stream = MockHttpStream() - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) req = requests.Response() req.status_code = 404 mocker.patch.object(requests.Session, "send", return_value=req) - assert (False, 'Not found. The requested resource was not found on the server.') == http_stream.check_availability(logger) + assert (False, 'Not found. The requested resource was not found on the server.') == HttpAvailabilityStrategy().check_availability(http_stream, logger) def test_send_handles_retries_when_checking_availability(mocker, caplog): mocker.patch("time.sleep", lambda x: None) http_stream = MockHttpStream() - assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy) req_1 = requests.Response() req_1.status_code = 429 @@ -140,7 +118,7 @@ def test_send_handles_retries_when_checking_availability(mocker, caplog): mock_send = mocker.patch.object(requests.Session, "send", side_effect=[req_1, req_2, req_3]) with caplog.at_level(logging.INFO): - stream_is_available, _ = http_stream.check_availability(logger) + stream_is_available, _ = HttpAvailabilityStrategy().check_availability(stream=http_stream,logger=logger) assert stream_is_available assert mock_send.call_count == 3 @@ -158,8 +136,6 @@ def __init__(self, *args, **kvargs): empty_stream = MockEmptyHttpStream() assert isinstance(empty_stream, HttpStream) - assert isinstance(empty_stream.availability_strategy, HttpAvailabilityStrategy) - # Generator should have no values to generate if records_as_list: empty_stream.read_records.return_value = [] @@ -167,7 +143,7 @@ def __init__(self, *args, **kvargs): empty_stream.read_records.return_value = iter([]) logger = logging.getLogger("airbyte.test-source") - stream_is_available, _ = empty_stream.check_availability(logger) + stream_is_available, _ = HttpAvailabilityStrategy().check_availability(stream=empty_stream, logger=logger) assert stream_is_available assert empty_stream.read_records.called diff --git a/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py b/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py deleted file mode 100644 index f1184c0c986e..000000000000 --- a/airbyte-cdk/python/unit_tests/sources/streams/test_availability_strategy.py +++ /dev/null @@ -1,70 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import logging -from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union - -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources import Source -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy -from airbyte_cdk.sources.streams.core import StreamData - -logger = logging.getLogger("airbyte") - - -class MockStream(Stream): - def __init__(self, name: str) -> Stream: - self._name = name - - @property - def name(self) -> str: - return self._name - - @property - def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: - pass - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[StreamData]: - pass - - -def test_no_availability_strategy(): - stream_1 = MockStream("stream") - assert stream_1.availability_strategy is None - - stream_1_is_available, _ = stream_1.check_availability(logger) - assert stream_1_is_available - - -def test_availability_strategy(): - class MockAvailabilityStrategy(AvailabilityStrategy): - def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional[Source]) -> Tuple[bool, any]: - if stream.name == "available_stream": - return True, None - return False, f"Could not reach stream '{stream.name}'." - - class MockStreamWithAvailabilityStrategy(MockStream): - @property - def availability_strategy(self) -> Optional["AvailabilityStrategy"]: - return MockAvailabilityStrategy() - - stream_1 = MockStreamWithAvailabilityStrategy("available_stream") - stream_2 = MockStreamWithAvailabilityStrategy("unavailable_stream") - - for stream in [stream_1, stream_2]: - assert isinstance(stream.availability_strategy, MockAvailabilityStrategy) - - stream_1_is_available, _ = stream_1.check_availability(logger) - assert stream_1_is_available - - stream_2_is_available, reason = stream_2.check_availability(logger) - assert not stream_2_is_available - assert "Could not reach stream 'unavailable_stream'" in reason diff --git a/airbyte-cdk/python/unit_tests/sources/streams/utils/test_stream_helper.py b/airbyte-cdk/python/unit_tests/sources/streams/utils/test_stream_helper.py index 7ccacfebf62f..8cf1996853fd 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/utils/test_stream_helper.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/utils/test_stream_helper.py @@ -3,7 +3,7 @@ # import pytest -from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy class MockStream: @@ -38,9 +38,9 @@ def test_get_first_record_for_slice(records, stream_slice, exit_on_rate_limit, e if raises_exception: with pytest.raises(StopIteration): - get_first_record_for_slice(stream, stream_slice) + HttpAvailabilityStrategy().get_first_record_for_slice(stream, stream_slice) else: - result = get_first_record_for_slice(stream, stream_slice) + result = HttpAvailabilityStrategy().get_first_record_for_slice(stream, stream_slice) assert result == expected_result assert stream.exit_on_rate_limit == exit_on_rate_limit