[Bug]: WordCountIT.test_wordcount_impersonation_it failing #23676

Closed
lukecwik opened this issue Oct 17, 2022 · 0 comments · Fixed by #23677
Labels: bug, core, done & done, P0, python

lukecwik commented Oct 17, 2022

What happened?

#23644 causes Python PostCommit to fail.

Example:
https://ci-beam.apache.org/view/PostCommit/job/beam_PostCommit_Python39/974

Error Message
apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'gs://dataflow-samples/shakespeare/kinglear.txt': RefreshError('Unable to acquire impersonated credentials', '{\n  "error": {\n    "code": 400,\n    "message": "Request contains an invalid argument.",\n    "status": "INVALID_ARGUMENT"\n  }\n}\n')}
Stacktrace
self = <apache_beam.examples.wordcount_it_test.WordCountIT testMethod=test_wordcount_impersonation_it>

    @pytest.mark.it_postcommit
    @pytest.mark.sickbay_direct
    @pytest.mark.sickbay_spark
    @pytest.mark.sickbay_flink
    def test_wordcount_impersonation_it(self):
      """Tests impersonation on dataflow.
    
      For testing impersonation, we use three ingredients:
      - a principal to impersonate
      - a dataflow service account that only that principal is
        allowed to launch jobs as
      - a temp root that only the above two accounts have access to
    
      Jenkins and Dataflow workers both run as GCE default service account.
      So we remove that account from all the above.
      """
      # Credentials need to be reset or this test will fail and credentials
      # from a previous test will be used.
      auth._Credentials._credentials_init = False
    
      ACCOUNT_TO_IMPERSONATE = (
          'allows-impersonation@apache-'
          'beam-testing.iam.gserviceaccount.com')
      RUNNER_ACCOUNT = (
          'impersonation-dataflow-worker@'
          'apache-beam-testing.iam.gserviceaccount.com')
      TEMP_DIR = 'gs://impersonation-test-bucket/temp-it'
      STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it'
      extra_options = {
          'impersonate_service_account': ACCOUNT_TO_IMPERSONATE,
          'service_account_email': RUNNER_ACCOUNT,
          'temp_location': TEMP_DIR,
          'staging_location': STAGING_LOCATION
      }
>     self._run_wordcount_it(wordcount.run, **extra_options)

apache_beam/examples/wordcount_it_test.py:85: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/examples/wordcount_it_test.py:146: in _run_wordcount_it
    run_wordcount(
apache_beam/examples/wordcount.py:90: in run
    lines = p | 'Read' >> ReadFromText(known_args.input)
apache_beam/io/textio.py:765: in __init__
    self._source = self._source_class(
apache_beam/io/textio.py:131: in __init__
    super().__init__(
apache_beam/io/filebasedsource.py:127: in __init__
    self._validate()
apache_beam/options/value_provider.py:193: in _f
    return fnc(self, *args, **kwargs)
apache_beam/io/filebasedsource.py:188: in _validate
    match_result = FileSystems.match([pattern], limits=[1])[0]
apache_beam/io/filesystems.py:204: in match
    return filesystem.match(patterns, limits)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.io.gcp.gcsfilesystem.GCSFileSystem object at 0x7f86bd27bfa0>
patterns = ['gs://dataflow-samples/shakespeare/kinglear.txt'], limits = [1]

    def match(self, patterns, limits=None):
      """Find all matching paths to the patterns provided.
    
      See Also:
        :meth:`translate_pattern`
    
      Patterns ending with '/' or '\\' will be appended with '*'.
    
      Args:
        patterns: list of string for the file path pattern to match against
        limits: list of maximum number of responses that need to be fetched
    
      Returns: list of ``MatchResult`` objects.
    
      Raises:
        ``BeamIOError``: if any of the pattern match operations fail
      """
      if limits is None:
        limits = [None] * len(patterns)
      else:
        err_msg = "Patterns and limits should be equal in length"
        assert len(patterns) == len(limits), err_msg
    
      def _match(pattern, limit):
        """Find all matching paths to the pattern provided."""
        if pattern.endswith('/') or pattern.endswith('\\'):
          pattern += '*'
        # Get the part of the pattern before the first globbing character.
        # For example scheme://path/foo* will become scheme://path/foo for
        # filesystems like GCS, or converted to scheme://path for filesystems with
        # directories.
        prefix_or_dir = re.match('^[^[*?]*', pattern).group(0)
    
        file_metadatas = []
        if prefix_or_dir == pattern:
          # Short-circuit calling self.list() if there's no glob pattern to match.
          if self.exists(pattern):
            file_metadatas = [self.metadata(pattern)]
        else:
          if self.has_dirs():
            prefix_dirname = self._url_dirname(prefix_or_dir)
            if not prefix_dirname == prefix_or_dir:
              logger.debug(
                  "Changed prefix_or_dir %r -> %r", prefix_or_dir, prefix_dirname)
              prefix_or_dir = prefix_dirname
    
          logger.debug("Listing files in %r", prefix_or_dir)
          file_metadatas = self._list(prefix_or_dir)
    
        metadata_list = []
        for file_metadata in self.match_files(file_metadatas, pattern):
          if limit is not None and len(metadata_list) >= limit:
            break
          metadata_list.append(file_metadata)
    
        return MatchResult(pattern, metadata_list)
    
      exceptions = {}
      result = []
      for pattern, limit in zip(patterns, limits):
        try:
          result.append(_match(pattern, limit))
        except Exception as e:  # pylint: disable=broad-except
          exceptions[pattern] = e
    
      if exceptions:
>       raise BeamIOError("Match operation failed", exceptions)
E       apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'gs://dataflow-samples/shakespeare/kinglear.txt': RefreshError('Unable to acquire impersonated credentials', '{\n  "error": {\n    "code": 400,\n    "message": "Request contains an invalid argument.",\n    "status": "INVALID_ARGUMENT"\n  }\n}\n')}

apache_beam/io/filesystem.py:787: BeamIOError

This also causes other tests to fail: the credentials object isn't reset at the end of the test, so later integration tests pick up the failing impersonation credential.
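One way to keep a cached credential from leaking between tests is to clear the cache both before and after the test body, even when the body raises. The following is a minimal sketch of that pattern, not the change made in the linked fix. `FakeCredentials` and `fresh_credentials` are hypothetical names introduced for illustration; only the `_credentials_init` flag mirrors the attribute reset in the test above, and the `_credentials` attribute is an assumption about how such a cache might be stored.

```python
import contextlib


class FakeCredentials:
    """Hypothetical stand-in for a class that caches credentials at class level."""
    _credentials_init = False
    _credentials = None  # assumed cache slot, for illustration only

    @classmethod
    def get(cls, factory):
        # Build the credential once, then reuse the cached value.
        if not cls._credentials_init:
            cls._credentials = factory()
            cls._credentials_init = True
        return cls._credentials


@contextlib.contextmanager
def fresh_credentials(holder):
    """Clear the cache on entry and again on exit, even if the body raises."""
    holder._credentials_init = False
    holder._credentials = None
    try:
        yield holder
    finally:
        holder._credentials_init = False
        holder._credentials = None


# A test that needs special credentials runs inside the context manager.
with fresh_credentials(FakeCredentials) as creds:
    impersonated = creds.get(lambda: "impersonated-credential")

# A later test starts from an empty cache and builds its own credential.
default = FakeCredentials.get(lambda: "default-credential")
```

In a `unittest.TestCase`, the same reset could be placed in `tearDown` or registered with `addCleanup`, so that it runs whether the test passes or fails.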

Issue Priority

Priority: 0

Issue Component

Component: sdk-py-core
