Skip to content

Commit

Permalink
Initialize storage client with project from pipeline option.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Nov 16, 2023
1 parent 9e885c0 commit acb57f3
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 7 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@

## I/Os

* Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676))
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).
* Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676))

## New Features / Improvements

Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from google.cloud.storage.fileio import BlobWriter

from apache_beam.internal.gcp import auth
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated
Expand Down Expand Up @@ -109,7 +110,8 @@ def __init__(self, storage_client=None, pipeline_options=None):
credentials = auth.get_service_credentials(pipeline_options)
if credentials:
storage_client = storage.Client(
credentials=credentials.get_google_auth_credentials())
credentials=credentials.get_google_auth_credentials(),
project=pipeline_options.view_as(GoogleCloudOptions).project)
else:
storage_client = storage.Client.create_anonymous_client()
self.client = storage_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,6 @@ def __init__(self, options, root_staging_location=None):
else:
credentials = get_service_credentials(options)
storage_credentials = credentials.get_google_auth_credentials()
storage_project = self.google_cloud_options.project

http_client = get_new_http()
self._client = dataflow.DataflowV1b3(
Expand All @@ -509,9 +508,13 @@ def __init__(self, options, root_staging_location=None):
get_credentials=(not self.google_cloud_options.no_auth),
http=http_client,
response_encoding=get_response_encoding())
if storage_credentials and storage_project:
if storage_credentials:
# Here we explicitly set the project to the value specified in pipeline
# options, so the new storage client will be consistent with the previous
# client in terms of which GCP project to use.
self._storage_client = storage.Client(
credentials=storage_credentials, project=storage_project)
credentials=storage_credentials,
project=self.google_cloud_options.project)
else:
self._storage_client = storage.Client.create_anonymous_client()
self._sdk_image_overrides = self._get_sdk_image_overrides(options)
Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/runners/interactive/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,10 @@ def assert_bucket_exists(bucket_name):
from google.cloud import storage
credentials = auth.get_service_credentials(PipelineOptions())
if credentials:
storage_client = storage.Client(credentials=credentials)
# We explicitly set project to None so that the storage client does not try
# to pick up a project ID from the environment (ADC).
storage_client = storage.Client(
credentials=credentials.get_google_auth_credentials(), project=None)
else:
storage_client = storage.Client.create_anonymous_client()
storage_client.get_bucket(bucket_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def __init__(self, options):
from google.cloud import storage
if credentials:
self._storage_client = storage.Client(
credentials=credentials.get_google_auth_credentials())
credentials=credentials.get_google_auth_credentials(),
project=self._google_cloud_options.project)
else:
self._storage_client = storage.Client.create_anonymous_client()
self._cloudbuild_client = cloudbuild.CloudbuildV1(
Expand Down

0 comments on commit acb57f3

Please sign in to comment.