✨ [source-github] Bump cdk version and enable RFR for all non-incremental streams (#42966)
brianjlai authored Aug 9, 2024
1 parent 871d5b3 commit 2aaf33e
Showing 9 changed files with 332 additions and 86 deletions.


@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
-dockerImageTag: 1.8.1
+dockerImageTag: 1.8.2
dockerRepository: airbyte/source-github
documentationUrl: https://docs.airbyte.com/integrations/sources/github
githubIssueLabel: source-github
52 changes: 26 additions & 26 deletions airbyte-integrations/connectors/source-github/poetry.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-github/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "1.8.1"
version = "1.8.2"
name = "source-github"
description = "Source implementation for GitHub."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -16,8 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_github"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "^3"
python = "^3.10,<3.12"
airbyte-cdk = "^4"
sgqlc = "==16.3"

[tool.poetry.scripts]
@@ -90,6 +90,15 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp


class ContributorActivityErrorHandler(HttpStatusErrorHandler):
"""
This custom error handler is needed for streams based on repository statistics endpoints, like ContributorActivity, because
requesting data that hasn't been cached yet returns a 202 response, and those requests need to be retried to get the
actual results. See the docs for more info:
https://docs.github.com/en/rest/metrics/statistics?apiVersion=2022-11-28#a-word-about-caching
"""

def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
if isinstance(response_or_exception, requests.Response) and response_or_exception.status_code == requests.codes.ACCEPTED:
return ErrorResolution(
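The hunk above cuts off inside the `ErrorResolution(` call. As a rough, hedged sketch of the pattern rather than the commit's exact code, a 202-aware handler built on `HttpStatusErrorHandler` could look like the following; the class name, the error message text, and the use of `FailureType.transient_error` with the `response_action`/`failure_type`/`error_message` fields are assumptions about the CDK 4 API, not values taken from this diff.

```python
import logging
from typing import Optional, Union

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, HttpStatusErrorHandler, ResponseAction


class StatisticsCacheErrorHandler(HttpStatusErrorHandler):  # hypothetical stand-in for ContributorActivityErrorHandler
    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        # GitHub answers 202 while it is still computing repository statistics,
        # so treat it as a transient condition and retry until real data arrives.
        if isinstance(response_or_exception, requests.Response) and response_or_exception.status_code == requests.codes.ACCEPTED:
            return ErrorResolution(
                response_action=ResponseAction.RETRY,
                failure_type=FailureType.transient_error,
                error_message="Statistics are still being generated; retrying until GitHub returns them.",
            )
        # Anything else falls back to the default status-code mapping.
        return super().interpret_response(response_or_exception)


# error_mapping is omitted here; the connector passes GITHUB_DEFAULT_ERROR_MAPPING.
handler = StatisticsCacheErrorHandler(logger=logging.getLogger("airbyte"), max_retries=5)
```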
@@ -9,10 +9,11 @@

import pendulum
import requests
-from airbyte_cdk import BackoffStrategy
+from airbyte_cdk import BackoffStrategy, StreamSlice
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.checkpoint.substream_resumable_full_refresh_cursor import SubstreamResumableFullRefreshCursor
from airbyte_cdk.sources.streams.core import CheckpointMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, ErrorResolution, HttpStatusErrorHandler, ResponseAction
@@ -57,6 +58,9 @@ def __init__(self, api_url: str = "https://api.github.com", access_token_type: s
self.api_url = api_url
self.state = {}

if not self.supports_incremental:
self.cursor = SubstreamResumableFullRefreshCursor()

@property
def url_base(self) -> str:
return self.api_url
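For context on the `SubstreamResumableFullRefreshCursor` assignment above, here is a minimal, hypothetical sketch (not part of the commit) of how that cursor checkpoints one partition at a time for a stream that only supports full refresh. The repository partitions and the final print are illustrative; the exact shape of the emitted state depends on the CDK version.

```python
from airbyte_cdk import StreamSlice
from airbyte_cdk.sources.streams.checkpoint.substream_resumable_full_refresh_cursor import SubstreamResumableFullRefreshCursor

cursor = SubstreamResumableFullRefreshCursor()

# Stand-ins for the parent partitions (repositories) a non-incremental stream iterates over.
for repository in ["airbytehq/airbyte", "airbytehq/airbyte-platform"]:
    partition = {"repository": repository}
    # ... records for this repository would be read and emitted here ...
    # Closing the slice marks the repository as fully synced, so a resumed
    # full refresh can pick up after it instead of starting from scratch.
    cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition))

# The accumulated state lists each completed partition.
print(cursor.get_stream_state())
```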
@@ -1613,7 +1617,8 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
return record

def get_error_handler(self) -> Optional[ErrorHandler]:
-return ContributorActivityErrorHandler(logger=self.logger, max_retries=self.max_retries, error_mapping=GITHUB_DEFAULT_ERROR_MAPPING)
+return ContributorActivityErrorHandler(logger=self.logger, max_retries=5, error_mapping=GITHUB_DEFAULT_ERROR_MAPPING)

def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
return ContributorActivityBackoffStrategy()
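`ContributorActivityBackoffStrategy` itself isn't shown in this diff; as a hedged illustration of the `BackoffStrategy` hook returned above, a fixed-interval strategy for the 202 case might look like the sketch below. The 90-second interval and the permissive method signature are assumptions, not the connector's actual values.

```python
from typing import Any, Optional, Union

import requests
from airbyte_cdk import BackoffStrategy


class FixedIntervalBackoffStrategy(BackoffStrategy):  # hypothetical stand-in for ContributorActivityBackoffStrategy
    def backoff_time(
        self,
        response_or_exception: Optional[Union[requests.Response, Exception]],
        attempt_count: int = 0,
        **kwargs: Any,
    ) -> Optional[float]:
        # Give GitHub a fixed window to finish building the statistics cache before retrying.
        return 90.0
```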
@@ -1645,6 +1650,13 @@ def read_records(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iter
message=f"Syncing `{self.__class__.__name__}` " f"stream isn't available for repository `{repository}`.",
),
)

# In order to retain the stream behavior that existed before we added RFR to this stream, we need to close out the
# partition after we give up following the maximum number of retries on the 202 response. This does raise the question
# of whether we should exit prematurely in the first place, but for now we're aiming for feature parity.
partition_obj = stream_slice.get("partition")
if self.cursor and partition_obj:
self.cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition_obj))
else:
raise e

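To make the truncated give-up branch above easier to follow, here is a self-contained, hypothetical sketch of the same idea: log that the repository is unavailable, then checkpoint its partition so resumable full refresh records the slice as processed. The function name, the partition shape, and pulling `repository` out of the partition are illustrative assumptions.

```python
from typing import Any, Iterable, Mapping

from airbyte_cdk import StreamSlice
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.checkpoint.substream_resumable_full_refresh_cursor import SubstreamResumableFullRefreshCursor


def give_up_on_repository(cursor: SubstreamResumableFullRefreshCursor, stream_slice: Mapping[str, Any]) -> Iterable[AirbyteMessage]:
    partition_obj = stream_slice.get("partition")
    repository = partition_obj["repository"]  # assumed partition shape
    yield AirbyteMessage(
        type=MessageType.LOG,
        log=AirbyteLogMessage(
            level=Level.INFO,
            message=f"Syncing `ContributorActivity` stream isn't available for repository `{repository}`.",
        ),
    )
    # Close the partition even though retries were exhausted, preserving the
    # pre-RFR behavior while still recording the slice in the stream state.
    if partition_obj:
        cursor.close_slice(StreamSlice(cursor_slice={}, partition=partition_obj))
```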
