Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Github: re-implement incremental for reviews stream #11623

Merged
merged 12 commits into from
Apr 1, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@
- name: GitHub
sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerRepository: airbyte/source-github
dockerImageTag: 0.2.25
dockerImageTag: 0.2.26
documentationUrl: https://docs.airbyte.io/integrations/sources/github
icon: github.svg
sourceType: api
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.25
LABEL io.airbyte.version=0.2.26
LABEL io.airbyte.name=airbyte/source-github
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ tests:
pull_requests: ["airbytehq/integration-test", "updated_at"]
releases: ["airbytehq/integration-test", "created_at"]
review_comments: ["airbytehq/integration-test", "updated_at"]
reviews: ["airbytehq/integration-test", "submitted_at"]
reviews: ["airbytehq/integration-test", "parent_updated_at"]
stargazers: ["airbytehq/integration-test", "starred_at"]
full_refresh:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
},
"reviews": {
"airbytehq/integration-test": {
"submitted_at": "2121-06-29T02:04:57Z"
"parent_updated_at": "2121-06-29T02:04:57Z"
}
},
"stargazers": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,12 @@
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["submitted_at"],
"default_cursor_field": ["parent_updated_at"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["submitted_at"]
"cursor_field": ["parent_updated_at"]
},
{
"stream": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
},
"reviews": {
"airbytehq/integration-test": {
"submitted_at": "2021-08-30T12:01:15Z"
"parent_updated_at": "2021-08-30T12:01:15Z"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
"type": ["null", "string"],
"format": "date-time"
},
"parent_updated_at": {
"type": "string",
"format": "date-time"
},
"commit_id": {
"type": ["null", "string"]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ def stream_slices(
parent_stream_slices = super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=parent_state)
for parent_stream_slice in parent_stream_slices:
yield {
"parent_updated_at": parent_stream_slice["parent"]["updated_at"],
grubberr marked this conversation as resolved.
Show resolved Hide resolved
"pull_request_number": parent_stream_slice["parent"]["number"],
"repository": parent_stream_slice["parent"]["repository"],
}
Expand Down Expand Up @@ -708,7 +709,7 @@ class Reviews(PullRequestSubstream):
API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request
"""

cursor_field = "submitted_at"
cursor_field = "parent_updated_at"

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
Expand All @@ -723,6 +724,11 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
parent_state[repository][self.parent.cursor_field] = parent_state[repository][self.cursor_field]
yield from super().stream_slices(stream_state=parent_state, **kwargs)

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
record = super().transform(record=record, stream_slice=stream_slice)
record[self.cursor_field] = stream_slice[self.cursor_field]
return record


class PullRequestCommits(GithubStream):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
PullRequests,
Releases,
Repositories,
Reviews,
Stargazers,
Tags,
Teams,
Users,
)

from .utils import ProjectsResponsesAPI, read_full_refresh, read_incremental
from .utils import ProjectsResponsesAPI, read_full_refresh, read_incremental, urlbase

DEFAULT_BACKOFF_DELAYS = [5, 10, 20, 40, 80]

Expand Down Expand Up @@ -659,3 +660,86 @@ def get_records(cursor_field):
stream = Stargazers(**repository_args_with_start_date)
records = read_full_refresh(stream)
assert records == [{"repository": "organization/repository", "starred_at": "2022-02-02T00:00:00Z", "user": {"id": 2}, "user_id": 2}]


@responses.activate
def test_stream_reviews_incremental_read():

url_pulls = "https://api.github.com/repos/organization/repository/pulls"

repository_args_with_start_date = {
"start_date": "2000-01-01T00:00:00Z",
"page_size_for_large_streams": 30,
"repositories": ["organization/repository"],
}
stream = Reviews(parent=PullRequests(**repository_args_with_start_date), **repository_args_with_start_date)

responses.add(
"GET",
url_pulls,
json=[
{"updated_at": "2022-01-01T00:00:00Z", "number": 1},
{"updated_at": "2022-01-02T00:00:00Z", "number": 2},
],
)

responses.add(
"GET",
"https://api.github.com/repos/organization/repository/pulls/1/reviews",
json=[{"id": 1000, "body": "commit1"}, {"id": 1001, "body": "commit1"}],
)

responses.add(
"GET",
"https://api.github.com/repos/organization/repository/pulls/2/reviews",
json=[{"id": 1002, "body": "commit1"}],
)

stream_state = {}
records = read_incremental(stream, stream_state)

assert records == [
{"body": "commit1", "id": 1000, "parent_updated_at": "2022-01-01T00:00:00Z", "repository": "organization/repository"},
{"body": "commit1", "id": 1001, "parent_updated_at": "2022-01-01T00:00:00Z", "repository": "organization/repository"},
{"body": "commit1", "id": 1002, "parent_updated_at": "2022-01-02T00:00:00Z", "repository": "organization/repository"},
]

assert stream_state == {"organization/repository": {"parent_updated_at": "2022-01-02T00:00:00Z"}}

responses.add(
"GET",
url_pulls,
json=[
{"updated_at": "2022-01-03T00:00:00Z", "number": 1},
{"updated_at": "2022-01-02T00:00:00Z", "number": 2},
{"updated_at": "2022-01-04T00:00:00Z", "number": 3},
],
)

responses.add(
"GET",
"https://api.github.com/repos/organization/repository/pulls/1/reviews",
json=[{"id": 1000, "body": "commit1"}, {"id": 1001, "body": "commit2"}],
)

responses.add(
"GET",
"https://api.github.com/repos/organization/repository/pulls/3/reviews",
json=[{"id": 1003, "body": "commit1"}],
)

records = read_incremental(stream, stream_state)

assert records == [
{"body": "commit1", "id": 1000, "parent_updated_at": "2022-01-03T00:00:00Z", "repository": "organization/repository"},
{"body": "commit2", "id": 1001, "parent_updated_at": "2022-01-03T00:00:00Z", "repository": "organization/repository"},
{"body": "commit1", "id": 1003, "parent_updated_at": "2022-01-04T00:00:00Z", "repository": "organization/repository"},
]

assert stream_state == {"organization/repository": {"parent_updated_at": "2022-01-04T00:00:00Z"}}

assert len(responses.calls) == 6
assert urlbase(responses.calls[0].request.url) == url_pulls
grubberr marked this conversation as resolved.
Show resolved Hide resolved
assert responses.calls[0].request.params["direction"] == "asc"
assert urlbase(responses.calls[3].request.url) == url_pulls
assert responses.calls[3].request.params["direction"] == "asc"
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

from typing import Any, MutableMapping
from urllib.parse import urlparse

import responses
from airbyte_cdk.models import SyncMode
Expand All @@ -28,6 +29,10 @@ def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str,
return res


def urlbase(url):
return urlparse(url)._replace(params="", query="", fragment="").geturl()


class ProjectsResponsesAPI:
"""
Fake Responses API for github projects, columns, cards
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/github.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Your token should have at least the `repo` scope. Depending on which streams you

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:-------------------------------------------------------------------------------------------------------------|
| 0.2.26 | 2022-03-31 | [11623](https://github.com/airbytehq/airbyte/pull/11623) | Re-implement incremental for `reviews` stream using parent `updated_at` field |
grubberr marked this conversation as resolved.
Show resolved Hide resolved
| 0.2.25 | 2022-03-31 | [11567](https://github.com/airbytehq/airbyte/pull/11567) | Improve code for better error handling |
| 0.2.24 | 2022-03-30 | [9251](https://github.com/airbytehq/airbyte/pull/9251) | Add Streams Workflow and WorkflowRuns |
| 0.2.23 | 2022-03-17 | [11212](https://github.com/airbytehq/airbyte/pull/11212) | Improve documentation and spec for Beta |
Expand Down