Skip to content

Commit

Permalink
refactor!(airbyte-cdk): deprecate availability strategy (#42039)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 authored Jul 23, 2024
1 parent 250c02d commit 5056e67
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 302 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi
raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}.")

stream = stream_name_to_stream[stream_name]
availability_strategy = stream.availability_strategy or HttpAvailabilityStrategy()
availability_strategy = HttpAvailabilityStrategy()
try:
stream_is_available, reason = availability_strategy.check_availability(stream, logger, source)
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
if not stream_is_available:
return False, reason
except Exception as error:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import logging
import typing
from abc import ABC, abstractmethod
from typing import Optional, Tuple
from typing import Any, Mapping, Optional, Tuple

from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream, StreamData

if typing.TYPE_CHECKING:
from airbyte_cdk.sources import Source
Expand All @@ -19,7 +20,7 @@ class AvailabilityStrategy(ABC):
"""

@abstractmethod
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]:
"""
Checks stream availability.
Expand All @@ -31,3 +32,47 @@ def check_availability(self, stream: Stream, logger: logging.Logger, source: Opt
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""

@staticmethod
def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
"""
Gets the first stream_slice from a given stream's stream_slices.
:param stream: stream
:raises StopIteration: if there is no first slice to return (the stream_slices generator is empty)
:return: first stream slice from 'stream_slices' generator (`None` is a valid stream slice)
"""
# We wrap the return output of stream_slices() because some implementations return types that are iterable,
# but not iterators such as lists or tuples
slices = iter(
stream.stream_slices(
cursor_field=stream.cursor_field, # type: ignore[arg-type]
sync_mode=SyncMode.full_refresh,
)
)
return next(slices)

@staticmethod
def get_first_record_for_slice(stream: Stream, stream_slice: Optional[Mapping[str, Any]]) -> StreamData:
"""
Gets the first record for a stream_slice of a stream.
:param stream: stream instance from which to read records
:param stream_slice: stream_slice parameters for slicing the stream
:raises StopIteration: if there is no first record to return (the read_records generator is empty)
:return: StreamData containing the first record in the slice
"""
# Store the original value of exit_on_rate_limit
original_exit_on_rate_limit = stream.exit_on_rate_limit

try:
# Ensure exit_on_rate_limit is safely set to True if possible
stream.exit_on_rate_limit = True

# We wrap the return output of read_records() because some implementations return types that are iterable,
# but not iterators such as lists or tuples
records_for_slice = iter(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))

return next(records_for_slice)
finally:
# Restore the original exit_on_rate_limit value
stream.exit_on_rate_limit = original_exit_on_rate_limit
31 changes: 1 addition & 30 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
import inspect
import itertools
import logging
import typing
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import lru_cache
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode
Expand All @@ -32,10 +31,6 @@
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from deprecated import deprecated

if typing.TYPE_CHECKING:
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy

# A stream's read method can return one of the following types:
# Mapping[str, Any]: The content of an AirbyteRecordMessage
# AirbyteMessage: An AirbyteMessage. Could be of any type
Expand Down Expand Up @@ -366,30 +361,6 @@ def exit_on_rate_limit(self, value: bool) -> None:
"""Exit on rate limit setter, accept bool value."""
self._exit_on_rate_limit = value

