Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with AvailabilityStrategy and substream stream_slices #20891

Closed
wants to merge 6 commits into from

Conversation

erohmensing
Copy link
Contributor

@erohmensing erohmensing commented Dec 27, 2022

To reproduce the KeyError issue in github locally:

Connect to the following Github repo (public, so any PAT should work)

image (2)

Turn on the commits and commit_comment_reactions streams with Full refresh | Overwrite. Note: you don't need the commits stream to reproduce, but I wanted to make sure that streams emitting records were still emitting records with the fix, and not just succeeding with 0 records.

Run a sync on a working version - 0.3.11 (latest) or or 0.3.8 (working version before broken version) should both work.
Run a sync on 0.3.9 (broken version) - sync will fail with the following traceback:

File "/airbyte/integration_code/main.py", line 13, in <module>
    launch(source, sys.argv[1:])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 122, in run
    for message in generator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 109, in read
    stream_is_available, error = stream_instance.check_availability(logger, self)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 190, in check_availability
    return self.availability_strategy.check_availability(self, logger, source)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/availability_strategy.py", line 35, in check_availability
    stream_helper.get_first_record(stream)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/utils/stream_helpers.py", line 24, in get_first_record
    return next(records)
  File "/airbyte/integration_code/source_github/streams.py", line 939, in read_records
    for record in super().read_records(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py", line 419, in read_records
    yield from self._read_pages(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py", line 435, in _read_pages
    request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py", line 450, in _fetch_next_page
    path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
  File "/airbyte/integration_code/source_github/streams.py", line 904, in path
    parent_path = self._parent_stream.path(stream_slice=stream_slice, **kwargs)
  File "/airbyte/integration_code/source_github/streams.py", line 492, in path
    return f"repos/{stream_slice['repository']}/comments"
KeyError: 'repository'

To test the potential fix locally:

Check out the branch, and add symlink to CDK:

cd airbyte-integrations/connectors/source-github
ln -s ../../../airbyte-cdk/python airbyte-cdk

Build the source-github image on the branch:

cd ../../..
./gradlew airbyte-integrations:connectors:source-github:airbyteDocker

Temporary changes make the docker build reference the local CDK (which is branched off of re-adding in the AvailabilityStrategy to the CDK). This contains no changes to the Github code (i.e. no implementation of an AvailabilityStrategy other than the default HttpAvailabilityStrategy) but with the potential fix in the CDK version. Run sync with this newly created dev image. Should succeed, with the same 12 records.

To reproduce the issue in SAT

Create a copy of secrets/config.json into secrets/test_config.json. Update the contents as so:

{
  "credentials": {
    "personal_access_token": "<existing PAT, or your own>"
  },
  "repository": "erohmensing/thismonth.rocks",
  "start_date": "2015-01-01T00:00:00Z"
}

Now you should be able to run the SAT tests locally with the existing versions and dev version built with the CDK and get the same results as reproducing the issue above. E.g. 0.3.8 should pass, 0.3.9 should fail with the error, and dev should pass again.

Note that this line was added because it would fail on returning no records (it shouldn't return any - there are none in the repo) in the working versions.

@octavia-squidington-iv octavia-squidington-iv added the CDK Connector Development Kit label Dec 27, 2022
@erohmensing erohmensing force-pushed the ella/AS-substream-bug branch from d7ec4bd to 9cf4919 Compare January 9, 2023 19:11
bad_branches_stream = Branches(repositories=[])
assert isinstance(branches_stream.availability_strategy, HttpAvailabilityStrategy)

# If the stream_slices method returns {}, this fails
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be removed before merging, it's just a note as to how I managed to reproduce the error while unit testing

@erohmensing erohmensing requested a review from a team January 9, 2023 21:07
Copy link
Contributor

@pedroslopez pedroslopez left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming the docker/github-related changes are for testing and won't be merged, this seems reasonable.

Mainly have a question to make sure we're not unintentionally breaking a different edge case.

Also, can you clarify what about the thismonth.rocks repo makes this fail? Is it because the commit_comment_reactions is empty?

Comment on lines -44 to -47
try:
return next(slices)
except StopIteration:
return {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any insight into why previously we explicitly had this try/except? I don't fully understand what this was trying to do and if there's some edge case this was added for that should be directly tested via a new test case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually not sure. Looks like it was added here- @girarda do you remember what case you added this for?

Copy link
Contributor Author

@erohmensing erohmensing Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it actually makes more senes to

 try:
    return next(slices)
 except StopIteration:
    return None

Instead of removing it all together - right now it doesn't change the behavior either way since it's only used in the method above it, but I think this would be the actual behavior we'd want when using it elsewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's for the case that someone messed something up in what the stream slices returns, we wanted to address it, but we just addressed it incorrectly (empty slice instead of None)?

E.g. the case in my comment below where CommitCommentReactions' stream_slices returns an empty generator

@erohmensing
Copy link
Contributor Author

erohmensing commented Jan 12, 2023

Assuming the docker/github-related changes are for testing and won't be merged, this seems reasonable.

Yup, hence the draft - will clean up stuff that won't merge before turning it into an actual PR

Also, can you clarify what about the thismonth.rocks repo makes this fail? Is it because the commit_comment_reactions is empty?

The commit_comment_reactions stream fails not because it itself is empty, but because its parent stream, commit_comments, is empty. I tested as following:

  • Add a commit comment with no reactions -> turns failure into success
  • Add commit comment with a reaction -> still succeeds
  • Add a reaction without a commit comment -- obviously you can't do this.

Let me try to rewrite the test to better reflect that...

The question therefore is, how is this logic working successfully during read but not during check_availability - I think it has do do with the stream slices there not returning the right kind of empty. We might handle this better when reading than when using get_first_record.

The stream_slices for ReactionStreams looks like

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        for stream_slice in super().stream_slices(**kwargs):
            for parent_record in self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
                yield {self.copy_parent_key: parent_record[self.parent_key], "repository": stream_slice["repository"]}

So for commit_comments (the superclass), we'd expect to have super().stream_slices which contain the regular stream_slices for a GithubStream:

for repository in self.repositories:
            yield {"repository": repository}

However we wouldn't expect to see any parent_records. So we don't yield anything. Here's a way to reproduce the behavior of what's going on:

parent_slices = [{"repository": "a"}, {"repository": "b"}]
parent_records = iter([])
def stream_slices():
  for i in parent_slices:
    for j in parent_records:
      yield j

slices = iter(stream_slices())
next(slices)

This raises a StopIteration. We previously returned {}, causing the key error, when now we let that StopIteration get to the get_first_record try/except, so that if the slices generator doesn't return a slice, we know there's also not a record.

However if you iterate (for slice in slices: do something or for slice in stream_slices(): do something) both never do the something, but also don't error. During reading we're probably doing them this way, instead of using next(slices) or next(stream_slices()), hence no issues.

Hypothetically this could be a problem on Github's end - maybe we expect the stream_slices for ReactionStreams to return [None] if there are no parent records... I'd have to confirm if the same issue is happening in Pinterest to make sure that's true.

@erohmensing
Copy link
Contributor Author

Pinterest behavior for PinterestSubstream looks to have the same behavior:

        parent_stream_slices = self.parent.stream_slices(
            sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_state=stream_state
        )
        # iterate over all parent stream_slices
        for stream_slice in parent_stream_slices:
            parent_records = self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)

            # iterate over all parent records with current stream_slice
            for record in parent_records:
                yield {"parent": record, "sub_parent": stream_slice}

TL;DR I think PinterestSubstreams and github's ReactionStreams aren't handling well when the parent streams have no records - we tried to account for this case with a try/except but we didn't handle it quite right, by returning {} instead of None. I think we didn't see this before because we never tried using a substream for the CheckStream check. I'll try to validate that on an existing version.

# bad_slice_return_value = {}
# mocker.patch.object(RepoBasedStream, "stream_slices", return_value=bad_slice_return_value)
# assert StreamHelper.get_stream_slice(branches_stream) == bad_slice_return_value
# assert branches_stream.path(bad_slice_return_value) == f"repos/user/repo1/branches"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand how changing bad_slice_return_value from {} to None fixes this line. wouldn't it just raise a Type error (TypeError: 'NoneType' object is not subscriptable)?

@erohmensing
Copy link
Contributor Author

image

Able to reproduce the same error by trying to connect to a substream of an empty stream on master (so no AvailabilityStrategy) by using this check_connection for github:

    def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        check_stream = CheckStream(["commit_comment_reactions"], options={})
        return check_stream.check_connection(self, logger, config)

Looks like the issue was introduced during the implementation of check_stream, which functionality was reused to check streams for availability strategy. It was just never caught since no one tried to make a substream of a stream that is sometimes likely to be empty as a stream_to_check.

Going to fix that in the CDK now before putting AvailabilityStrategy back in.

@erohmensing
Copy link
Contributor Author

The edge case for substreams is being fixed separately in the CDK here (plus one bonus edge case!). Re-enabling the CDK will be done in a separate PR, so closing this one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues CDK Connector Development Kit connectors/source/github
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants