diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 3fedefcbc6b0..d12dd276a1aa 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -709,7 +709,7 @@ def display_data(self): } def estimate_size(self): - bq = bigquery_tools.BigQueryWrapper() + bq = bigquery_tools.BigQueryWrapper.from_pipeline_options(self.options) if self.table_reference is not None: table_ref = self.table_reference if (isinstance(self.table_reference, vp.ValueProvider) and diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 552c22f1f770..312e3f70c2b0 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -349,13 +349,7 @@ class BigQueryWrapper(object): HISTOGRAM_METRIC_LOGGER = MetricLogger() def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): - self.client = client or bigquery.BigqueryV2( - http=get_new_http(), - credentials=auth.get_service_credentials(PipelineOptions()), - response_encoding='utf8', - additional_http_headers={ - "user-agent": "apache-beam-%s" % apache_beam.__version__ - }) + self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) self.gcp_bq_client = client or gcp_bigquery.Client( client_info=ClientInfo( user_agent="apache-beam-%s" % apache_beam.__version__)) @@ -1350,6 +1344,21 @@ def convert_row_to_dict(self, row, schema): result[field.name] = self._convert_cell_value_to_dict(value, field) return result + @staticmethod + def from_pipeline_options(pipeline_options: PipelineOptions): + return BigQueryWrapper( + client=BigQueryWrapper._bigquery_client(pipeline_options)) + + @staticmethod + def _bigquery_client(pipeline_options: PipelineOptions): + return bigquery.BigqueryV2( + http=get_new_http(), + credentials=auth.get_service_credentials(pipeline_options), + response_encoding='utf8', + additional_http_headers={ + "user-agent": "apache-beam-%s" % apache_beam.__version__ + }) + class RowAsDictJsonCoder(coders.Coder): """A coder for a table row (represented as a dict) to/from a JSON string.