@deprecated(version="3.7.0")
def check_availability(self, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]:
"""
Checks whether this stream is available.
:param logger: source logger
:param source: (optional) source
:return: A tuple of (boolean, str). If boolean is true, then this stream
is available, and no str is required. Otherwise, this stream is unavailable
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""
if self.availability_strategy:
return self.availability_strategy.check_availability(self, logger, source)
return True, None

@property
@deprecated(version="3.7.0")
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
"""
:return: The AvailabilityStrategy used to check whether this stream is available.
"""
return None

@property
@abstractmethod
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@

import logging
import typing
from typing import Dict, Optional, Tuple
from typing import Optional, Tuple

import requests
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.utils.stream_helper import get_first_record_for_slice, get_first_stream_slice
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests import HTTPError

if typing.TYPE_CHECKING:
from airbyte_cdk.sources import Source


class HttpAvailabilityStrategy(AvailabilityStrategy):
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]:
"""
Check stream availability by attempting to read the first record of the
stream.
Expand All @@ -31,119 +28,25 @@ def check_availability(self, stream: Stream, logger: logging.Logger, source: Opt
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""
reason: Optional[str] = None
reason: Optional[str]
try:
# Some streams need a stream slice to read records (e.g. if they have a SubstreamPartitionRouter)
# Streams that don't need a stream slice will return `None` as their first stream slice.
stream_slice = get_first_stream_slice(stream)
stream_slice = self.get_first_stream_slice(stream)
except StopIteration:
# If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
# This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
# without accounting for the case in which the parent stream is empty.
reason = f"Cannot attempt to connect to stream {stream.name} - no stream slices were found, likely because the parent stream is empty."
return False, reason
except HTTPError as error:
is_available, reason = self.handle_http_error(stream, logger, source, error)
if not is_available:
reason = f"Unable to get slices for {stream.name} stream, because of error in parent stream. {reason}"
return is_available, reason
except AirbyteTracedException as error:
return False, error.message

try:
get_first_record_for_slice(stream, stream_slice)
self.get_first_record_for_slice(stream, stream_slice)
return True, None
except StopIteration:
logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.")
return True, None
except HTTPError as error:
is_available, reason = self.handle_http_error(stream, logger, source, error)
if not is_available:
reason = f"Unable to read {stream.name} stream. {reason}"
return is_available, reason
except AirbyteTracedException as error:
return False, error.message

def handle_http_error(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Tuple[bool, Optional[str]]:
"""
Override this method to define error handling for various `HTTPError`s
that are raised while attempting to check a stream's availability.
Checks whether an error's status_code is in a list of unavailable_error_codes,
and gets the associated reason for that error.
:param stream: stream
:param logger: source logger
:param source: optional (source)
:param error: HTTPError raised while checking stream's availability.
:return: A tuple of (boolean, str). If boolean is true, then the stream
is available, and no str is required. Otherwise, the stream is unavailable
for some reason and the str should describe what went wrong and how to
resolve the unavailability, if possible.
"""
status_code = error.response.status_code
known_status_codes = self.reasons_for_unavailable_status_codes(stream, logger, source, error)
known_reason = known_status_codes.get(status_code)
if not known_reason:
# If the HTTPError is not in the dictionary of errors we know how to handle, don't except
raise error

doc_ref = self._visit_docs_message(logger, source)
reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {known_reason}. {doc_ref} "
response_error_message = stream.parse_response_error_message(error.response) # type: ignore # noqa ; method will be deprecated in https://github.com/airbytehq/airbyte-internal-issues/issues/8521
if response_error_message:
reason += response_error_message
return False, reason

def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Dict[int, str]:
"""
Returns a dictionary of HTTP status codes that indicate stream
unavailability and reasons explaining why a given status code may
have occurred and how the user can resolve that error, if applicable.
:param stream: stream
:param logger: source logger
:param source: optional (source)
:return: A dictionary of (status code, reason) where the 'reason' explains
why 'status code' may have occurred and how the user can resolve that
error, if applicable.
"""
reasons_for_codes: Dict[int, str] = {
requests.codes.FORBIDDEN: "This is most likely due to insufficient permissions on the credentials in use. "
"Try to grant required permissions/scopes or re-authenticate"
}
return reasons_for_codes

@staticmethod
def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str:
"""
Creates a message indicating where to look in the documentation for
more information on a given source by checking the spec of that source
(if provided) for a 'documentationUrl'.
:param logger: source logger
:param source: optional (source)
:return: A message telling the user where to go to learn more about the source.
"""
if not source:
return "Please visit the connector's documentation to learn more. "

try:
connector_spec = source.spec(logger)
docs_url = connector_spec.documentationUrl
if docs_url:
return f"Please visit {docs_url} to learn more. "
else:
return "Please visit the connector's documentation to learn more. "

except FileNotFoundError: # If we are unit testing without implementing spec() method in source
if source:
docs_url = f"https://docs.airbyte.com/integrations/sources/{source.name}"
else:
docs_url = "https://docs.airbyte.com/integrations/sources/test"

return f"Please visit {docs_url} to learn more."
7 changes: 0 additions & 7 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
from airbyte_cdk.models import AirbyteMessage, FailureType, SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.call_rate import APIBudget
from airbyte_cdk.sources.streams.checkpoint.cursor import Cursor
from airbyte_cdk.sources.streams.checkpoint.resumable_full_refresh_cursor import ResumableFullRefreshCursor
from airbyte_cdk.sources.streams.core import CheckpointMixin, Stream, StreamData
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, HttpStatusErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction
from airbyte_cdk.sources.streams.http.http_client import HttpClient
Expand Down Expand Up @@ -132,11 +130,6 @@ def retry_factor(self) -> float:
"""
return 5

@property
@deprecated(version="3.7.0", reason="This functionality is handled by combination of HttpClient.ErrorHandler and AirbyteStreamStatus")
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return HttpAvailabilityStrategy()

@abstractmethod
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import requests
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy

logger = logging.getLogger("test")
config = dict()
Expand Down Expand Up @@ -60,7 +59,6 @@ def mock_read_records(responses, default_response=None, **kwargs):
def test_check_empty_stream():
stream = MagicMock()
stream.name = "s1"
stream.availability_strategy = None
stream.read_records.return_value = iter([])
stream.stream_slices.return_value = iter([None])

Expand All @@ -75,7 +73,6 @@ def test_check_empty_stream():
def test_check_stream_with_no_stream_slices_aborts():
stream = MagicMock()
stream.name = "s1"
stream.availability_strategy = None
stream.stream_slices.return_value = iter([])

source = MagicMock()
Expand Down Expand Up @@ -126,7 +123,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp

http_stream = MockHttpStream()
assert isinstance(http_stream, HttpStream)
assert isinstance(http_stream.availability_strategy, HttpAvailabilityStrategy)

source = MagicMock()
source.streams.return_value = [http_stream]
Expand Down
Loading

0 comments on commit 5056e67

Please sign in to comment.