From 315c8c579d7c86dff595ffa1334d5ff61476b0ae Mon Sep 17 00:00:00 2001 From: oleh Date: Mon, 12 Jul 2021 23:44:10 +0300 Subject: [PATCH 01/13] Few updates for GitHub source Set correct `cursor_field` for `IssueEvents` stream. Add rate limit handling. Add handling for 403 error. Add handling for 502 error. --- .../source-github/source_github/streams.py | 53 +++++++++++++++++-- docs/integrations/sources/github.md | 22 +++++--- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 306065401f54..26a9d2cdce0e 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -24,6 +24,7 @@ import tempfile +import time from abc import ABC from typing import Any, Iterable, List, Mapping, MutableMapping, Optional @@ -67,6 +68,19 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, self._page += 1 return {"page": self._page} + def should_retry(self, response: requests.Response) -> bool: + # We don't call `super()` here because we have custom error handling and GitHub API sometimes returns strange + # errors. So in `read_records()` we have custom error handling which don't require to call `super()` here. + return response.headers.get("X-RateLimit-Remaining") == "0" + + def backoff_time(self, response: requests.Response) -> Optional[int]: + # This method is called if we run into the rate limit. GitHub limits requests to 5000 per hour and provides + # `X-RateLimit-Reset` header which contains time when this hour will be finished and limits will be reset so + # we again could have 5000 per another hour. + reset_time = int(response.headers.get("X-RateLimit-Reset", time.time() + 60)) + time_now = int(time.time()) + return reset_time - time_now + def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: try: yield from super().read_records(**kwargs) @@ -75,11 +89,34 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: # This whole try/except situation in `read_records()` isn't good but right now in `self._send_request()` # function we have `response.raise_for_status()` so we don't have much choice on how to handle errors. - # We added this try/except code because for private repositories `Teams` stream is not available and we get - # "404 Client Error: Not Found for url: https://api.github.com/orgs/sherifnada/teams?per_page=100" error. - # Blocked on https://github.com/airbytehq/airbyte/issues/3514. - if "/teams?" in error_msg: - error_msg = f"Syncing Team stream isn't available for repository {self.repository}" + # Bocked on https://github.com/airbytehq/airbyte/issues/3514. + if "403 Client Error" in error_msg: + error_msg = ( + f"Syncing `{self.__class__.__name__}` stream isn't available for repository " + f"`{self.repository}` and your `access_token`, seems like you don't have permissions for " + f"this stream." + ) + elif "404 Client Error" in error_msg and "/teams?" in error_msg: + # For private repositories `Teams` stream is not available and we get "404 Client Error: Not Found for + # url: https://api.github.com/orgs/sherifnada/teams?per_page=100" error. + error_msg = f"Syncing `Team` stream isn't available for repository `{self.repository}`." + elif "502 Server Error" in error_msg: + # Sometimes we may receive "502 Server Error: Bad Gateway for url:" errors. Normal behaviour in to just + # repeat request later, but not with GitHub API because we may create an infinite loop. For large + # streams (like `comments`) we are providing following parameters: `since`, `sort` and `direction`. + # 502 error can occur when you specify very distant start_date in the past. For example, if you try to + # do request to the following url: + # https://api.github.com/repos/apache/superset/issues/comments?per_page=100&page=1&since=2015-01-01T00:00:00Z&sort=updated&direction=desc + # you will be getting `502 Server Error` in 95% of responses. This is because `since` value is very + # distant in the past and apparently GitHub can't handle it. So since we handle streams synchronously + # we may stuck in this loop for a long time (hours). That's why it's better to skip stream and show + # message to user explaining what went wrong. Also explanation will be added to documentation for + # GitHub connector. + error_msg = ( + f"Syncing `{self.__class__.__name__}` stream isn't available right now. We got 502 error " + f"code from GitHub API meaning that GitHub has some issues while processing request. " + f"You may try again later or specify more recent `start_date` parameter." + ) self.logger.warn(error_msg) @@ -282,6 +319,11 @@ class Events(SemiIncrementalGithubStream): "org", ) + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + link = response.headers.get("Link") + if 'rel="next"' in link: + return super().next_page_token(response=response) + class PullRequests(SemiIncrementalGithubStream): fields_to_minimize = ( @@ -372,6 +414,7 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]: class IssueEvents(SemiIncrementalGithubStream): + cursor_field = "created_at" fields_to_minimize = ( "actor", "issue", diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md index 4ddbb9e0fba1..759d9b58ba8c 100644 --- a/docs/integrations/sources/github.md +++ b/docs/integrations/sources/github.md @@ -30,16 +30,23 @@ This connector outputs the following incremental streams: * [Releases](https://docs.github.com/en/rest/reference/repos#list-releases) * [Stargazers](https://docs.github.com/en/rest/reference/activity#list-stargazers) -**Note:** Only 3 streams from above 11 incremental streams (`comments`, `commits` and `issues`) are pure incremental +### Notes + +1. Only 3 streams from above 11 incremental streams (`comments`, `commits` and `issues`) are pure incremental meaning that they: -- read only new records; -- output only new records. + - read only new records; + - output only new records. + + Other 8 incremental streams are also incremental but with one difference, they: + - read all records; + - output only new records. -Other 8 incremental streams are also incremental but with one difference, they: -- read all records; -- output only new records. + Please, consider this behaviour when using those 8 incremental streams because it may affect you API call limits. -Please, consider this behaviour when using those 8 incremental streams because it may affect you API call limits. +1. We are passing few parameters (`since`, `sort` and `direction`) to GitHub in order to filter records and sometimes + for large streams specifying very distant `start_date` in the past may result in keep on getting error from GitHub + instead of records (respective `WARN` log message will be outputted). In this case Specifying more recent + `start_date` may help. ### Features @@ -78,5 +85,6 @@ Your token should have at least the `repo` scope. Depending on which streams you | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.1.2 | 2021-07-13 | [](https://github.com/airbytehq/airbyte/pull/) | Fix bugs | | 0.1.1 | 2021-07-07 | [4590](https://github.com/airbytehq/airbyte/pull/4590) | Fix schema in the `pull_request` stream | | 0.1.0 | 2021-07-06 | [4174](https://github.com/airbytehq/airbyte/pull/4174) | New Source: GitHub | From 38373365820ab9b14d4d76e4309e49a3fba0a36a Mon Sep 17 00:00:00 2001 From: oleh Date: Mon, 12 Jul 2021 23:49:25 +0300 Subject: [PATCH 02/13] Update `github.md` file --- docs/integrations/sources/github.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md index 759d9b58ba8c..06d3d0bf5694 100644 --- a/docs/integrations/sources/github.md +++ b/docs/integrations/sources/github.md @@ -85,6 +85,6 @@ Your token should have at least the `repo` scope. Depending on which streams you | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| 0.1.2 | 2021-07-13 | [](https://github.com/airbytehq/airbyte/pull/) | Fix bugs | +| 0.1.2 | 2021-07-13 | [4708](https://github.com/airbytehq/airbyte/pull/4708) | Fix bug with IssueEvents stream and add handling for rate limiting | | 0.1.1 | 2021-07-07 | [4590](https://github.com/airbytehq/airbyte/pull/4590) | Fix schema in the `pull_request` stream | | 0.1.0 | 2021-07-06 | [4174](https://github.com/airbytehq/airbyte/pull/4174) | New Source: GitHub | From 0e6e00af535e96059ccab55435f23b3509898410 Mon Sep 17 00:00:00 2001 From: oleh Date: Wed, 14 Jul 2021 19:58:03 +0300 Subject: [PATCH 03/13] Implement change request --- .../source-github/source_github/source.py | 25 ++- .../source-github/source_github/streams.py | 173 +++++++++++++----- 2 files changed, 145 insertions(+), 53 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/source.py b/airbyte-integrations/connectors/source-github/source_github/source.py index ce0b36cbc1cb..6e29bc152e05 100644 --- a/airbyte-integrations/connectors/source-github/source_github/source.py +++ b/airbyte-integrations/connectors/source-github/source_github/source.py @@ -23,10 +23,10 @@ # -from typing import Any, List, Mapping, Tuple +from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple from airbyte_cdk import AirbyteLogger -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator @@ -43,7 +43,8 @@ IssueMilestones, Issues, Projects, - PullRequests, + PullRequestsAsc, + PullRequestsDesc, Releases, Reviews, Stargazers, @@ -52,6 +53,17 @@ class SourceGithub(AbstractSource): + def __init__(self): + self._first_run_for_pull_requests_stream = True + + def read( + self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + ) -> Iterator[AirbyteMessage]: + if "pull_requests" in state and state["pull_requests"].get(config["repository"]) is not None: + self._first_run_for_pull_requests_stream = False + + yield from super().read(logger=logger, config=config, catalog=catalog, state=state) + def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: try: authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token") @@ -65,6 +77,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token") full_refresh_args = {"authenticator": authenticator, "repository": config["repository"]} incremental_args = {"authenticator": authenticator, "repository": config["repository"], "start_date": config["start_date"]} + + pull_requests_class = PullRequestsAsc if self._first_run_for_pull_requests_stream is True else PullRequestsDesc + pull_requests_stream = pull_requests_class(**incremental_args) + pull_requests_stream.name = "pull_requests" + return [ Assignees(**full_refresh_args), Reviews(**full_refresh_args), @@ -74,7 +91,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Releases(**incremental_args), Events(**incremental_args), Comments(**incremental_args), - PullRequests(**incremental_args), + pull_requests_stream, CommitComments(**incremental_args), IssueMilestones(**incremental_args), Commits(**incremental_args), diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 26a9d2cdce0e..1a0ec5a3965c 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -26,10 +26,9 @@ import tempfile import time from abc import ABC -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import requests -import vcr from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream from requests.exceptions import HTTPError @@ -63,20 +62,29 @@ def path(self, **kwargs) -> str: return f"repos/{self.repository}/{self.name}" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - response_data = response.json() - if response_data and len(response_data) == self.page_size: - self._page += 1 - return {"page": self._page} + link = response.headers.get("Link") + if link and 'rel="next"' in link: + response_data = response.json() + if response_data and len(response_data) == self.page_size: + self._page += 1 + return {"page": self._page} def should_retry(self, response: requests.Response) -> bool: # We don't call `super()` here because we have custom error handling and GitHub API sometimes returns strange # errors. So in `read_records()` we have custom error handling which don't require to call `super()` here. - return response.headers.get("X-RateLimit-Remaining") == "0" + return response.headers.get("X-RateLimit-Remaining") == "0" or response.status_code in ( + 500, + 502, + ) - def backoff_time(self, response: requests.Response) -> Optional[int]: + def backoff_time(self, response: requests.Response) -> Optional[Union[int, float]]: # This method is called if we run into the rate limit. GitHub limits requests to 5000 per hour and provides # `X-RateLimit-Reset` header which contains time when this hour will be finished and limits will be reset so # we again could have 5000 per another hour. + + if response.status_code == 502: + return 0.5 + reset_time = int(response.headers.get("X-RateLimit-Reset", time.time() + 60)) time_now = int(time.time()) return reset_time - time_now @@ -90,33 +98,16 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: # This whole try/except situation in `read_records()` isn't good but right now in `self._send_request()` # function we have `response.raise_for_status()` so we don't have much choice on how to handle errors. # Bocked on https://github.com/airbytehq/airbyte/issues/3514. - if "403 Client Error" in error_msg: + if e.response.status_code == 403: error_msg = ( f"Syncing `{self.__class__.__name__}` stream isn't available for repository " f"`{self.repository}` and your `access_token`, seems like you don't have permissions for " f"this stream." ) - elif "404 Client Error" in error_msg and "/teams?" in error_msg: + elif e.response.status_code == 404 and "/teams?" in error_msg: # For private repositories `Teams` stream is not available and we get "404 Client Error: Not Found for # url: https://api.github.com/orgs/sherifnada/teams?per_page=100" error. error_msg = f"Syncing `Team` stream isn't available for repository `{self.repository}`." - elif "502 Server Error" in error_msg: - # Sometimes we may receive "502 Server Error: Bad Gateway for url:" errors. Normal behaviour in to just - # repeat request later, but not with GitHub API because we may create an infinite loop. For large - # streams (like `comments`) we are providing following parameters: `since`, `sort` and `direction`. - # 502 error can occur when you specify very distant start_date in the past. For example, if you try to - # do request to the following url: - # https://api.github.com/repos/apache/superset/issues/comments?per_page=100&page=1&since=2015-01-01T00:00:00Z&sort=updated&direction=desc - # you will be getting `502 Server Error` in 95% of responses. This is because `since` value is very - # distant in the past and apparently GitHub can't handle it. So since we handle streams synchronously - # we may stuck in this loop for a long time (hours). That's why it's better to skip stream and show - # message to user explaining what went wrong. Also explanation will be added to documentation for - # GitHub connector. - error_msg = ( - f"Syncing `{self.__class__.__name__}` stream isn't available right now. We got 502 error " - f"code from GitHub API meaning that GitHub has some issues while processing request. " - f"You may try again later or specify more recent `start_date` parameter." - ) self.logger.warn(error_msg) @@ -205,7 +196,8 @@ class SemiIncrementalGithubStream(GithubStream): # we should break processing records if possible. If `sort` is set to `updated` and `direction` is set to `desc` # this means that latest records will be at the beginning of the response and after we processed those latest # records we can just stop and not process other record. This will increase speed of each incremental stream - # which supports those 2 request parameters. + # which supports those 2 request parameters. Currently only `IssueMilestones` and `PullRequests` streams are + # supporting this. is_sorted_descending = False def __init__(self, start_date: str, **kwargs): @@ -260,10 +252,16 @@ def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMa class Assignees(GithubStream): - pass + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-assignees + """ class Reviews(GithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request + """ + fields_to_minimize = ("user",) def path( @@ -273,21 +271,31 @@ def path( return f"repos/{self.repository}/pulls/{pull_request_number}/reviews" def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - pull_requests_stream = PullRequests(authenticator=self.authenticator, repository=self.repository, start_date="") + pull_requests_stream = PullRequestsAsc(authenticator=self.authenticator, repository=self.repository, start_date="") for pull_request in pull_requests_stream.read_records(sync_mode=SyncMode.full_refresh): yield {"pull_request_number": pull_request["number"]} class Collaborators(GithubStream): - pass + """ + API docs: https://docs.github.com/en/rest/reference/repos#list-repository-collaborators + """ class IssueLabels(GithubStream): + """ + API docs: https://docs.github.com/en/free-pro-team@latest/rest/reference/issues#list-labels-for-a-repository + """ + def path(self, **kwargs) -> str: return f"repos/{self.repository}/labels" class Teams(GithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/teams#list-teams + """ + def path(self, **kwargs) -> str: owner, _ = self.repository.split("/") return f"orgs/{owner}/teams" @@ -297,6 +305,10 @@ def path(self, **kwargs) -> str: class Releases(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/repos#list-releases + """ + cursor_field = "created_at" fields_to_minimize = ("author",) @@ -312,6 +324,10 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: class Events(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/activity#list-repository-events + """ + cursor_field = "created_at" fields_to_minimize = ( "actor", @@ -319,13 +335,14 @@ class Events(SemiIncrementalGithubStream): "org", ) - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - link = response.headers.get("Link") - if 'rel="next"' in link: - return super().next_page_token(response=response) +class PullRequestsBase(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests + """ -class PullRequests(SemiIncrementalGithubStream): + name = "pull_requests" + page_size = 50 fields_to_minimize = ( "user", "milestone", @@ -335,15 +352,13 @@ class PullRequests(SemiIncrementalGithubStream): "requested_reviewers", "requested_teams", ) - stream_base_params = { - "state": "all", - "sort": "updated", - "direction": "desc", - } - def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: - with vcr.use_cassette(cache_file.name, record_mode="new_episodes", serializer="json"): - yield from super().read_records(**kwargs) + # TODO Fix vcr error: + # UnicodeDecodeError: 'utf-8' codec can't decode byte 0x72 in position 1: invalid start byteDoes this HTTP + # interaction contain binary data? If so, use a different serializer (like the yaml serializer) for this request? + # def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: + # with vcr.use_cassette(cache_file.name, record_mode="new_episodes", serializer="json"): + # yield from super().read_records(**kwargs) def path(self, **kwargs) -> str: return f"repos/{self.repository}/pulls" @@ -366,7 +381,38 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: return record +class PullRequestsAsc(PullRequestsBase): + """ + API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests + This class is used when this is the first sync (we don't have state yet). + """ + + state_checkpoint_interval = PullRequestsBase.page_size + stream_base_params = { + "state": "all", + "sort": "updated", + } + + +class PullRequestsDesc(PullRequestsBase): + """ + API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests + This class is used when this is not the first sync (we already have state). + """ + + is_sorted_descending = True + stream_base_params = { + "state": "all", + "sort": "updated", + "direction": "desc", + } + + class CommitComments(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/repos#list-commit-comments-for-a-repository + """ + fields_to_minimize = ("user",) def path(self, **kwargs) -> str: @@ -374,6 +420,11 @@ def path(self, **kwargs) -> str: class IssueMilestones(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-milestones + """ + + state_checkpoint_interval = SemiIncrementalGithubStream.page_size is_sorted_descending = True fields_to_minimize = ("creator",) stream_base_params = { @@ -387,6 +438,10 @@ def path(self, **kwargs) -> str: class Stargazers(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/activity#list-stargazers + """ + primary_key = "user_id" cursor_field = "starred_at" fields_to_minimize = ("user",) @@ -400,6 +455,10 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]: class Projects(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/projects#list-repository-projects + """ + fields_to_minimize = ("creator",) stream_base_params = { "state": "all", @@ -414,6 +473,10 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]: class IssueEvents(SemiIncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-issue-events-for-a-repository + """ + cursor_field = "created_at" fields_to_minimize = ( "actor", @@ -428,17 +491,23 @@ def path(self, **kwargs) -> str: class Comments(IncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-issue-comments-for-a-repository + """ + fields_to_minimize = ("user",) - stream_base_params = { - "sort": "updated", - "direction": "desc", - } + page_size = 30 # `comments` is a large stream so it's better to set smaller page size. + state_checkpoint_interval = page_size def path(self, **kwargs) -> str: return f"repos/{self.repository}/issues/comments" class Commits(IncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-issue-comments-for-a-repository + """ + primary_key = "sha" cursor_field = "created_at" fields_to_minimize = ( @@ -459,6 +528,12 @@ def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: class Issues(IncrementalGithubStream): + """ + API docs: https://docs.github.com/en/rest/reference/issues#list-repository-issues + """ + + page_size = 50 # `issues` is a large stream so it's better to set smaller page size. + state_checkpoint_interval = page_size fields_to_minimize = ( "user", "assignee", @@ -469,5 +544,5 @@ class Issues(IncrementalGithubStream): stream_base_params = { "state": "all", "sort": "updated", - "direction": "desc", + "direction": "asc", } From 7e9926375dbac9c3cb0698d2db4c30075c683f2b Mon Sep 17 00:00:00 2001 From: oleh Date: Wed, 14 Jul 2021 21:31:07 +0300 Subject: [PATCH 04/13] Implement requested changes --- .../source-github/source_github/streams.py | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 1a0ec5a3965c..295bd29941b2 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -27,6 +27,7 @@ import time from abc import ABC from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from urllib import parse import requests from airbyte_cdk.models import SyncMode @@ -44,9 +45,6 @@ class GithubStream(HttpStream, ABC): # GitHub pagination could be from 1 to 100. page_size = 100 - # Default page value for pagination. - _page = 1 - stream_base_params = {} # Fields in below variable will be used for data clearing. Put there keys which represent: @@ -62,19 +60,19 @@ def path(self, **kwargs) -> str: return f"repos/{self.repository}/{self.name}" def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - link = response.headers.get("Link") - if link and 'rel="next"' in link: - response_data = response.json() - if response_data and len(response_data) == self.page_size: - self._page += 1 - return {"page": self._page} + links = response.links + if "next" in links: + next_link = links["next"]["url"] + parsed_link = parse.urlparse(next_link) + page = dict(parse.parse_qsl(parsed_link.query)).get("page") + return {"page": page} def should_retry(self, response: requests.Response) -> bool: # We don't call `super()` here because we have custom error handling and GitHub API sometimes returns strange # errors. So in `read_records()` we have custom error handling which don't require to call `super()` here. return response.headers.get("X-RateLimit-Remaining") == "0" or response.status_code in ( - 500, - 502, + requests.codes.SERVER_ERROR, + requests.codes.BAD_GATEWAY, ) def backoff_time(self, response: requests.Response) -> Optional[Union[int, float]]: @@ -82,12 +80,11 @@ def backoff_time(self, response: requests.Response) -> Optional[Union[int, float # `X-RateLimit-Reset` header which contains time when this hour will be finished and limits will be reset so # we again could have 5000 per another hour. - if response.status_code == 502: + if response.status_code == requests.codes.BAD_GATEWAY: return 0.5 - reset_time = int(response.headers.get("X-RateLimit-Reset", time.time() + 60)) - time_now = int(time.time()) - return reset_time - time_now + reset_time = response.headers.get("X-RateLimit-Reset") + return float(reset_time) - time.time() if reset_time else 60 def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: try: @@ -98,13 +95,13 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: # This whole try/except situation in `read_records()` isn't good but right now in `self._send_request()` # function we have `response.raise_for_status()` so we don't have much choice on how to handle errors. # Bocked on https://github.com/airbytehq/airbyte/issues/3514. - if e.response.status_code == 403: + if e.response.status_code == requests.codes.FORBIDDEN: error_msg = ( f"Syncing `{self.__class__.__name__}` stream isn't available for repository " f"`{self.repository}` and your `access_token`, seems like you don't have permissions for " f"this stream." ) - elif e.response.status_code == 404 and "/teams?" in error_msg: + elif e.response.status_code == requests.codes.NOT_FOUND and "/teams?" in error_msg: # For private repositories `Teams` stream is not available and we get "404 Client Error: Not Found for # url: https://api.github.com/orgs/sherifnada/teams?per_page=100" error. error_msg = f"Syncing `Team` stream isn't available for repository `{self.repository}`." From bb549f1766b0d250c302a03cb99993d876b7efa9 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Wed, 14 Jul 2021 23:10:56 +0300 Subject: [PATCH 05/13] apply suggestions from @keu --- .../source-github/source_github/source.py | 44 +++----- .../source-github/source_github/streams.py | 106 +++++++++--------- 2 files changed, 66 insertions(+), 84 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/source.py b/airbyte-integrations/connectors/source-github/source_github/source.py index 6e29bc152e05..af5f9f9fabba 100644 --- a/airbyte-integrations/connectors/source-github/source_github/source.py +++ b/airbyte-integrations/connectors/source-github/source_github/source.py @@ -23,10 +23,10 @@ # -from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple +from typing import Any, List, Mapping, Tuple from airbyte_cdk import AirbyteLogger -from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator @@ -43,8 +43,7 @@ IssueMilestones, Issues, Projects, - PullRequestsAsc, - PullRequestsDesc, + PullRequests, Releases, Reviews, Stargazers, @@ -53,17 +52,6 @@ class SourceGithub(AbstractSource): - def __init__(self): - self._first_run_for_pull_requests_stream = True - - def read( - self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None - ) -> Iterator[AirbyteMessage]: - if "pull_requests" in state and state["pull_requests"].get(config["repository"]) is not None: - self._first_run_for_pull_requests_stream = False - - yield from super().read(logger=logger, config=config, catalog=catalog, state=state) - def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: try: authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token") @@ -76,27 +64,23 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> def streams(self, config: Mapping[str, Any]) -> List[Stream]: authenticator = TokenAuthenticator(token=config["access_token"], auth_method="token") full_refresh_args = {"authenticator": authenticator, "repository": config["repository"]} - incremental_args = {"authenticator": authenticator, "repository": config["repository"], "start_date": config["start_date"]} - - pull_requests_class = PullRequestsAsc if self._first_run_for_pull_requests_stream is True else PullRequestsDesc - pull_requests_stream = pull_requests_class(**incremental_args) - pull_requests_stream.name = "pull_requests" + incremental_args = {**full_refresh_args, "start_date": config["start_date"]} return [ Assignees(**full_refresh_args), - Reviews(**full_refresh_args), Collaborators(**full_refresh_args), - Teams(**full_refresh_args), - IssueLabels(**full_refresh_args), - Releases(**incremental_args), - Events(**incremental_args), Comments(**incremental_args), - pull_requests_stream, CommitComments(**incremental_args), - IssueMilestones(**incremental_args), Commits(**incremental_args), - Stargazers(**incremental_args), - Projects(**incremental_args), - Issues(**incremental_args), + Events(**incremental_args), IssueEvents(**incremental_args), + IssueLabels(**full_refresh_args), + IssueMilestones(**incremental_args), + Issues(**incremental_args), + Projects(**incremental_args), + PullRequests(**incremental_args), + Releases(**incremental_args), + Reviews(**full_refresh_args), + Stargazers(**incremental_args), + Teams(**full_refresh_args), ] diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 295bd29941b2..60dcb17f5d65 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -137,7 +137,7 @@ def parse_response( for record in response.json(): # GitHub puts records in an array. yield self.transform(record=record) - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: """ Use this method to: - remove excessive fields from record; @@ -201,6 +201,12 @@ def __init__(self, start_date: str, **kwargs): super().__init__(**kwargs) self._start_date = start_date + @property + def state_checkpoint_interval(self) -> Optional[int]: + if not self.is_sorted_descending: + return self.page_size + return None + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state @@ -213,7 +219,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late return {self.repository: {self.cursor_field: state_value}} - def get_starting_point(self, stream_state: Mapping[str, Any]): + def get_starting_point(self, stream_state: Mapping[str, Any]) -> str: start_point = self._start_date if stream_state and stream_state.get(self.repository, {}).get(self.cursor_field): @@ -268,7 +274,7 @@ def path( return f"repos/{self.repository}/pulls/{pull_request_number}/reviews" def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - pull_requests_stream = PullRequestsAsc(authenticator=self.authenticator, repository=self.repository, start_date="") + pull_requests_stream = PullRequests(authenticator=self.authenticator, repository=self.repository, start_date="") for pull_request in pull_requests_stream.read_records(sync_mode=SyncMode.full_refresh): yield {"pull_request_number": pull_request["number"]} @@ -309,7 +315,7 @@ class Releases(SemiIncrementalGithubStream): cursor_field = "created_at" fields_to_minimize = ("author",) - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: record = super().transform(record=record) assets = record.get("assets", []) @@ -333,12 +339,11 @@ class Events(SemiIncrementalGithubStream): ) -class PullRequestsBase(SemiIncrementalGithubStream): +class PullRequests(SemiIncrementalGithubStream): """ API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests """ - name = "pull_requests" page_size = 50 fields_to_minimize = ( "user", @@ -350,6 +355,10 @@ class PullRequestsBase(SemiIncrementalGithubStream): "requested_teams", ) + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._first_read = True + # TODO Fix vcr error: # UnicodeDecodeError: 'utf-8' codec can't decode byte 0x72 in position 1: invalid start byteDoes this HTTP # interaction contain binary data? If so, use a different serializer (like the yaml serializer) for this request? @@ -357,52 +366,42 @@ class PullRequestsBase(SemiIncrementalGithubStream): # with vcr.use_cassette(cache_file.name, record_mode="new_episodes", serializer="json"): # yield from super().read_records(**kwargs) + def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]: + """ + Decide if this a first read or not by the presence of the state object + """ + self._first_read = not bool(stream_state) + yield from super().read_records(stream_state=stream_state, **kwargs) + def path(self, **kwargs) -> str: return f"repos/{self.repository}/pulls" - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: record = super().transform(record=record) - head = record.get("head", {}) - head_user = head.pop("user", None) - head["user_id"] = head_user.get("id") if head_user else None - head_repo = head.pop("repo", None) - head["repo_id"] = head_repo.get("id") if head_repo else None - - base = record.get("base", {}) - base_user = base.pop("user", None) - base["user_id"] = base_user.get("id") if base_user else None - base_repo = base.pop("repo", None) - base["repo_id"] = base_repo.get("id") if base_repo else None + for nested in ("head", "base"): + entry = record.get(nested, {}) + entry["user_id"] = record.get("head", {}).pop("user", {}).get("id") + entry["repo_id"] = record.get("head", {}).pop("repo", {}).get("id") return record + def request_params(self, **kwargs) -> MutableMapping[str, Any]: + base_params = super().request_params(**kwargs) + params = { + "state": "all", + "sort": "updated", + "direction": "desc" if self.is_sorted_descending else "asc" + } -class PullRequestsAsc(PullRequestsBase): - """ - API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests - This class is used when this is the first sync (we don't have state yet). - """ - - state_checkpoint_interval = PullRequestsBase.page_size - stream_base_params = { - "state": "all", - "sort": "updated", - } - - -class PullRequestsDesc(PullRequestsBase): - """ - API docs: https://docs.github.com/en/rest/reference/pulls#list-pull-requests - This class is used when this is not the first sync (we already have state). - """ + return {**base_params, **params} - is_sorted_descending = True - stream_base_params = { - "state": "all", - "sort": "updated", - "direction": "desc", - } + @property + def is_sorted_descending(self) -> bool: + """ + Depending if there any state we read stream in ascending or descending order. + """ + return self._first_read class CommitComments(SemiIncrementalGithubStream): @@ -421,13 +420,11 @@ class IssueMilestones(SemiIncrementalGithubStream): API docs: https://docs.github.com/en/rest/reference/issues#list-milestones """ - state_checkpoint_interval = SemiIncrementalGithubStream.page_size - is_sorted_descending = True fields_to_minimize = ("creator",) stream_base_params = { "state": "all", "sort": "updated", - "direction": "desc", + "direction": "asc", } def path(self, **kwargs) -> str: @@ -444,11 +441,12 @@ class Stargazers(SemiIncrementalGithubStream): fields_to_minimize = ("user",) def request_headers(self, **kwargs) -> Mapping[str, Any]: - headers = super().request_headers(**kwargs) + base_headers = super().request_headers(**kwargs) # We need to send below header if we want to get `starred_at` field. See docs (Alternative response with # star creation timestamps) - https://docs.github.com/en/rest/reference/activity#list-stargazers. - headers["Accept"] = "application/vnd.github.v3.star+json" - return headers + headers = {"Accept": "application/vnd.github.v3.star+json"} + + return {**base_headers, **headers} class Projects(SemiIncrementalGithubStream): @@ -462,11 +460,12 @@ class Projects(SemiIncrementalGithubStream): } def request_headers(self, **kwargs) -> Mapping[str, Any]: - headers = super().request_headers(**kwargs) + base_headers = super().request_headers(**kwargs) # Projects stream requires sending following `Accept` header. If we won't sent it # we'll get `415 Client Error: Unsupported Media Type` error. - headers["Accept"] = "application/vnd.github.inertia-preview+json" - return headers + headers = {"Accept": "application/vnd.github.inertia-preview+json"} + + return {**base_headers, **headers} class IssueEvents(SemiIncrementalGithubStream): @@ -494,7 +493,6 @@ class Comments(IncrementalGithubStream): fields_to_minimize = ("user",) page_size = 30 # `comments` is a large stream so it's better to set smaller page size. - state_checkpoint_interval = page_size def path(self, **kwargs) -> str: return f"repos/{self.repository}/issues/comments" @@ -512,7 +510,7 @@ class Commits(IncrementalGithubStream): "committer", ) - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: record = super().transform(record=record) # Record of the `commits` stream doesn't have an updated_at/created_at field at the top level (so we could @@ -530,7 +528,7 @@ class Issues(IncrementalGithubStream): """ page_size = 50 # `issues` is a large stream so it's better to set smaller page size. - state_checkpoint_interval = page_size + fields_to_minimize = ( "user", "assignee", From badafc4f2db623f565e9caa8dd39ea22cd15172d Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Wed, 14 Jul 2021 23:26:15 +0300 Subject: [PATCH 06/13] format --- .../connectors/source-github/source_github/streams.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 60dcb17f5d65..19ab785f634d 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -388,11 +388,7 @@ def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any def request_params(self, **kwargs) -> MutableMapping[str, Any]: base_params = super().request_params(**kwargs) - params = { - "state": "all", - "sort": "updated", - "direction": "desc" if self.is_sorted_descending else "asc" - } + params = {"state": "all", "sort": "updated", "direction": "desc" if self.is_sorted_descending else "asc"} return {**base_params, **params} From b40416462445c17d1c9a5d1966924439efaa28cd Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Wed, 14 Jul 2021 13:58:05 -0700 Subject: [PATCH 07/13] Update airbyte-integrations/connectors/source-github/source_github/streams.py --- .../connectors/source-github/source_github/streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 19ab785f634d..e4120a1d551a 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -388,6 +388,8 @@ def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any def request_params(self, **kwargs) -> MutableMapping[str, Any]: base_params = super().request_params(**kwargs) + # The very first time we read this stream we want to read ascending so we can save state in case of + # a halfway failure. But if there is state, we read descending to allow incremental behavior. params = {"state": "all", "sort": "updated", "direction": "desc" if self.is_sorted_descending else "asc"} return {**base_params, **params} From d1537243df7d779691ecc5c3742949a30595cfb9 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Wed, 14 Jul 2021 23:58:17 +0300 Subject: [PATCH 08/13] re-think the decision about direction of the read --- .../connectors/source-github/source_github/streams.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index e4120a1d551a..a710302eea64 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -418,11 +418,12 @@ class IssueMilestones(SemiIncrementalGithubStream): API docs: https://docs.github.com/en/rest/reference/issues#list-milestones """ + is_sorted_descending = True fields_to_minimize = ("creator",) stream_base_params = { "state": "all", "sort": "updated", - "direction": "asc", + "direction": "desc", } def path(self, **kwargs) -> str: From 3361a424cb65ca56e27d4709d6ca3eebc8dc5b8d Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 15 Jul 2021 00:03:43 +0300 Subject: [PATCH 09/13] groom --- .../connectors/source-github/source_github/streams.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index a710302eea64..78d76638ab81 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -97,7 +97,7 @@ def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: # Bocked on https://github.com/airbytehq/airbyte/issues/3514. if e.response.status_code == requests.codes.FORBIDDEN: error_msg = ( - f"Syncing `{self.__class__.__name__}` stream isn't available for repository " + f"Syncing `{self.name}` stream isn't available for repository " f"`{self.repository}` and your `access_token`, seems like you don't have permissions for " f"this stream." ) @@ -388,8 +388,8 @@ def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any def request_params(self, **kwargs) -> MutableMapping[str, Any]: base_params = super().request_params(**kwargs) - # The very first time we read this stream we want to read ascending so we can save state in case of - # a halfway failure. But if there is state, we read descending to allow incremental behavior. + # The very first time we read this stream we want to read ascending so we can save state in case of + # a halfway failure. But if there is state, we read descending to allow incremental behavior. params = {"state": "all", "sort": "updated", "direction": "desc" if self.is_sorted_descending else "asc"} return {**base_params, **params} From ce340187809aa06661477a622cd6502b585f6431 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 15 Jul 2021 02:36:26 +0300 Subject: [PATCH 10/13] fix VCR cache --- .../source-github/source_github/streams.py | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 78d76638ab81..2b9a1997e346 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -22,22 +22,33 @@ # SOFTWARE. # - -import tempfile import time from abc import ABC from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from pathlib import Path from urllib import parse import requests +import vcr +from vcr.cassette import Cassette from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream from requests.exceptions import HTTPError -cache_file = tempfile.NamedTemporaryFile() + +def request_cache() -> Cassette: + """ + Builds VCR instance. + It deletes file everytime we create it, normally should be called only once. + We can't use NamedTemporaryFile here because yaml serializer doesn't work well with empty files. + """ + filename = Path("request_cache.yml") + filename.unlink(missing_ok=True) + return vcr.use_cassette(str(filename), record_mode="new_episodes", serializer="yaml") class GithubStream(HttpStream, ABC): + cache = request_cache() url_base = "https://api.github.com/" primary_key = "id" @@ -359,19 +370,13 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self._first_read = True - # TODO Fix vcr error: - # UnicodeDecodeError: 'utf-8' codec can't decode byte 0x72 in position 1: invalid start byteDoes this HTTP - # interaction contain binary data? If so, use a different serializer (like the yaml serializer) for this request? - # def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: - # with vcr.use_cassette(cache_file.name, record_mode="new_episodes", serializer="json"): - # yield from super().read_records(**kwargs) - def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]: """ Decide if this a first read or not by the presence of the state object """ self._first_read = not bool(stream_state) - yield from super().read_records(stream_state=stream_state, **kwargs) + with self.cache: + yield from super().read_records(stream_state=stream_state, **kwargs) def path(self, **kwargs) -> str: return f"repos/{self.repository}/pulls" @@ -381,8 +386,8 @@ def transform(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any for nested in ("head", "base"): entry = record.get(nested, {}) - entry["user_id"] = record.get("head", {}).pop("user", {}).get("id") - entry["repo_id"] = record.get("head", {}).pop("repo", {}).get("id") + entry["user_id"] = (record.get("head", {}).pop("user", {}) or {}).get("id") + entry["repo_id"] = (record.get("head", {}).pop("repo", {}) or {}).get("id") return record From 9441eaedbe763e866fb5fe8171276e20c6fcdc6f Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 15 Jul 2021 02:40:25 +0300 Subject: [PATCH 11/13] format --- .../connectors/source-github/source_github/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 2b9a1997e346..727716c4a68e 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -24,16 +24,16 @@ import time from abc import ABC -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union from pathlib import Path +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union from urllib import parse import requests import vcr -from vcr.cassette import Cassette from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream from requests.exceptions import HTTPError +from vcr.cassette import Cassette def request_cache() -> Cassette: From 5a51fd76ad1f74a88299a9db6a455c45e3451611 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 15 Jul 2021 03:00:51 +0300 Subject: [PATCH 12/13] fix file removal for python 3.7 --- .../connectors/source-github/source_github/streams.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 727716c4a68e..86a7f6a09695 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -22,9 +22,9 @@ # SOFTWARE. # +import os import time from abc import ABC -from pathlib import Path from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union from urllib import parse @@ -42,8 +42,12 @@ def request_cache() -> Cassette: It deletes file everytime we create it, normally should be called only once. We can't use NamedTemporaryFile here because yaml serializer doesn't work well with empty files. """ - filename = Path("request_cache.yml") - filename.unlink(missing_ok=True) + filename = "request_cache.yml" + try: + os.remove(filename) + except FileNotFoundError: + pass + return vcr.use_cassette(str(filename), record_mode="new_episodes", serializer="yaml") From 3cd57bead48664a8289334500ed87e54cec92328 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 15 Jul 2021 03:40:42 +0300 Subject: [PATCH 13/13] bumping version --- .../ef69ef6e-aa7f-4af1-a01d-ef775033524e.json | 4 ++-- .../init/src/main/resources/seed/source_definitions.yaml | 4 ++-- airbyte-integrations/connectors/source-github/Dockerfile | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json index 8d7fc60f5471..2ec9e0beef4b 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "ef69ef6e-aa7f-4af1-a01d-ef775033524e", "name": "GitHub", "dockerRepository": "airbyte/source-github", - "dockerImageTag": "0.1.1", - "documentationUrl": "https://hub.docker.com/r/airbyte/source-github", + "dockerImageTag": "0.1.2", + "documentationUrl": "https://docs.airbyte.io/integrations/sources/github", "icon": "github.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index be08178f40ac..3479b5550bfa 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -34,8 +34,8 @@ - sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e name: GitHub dockerRepository: airbyte/source-github - dockerImageTag: 0.1.1 - documentationUrl: https://hub.docker.com/r/airbyte/source-github + dockerImageTag: 0.1.2 + documentationUrl: https://docs.airbyte.io/integrations/sources/github icon: github.svg - sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 name: Microsoft SQL Server (MSSQL) diff --git a/airbyte-integrations/connectors/source-github/Dockerfile b/airbyte-integrations/connectors/source-github/Dockerfile index 5a355f880043..849a14dcca16 100644 --- a/airbyte-integrations/connectors/source-github/Dockerfile +++ b/airbyte-integrations/connectors/source-github/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/source-github