diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json index 8d7fc60f5471..2ec9e0beef4b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "ef69ef6e-aa7f-4af1-a01d-ef775033524e", "name": "GitHub", "dockerRepository": "airbyte/source-github", - "dockerImageTag": "0.1.1", - "documentationUrl": "https://hub.docker.com/r/airbyte/source-github", + "dockerImageTag": "0.1.2", + "documentationUrl": "https://docs.airbyte.io/integrations/sources/github", "icon": "github.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index be08178f40ac..3479b5550bfa 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -34,8 +34,8 @@ - sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e name: GitHub dockerRepository: airbyte/source-github - dockerImageTag: 0.1.1 - documentationUrl: https://hub.docker.com/r/airbyte/source-github + dockerImageTag: 0.1.2 + documentationUrl: https://docs.airbyte.io/integrations/sources/github icon: github.svg - sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 name: Microsoft SQL Server (MSSQL) diff --git a/airbyte-integrations/connectors/source-github/Dockerfile b/airbyte-integrations/connectors/source-github/Dockerfile index 5a355f880043..849a14dcca16 100644 --- a/airbyte-integrations/connectors/source-github/Dockerfile +++ b/airbyte-integrations/connectors/source-github/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . 
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-github diff --git a/airbyte-integrations/connectors/source-github/source_github/source.py b/airbyte-integrations/connectors/source-github/source_github/source.py index ce0b36cbc1cb..af5f9f9fabba 100644 --- a/airbyte-integrations/connectors/source-github/source_github/source.py +++ b/airbyte-integrations/connectors/source-github/source_github/source.py @@ -64,22 +64,23 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> def streams(self, config: Mapping[str, Any]) -> List[Stream]: authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token") full_refresh_args = {"authenticator": authenticator, "repository": config["repository"]} - incremental_args = {"authenticator": authenticator, "repository": config["repository"], "start_date": config["start_date"]} + incremental_args = {**full_refresh_args, "start_date": config["start_date"]} + return [ Assignees(**full_refresh_args), - Reviews(**full_refresh_args), Collaborators(**full_refresh_args), - Teams(**full_refresh_args), - IssueLabels(**full_refresh_args), - Releases(**incremental_args), - Events(**incremental_args), Comments(**incremental_args), - PullRequests(**incremental_args), CommitComments(**incremental_args), - IssueMilestones(**incremental_args), Commits(**incremental_args), - Stargazers(**incremental_args), - Projects(**incremental_args), - Issues(**incremental_args), + Events(**incremental_args), IssueEvents(**incremental_args), + IssueLabels(**full_refresh_args), + IssueMilestones(**incremental_args), + Issues(**incremental_args), + Projects(**incremental_args), + PullRequests(**incremental_args), + Releases(**incremental_args), + Reviews(**full_refresh_args), + Stargazers(**incremental_args), + Teams(**full_refresh_args), ] diff 
--git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 306065401f54..86a7f6a09695 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -22,21 +22,37 @@ # SOFTWARE. # - -import tempfile +import os +import time from abc import ABC -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from urllib import parse import requests import vcr from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream from requests.exceptions import HTTPError +from vcr.cassette import Cassette + + +def request_cache() -> Cassette: + """ + Builds VCR instance. + It deletes file everytime we create it, normally should be called only once. + We can't use NamedTemporaryFile here because yaml serializer doesn't work well with empty files. + """ + filename = "request_cache.yml" + try: + os.remove(filename) + except FileNotFoundError: + pass -cache_file = tempfile.NamedTemporaryFile() + return vcr.use_cassette(str(filename), record_mode="new_episodes", serializer="yaml") class GithubStream(HttpStream, ABC): + cache = request_cache() url_base = "https://api.github.com/" primary_key = "id" @@ -44,9 +60,6 @@ class GithubStream(HttpStream, ABC): # GitHub pagination could be from 1 to 100. page_size = 100 - # Default page value for pagination. - _page = 1 - stream_base_params = {} # Fields in below variable will be used for data clearing. 
Put there keys which represent: @@ -62,10 +75,31 @@ def path(self, **kwargs) -> str: return f"repos/{self.repository}/{self.name}" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - response_data = response.json() - if response_data and len(response_data) == self.page_size: - self._page += 1 - return {"page": self._page} + links = response.links + if "next" in links: + next_link = links["next"]["url"] + parsed_link = parse.urlparse(next_link) + page = dict(parse.parse_qsl(parsed_link.query)).get("page") + return {"page": page} + + def should_retry(self, response: requests.Response) -> bool: + # We don't call `super()` here because we have custom error handling and GitHub API sometimes returns strange + # errors. So in `read_records()` we have custom error handling which don't require to call `super()` here. + return response.headers.get("X-RateLimit-Remaining") == "0" or response.status_code in ( + requests.codes.SERVER_ERROR, + requests.codes.BAD_GATEWAY, + ) + + def backoff_time(self, response: requests.Response) -> Optional[Union[int, float]]: + # This method is called if we run into the rate limit. GitHub limits requests to 5000 per hour and provides + # `X-RateLimit-Reset` header which contains time when this hour will be finished and limits will be reset so + # we again could have 5000 per another hour. + + if response.status_code == requests.codes.BAD_GATEWAY: + return 0.5 + + reset_time = response.headers.get("X-RateLimit-Reset") + return float(reset_time) - time.time() if reset_time else 60 def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: try: @@ -75,11 +109,17 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: # This whole try/except situation in `read_records()` isn't good but right now in `self._send_request()` # function we have `response.raise_for_status()` so we don't have much choice on how to handle errors. 
- # We added this try/except code because for private repositories `Teams` stream is not available and we get - # "404 Client Error: Not Found for url: https://api.github.com/orgs/sherifnada/teams?per_page=100" error. - # Blocked on https://github.com/airbytehq/airbyte/issues/3514. - if "/teams?" in error_msg: - error_msg = f"Syncing Team stream isn't available for repository {self.repository}" + # Blocked on https://github.com/airbytehq/airbyte/issues/3514. + if e.response.status_code == requests.codes.FORBIDDEN: + error_msg = ( + f"Syncing `{self.name}` stream isn't available for repository " + f"`{self.repository}` and your `access_token`, seems like you don't have permissions for " + f"this stream." + ) + elif e.response.status_code == requests.codes.NOT_FOUND and "/teams?" in error_msg: + # For private repositories `Teams` stream is not available and we get "404 Client Error: Not Found for + # url: https://api.github.com/orgs/sherifnada/teams?per_page=100" error. + error_msg = f"Syncing `Team` stream isn't available for repository `{self.repository}`." self.logger.warn(error_msg) @@ -112,7 +152,7 @@ def parse_response( for record in response.json(): # GitHub puts records in an array. yield self.transform(record=record) - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: """ Use this method to: - remove excessive fields from record; @@ -168,13 +208,20 @@ class SemiIncrementalGithubStream(GithubStream): # we should break processing records if possible. If `sort` is set to `updated` and `direction` is set to `desc` # this means that latest records will be at the beginning of the response and after we processed those latest # records we can just stop and not process other record. This will increase speed of each incremental stream - # which supports those 2 request parameters. 
Currently only `IssueMilestones` and `PullRequests` streams are + # supporting this. is_sorted_descending = False def __init__(self, start_date: str, **kwargs): super().__init__(**kwargs) self._start_date = start_date + @property + def state_checkpoint_interval(self) -> Optional[int]: + if not self.is_sorted_descending: + return self.page_size + return None + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state @@ -187,7 +234,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late return {self.repository: {self.cursor_field: state_value}} - def get_starting_point(self, stream_state: Mapping[str, Any]): + def get_starting_point(self, stream_state: Mapping[str, Any]) -> str: start_point = self._start_date if stream_state and stream_state.get(self.repository, {}).get(self.cursor_field): @@ -223,10 +270,16 @@ def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMa class Assignees(GithubStream): - pass + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-assignees + """ class Reviews(GithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request + """ + fields_to_minimize = ("user",) def path( @@ -242,15 +295,25 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: class Collaborators(GithubStream): - pass + """ + API docs: https://docs.github.com/en/rest/reference/repos#list-repository-collaborators + """ class IssueLabels(GithubStream): + """ + API docs: https://docs.github.com/en/free-pro-team@latest/rest/reference/issues#list-labels-for-a-repository + """ + def path(self, **kwargs) -> str: return f"repos/{self.repository}/labels" class Teams(GithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/teams#list-teams + """ + def 
path(self, **kwargs) -> str: owner, _ = self.repository.split("/") return f"orgs/{owner}/teams" @@ -260,10 +323,14 @@ def path(self, **kwargs) -> str: class Releases(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/repos#list-releases + """ + cursor_field = "created_at" fields_to_minimize = ("author",) - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: record = super().transform(record=record) assets = record.get("assets", []) @@ -275,6 +342,10 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: class Events(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/activity#list-repository-events + """ + cursor_field = "created_at" fields_to_minimize = ( "actor", @@ -284,6 +355,11 @@ class Events(SemiIncrementalGithubStream): class PullRequests(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests + """ + + page_size = 50 fields_to_minimize = ( "user", "milestone", @@ -293,38 +369,53 @@ class PullRequests(SemiIncrementalGithubStream): "requested_reviewers", "requested_teams", ) - stream_base_params = { - "state": "all", - "sort": "updated", - "direction": "desc", - } - def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: - with vcr.use_cassette(cache_file.name, record_mode="new_episodes", serializer="json"): - yield from super().read_records(**kwargs) + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._first_read = True + + def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]: + """ + Decide if this a first read or not by the presence of the state object + """ + self._first_read = not bool(stream_state) + with self.cache: + yield from super().read_records(stream_state=stream_state, **kwargs) def path(self, **kwargs) -> str: return 
f"repos/{self.repository}/pulls" - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: record = super().transform(record=record) - head = record.get("head", {}) - head_user = head.pop("user", None) - head["user_id"] = head_user.get("id") if head_user else None - head_repo = head.pop("repo", None) - head["repo_id"] = head_repo.get("id") if head_repo else None - - base = record.get("base", {}) - base_user = base.pop("user", None) - base["user_id"] = base_user.get("id") if base_user else None - base_repo = base.pop("repo", None) - base["repo_id"] = base_repo.get("id") if base_repo else None + for nested in ("head", "base"): + entry = record.get(nested, {}) + entry["user_id"] = (record.get("head", {}).pop("user", {}) or {}).get("id") + entry["repo_id"] = (record.get("head", {}).pop("repo", {}) or {}).get("id") return record + def request_params(self, **kwargs) -> MutableMapping[str, Any]: + base_params = super().request_params(**kwargs) + # The very first time we read this stream we want to read ascending so we can save state in case of + # a halfway failure. But if there is state, we read descending to allow incremental behavior. + params = {"state": "all", "sort": "updated", "direction": "desc" if self.is_sorted_descending else "asc"} + + return {**base_params, **params} + + @property + def is_sorted_descending(self) -> bool: + """ + Depending if there any state we read stream in ascending or descending order. 
+ """ + return self._first_read + class CommitComments(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/repos#list-commit-comments-for-a-repository + """ + fields_to_minimize = ("user",) def path(self, **kwargs) -> str: @@ -332,6 +423,10 @@ def path(self, **kwargs) -> str: class IssueMilestones(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-milestones + """ + is_sorted_descending = True fields_to_minimize = ("creator",) stream_base_params = { @@ -345,33 +440,48 @@ def path(self, **kwargs) -> str: class Stargazers(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/activity#list-stargazers + """ + primary_key = "user_id" cursor_field = "starred_at" fields_to_minimize = ("user",) def request_headers(self, **kwargs) -> Mapping[str, Any]: - headers = super().request_headers(**kwargs) + base_headers = super().request_headers(**kwargs) # We need to send below header if we want to get `starred_at` field. See docs (Alternative response with # star creation timestamps) - https://docs.github.com/en/rest/reference/activity#list-stargazers. - headers["Accept"] = "application/vnd.github.v3.star+json" - return headers + headers = {"Accept": "application/vnd.github.v3.star+json"} + + return {**base_headers, **headers} class Projects(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/projects#list-repository-projects + """ + fields_to_minimize = ("creator",) stream_base_params = { "state": "all", } def request_headers(self, **kwargs) -> Mapping[str, Any]: - headers = super().request_headers(**kwargs) + base_headers = super().request_headers(**kwargs) # Projects stream requires sending following `Accept` header. If we won't sent it # we'll get `415 Client Error: Unsupported Media Type` error. 
- headers["Accept"] = "application/vnd.github.inertia-preview+json" - return headers + headers = {"Accept": "application/vnd.github.inertia-preview+json"} + + return {**base_headers, **headers} class IssueEvents(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-issue-events-for-a-repository + """ + + cursor_field = "created_at" fields_to_minimize = ( "actor", "issue", @@ -385,17 +495,22 @@ def path(self, **kwargs) -> str: class Comments(IncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-issue-comments-for-a-repository + """ + fields_to_minimize = ("user",) - stream_base_params = { - "sort": "updated", - "direction": "desc", - } + page_size = 30 # `comments` is a large stream so it's better to set smaller page size. def path(self, **kwargs) -> str: return f"repos/{self.repository}/issues/comments" class Commits(IncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/repos#list-commits + """ + primary_key = "sha" cursor_field = "created_at" fields_to_minimize = ( @@ -403,7 +518,7 @@ class Commits(IncrementalGithubStream): "committer", ) - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: record = super().transform(record=record) # Record of the `commits` stream doesn't have an updated_at/created_at field at the top level (so we could @@ -416,6 +531,12 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: class Issues(IncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-repository-issues + """ + + page_size = 50 # `issues` is a large stream so it's better to set smaller page size. 
+ fields_to_minimize = ( "user", "assignee", @@ -426,5 +547,5 @@ class Issues(IncrementalGithubStream): stream_base_params = { "state": "all", "sort": "updated", - "direction": "desc", + "direction": "asc", } diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md index 7a866e1071b1..06feb56da3c7 100644 --- a/docs/integrations/sources/github.md +++ b/docs/integrations/sources/github.md @@ -28,16 +28,23 @@ This connector outputs the following incremental streams: * [Releases](https://docs.github.com/en/rest/reference/repos#list-releases) * [Stargazers](https://docs.github.com/en/rest/reference/activity#list-stargazers) -**Note:** Only 3 streams from above 11 incremental streams (`comments`, `commits` and `issues`) are pure incremental +### Notes + +1. Only 3 streams from above 11 incremental streams (`comments`, `commits` and `issues`) are pure incremental meaning that they: -- read only new records; -- output only new records. + - read only new records; + - output only new records. + + Other 8 incremental streams are also incremental but with one difference, they: + - read all records; + - output only new records. -Other 8 incremental streams are also incremental but with one difference, they: -- read all records; -- output only new records. + Please, consider this behaviour when using those 8 incremental streams because it may affect your API call limits. -Please, consider this behaviour when using those 8 incremental streams because it may affect you API call limits. +1. We are passing a few parameters (`since`, `sort` and `direction`) to GitHub in order to filter records and sometimes + for large streams specifying very distant `start_date` in the past may result in repeatedly getting errors from GitHub + instead of records (respective `WARN` log message will be outputted). In this case, specifying a more recent + `start_date` may help. ### Features @@ -76,5 +83,6 @@ Your token should have at least the `repo` scope. 
Depending on which streams you | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.1.2 | 2021-07-13 | [4708](https://github.com/airbytehq/airbyte/pull/4708) | Fix bug with IssueEvents stream and add handling for rate limiting | | 0.1.1 | 2021-07-07 | [4590](https://github.com/airbytehq/airbyte/pull/4590) | Fix schema in the `pull_request` stream | | 0.1.0 | 2021-07-06 | [4174](https://github.com/airbytehq/airbyte/pull/4174) | New Source: GitHub |