Skip to content

Commit

Permalink
feat(bigquery): add use_bqstorage_api param to to_dataframe and `…
Browse files Browse the repository at this point in the history
…to_arrow`

When the `use_bqstorage_api` parameter is set to `True`, the BigQuery
client constructs a BigQuery Storage API client for you. This removes
the need for boilerplate code to manually construct both clients
explicitly with the same credentials.

Does this make the `bqstorage_client` parameter unnecessary? In most
cases, yes, but there are a few cases where we'll want to continue using
it. Specifically, when partner tools use `to_dataframe`, they should
continue to use `bqstorage_client` so that they can set the correct
amended user-agent strings.

Thought: maybe the `bigquery.Client._create_bqstorage_client()` helper
should try to do the right thing with regards to the user-agent strings?
The implementation for that feels like it could get more tricky than
it's worth.
  • Loading branch information
tswast committed Oct 30, 2019
1 parent abdfa3f commit 8d098de
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 8 deletions.
13 changes: 13 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,19 @@ def dataset(self, dataset_id, project=None):

return DatasetReference(project, dataset_id)

def _create_bqstorage_client(self):
    """Create a BigQuery Storage API client using this client's credentials.

    Returns:
        google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient:
            A BigQuery Storage API client.
    """
    # Imported lazily so that google-cloud-bigquery-storage stays an
    # optional dependency of this package.
    from google.cloud import bigquery_storage_v1beta1

    storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=self._credentials
    )
    return storage_client

def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY):
"""API call: create the dataset via a POST request.
Expand Down
37 changes: 34 additions & 3 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3109,7 +3109,9 @@ def result(

# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_arrow()
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
def to_arrow(
self, progress_bar_type=None, bqstorage_client=None, use_bqstorage_api=False
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
Expand Down Expand Up @@ -3142,6 +3144,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
Reading from a specific partition or snapshot is not
currently supported by this method.
use_bqstorage_api (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
*Storage API client and
use the faster BigQuery Storage API to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.
This argument does nothing if ``bqstorage_client`` is supplied.
..versionadded:: 1.22.0
Returns:
pyarrow.Table
Expand All @@ -3156,12 +3168,20 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
..versionadded:: 1.17.0
"""
return self.result().to_arrow(
progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
use_bqstorage_api=use_bqstorage_api,
)

# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_dataframe()
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
use_bqstorage_api=False,
):
"""Return a pandas DataFrame from a QueryJob
Args:
Expand Down Expand Up @@ -3194,6 +3214,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
for details.
..versionadded:: 1.11.0
use_bqstorage_api (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
*Storage API client and
use the faster BigQuery Storage API to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.
This argument does nothing if ``bqstorage_client`` is supplied.
..versionadded:: 1.22.0
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
Expand All @@ -3207,6 +3237,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
bqstorage_client=bqstorage_client,
dtypes=dtypes,
progress_bar_type=progress_bar_type,
use_bqstorage_api=use_bqstorage_api,
)

def __iter__(self):
Expand Down
55 changes: 50 additions & 5 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,9 @@ def _to_arrow_iterable(self, bqstorage_client=None):

# If changing the signature of this method, make sure to apply the same
# changes to job.QueryJob.to_arrow()
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
def to_arrow(
self, progress_bar_type=None, bqstorage_client=None, use_bqstorage_api=False
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
Expand Down Expand Up @@ -1473,6 +1475,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
Reading from a specific partition or snapshot is not
currently supported by this method.
use_bqstorage_api (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
*Storage API client and
use the faster BigQuery Storage API to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.
This argument does nothing if ``bqstorage_client`` is supplied.
..versionadded:: 1.22.0
Returns:
pyarrow.Table
Expand All @@ -1488,6 +1500,9 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
if pyarrow is None:
raise ValueError(_NO_PYARROW_ERROR)

if not bqstorage_client and use_bqstorage_api:
bqstorage_client = self.client._create_bqstorage_client()

progress_bar = self._get_progress_bar(progress_bar_type)

record_batches = []
Expand Down Expand Up @@ -1542,14 +1557,20 @@ def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):

# If changing the signature of this method, make sure to apply the same
# changes to job.QueryJob.to_dataframe()
def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
use_bqstorage_api=False,
):
"""Create a pandas DataFrame by loading all pages of a query.
Args:
bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient):
**Beta Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.
from BigQuery.
This method requires the ``pyarrow`` and
``google-cloud-bigquery-storage`` libraries.
Expand Down Expand Up @@ -1586,6 +1607,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
progress bar as a graphical dialog box.
..versionadded:: 1.11.0
use_bqstorage_api (bool):
**Beta Feature** Optional. If ``True``, create a BigQuery
*Storage API client and
use the faster BigQuery Storage API to fetch rows from
BigQuery. See the ``bqstorage_client`` parameter for more
information.
This argument does nothing if ``bqstorage_client`` is supplied.
..versionadded:: 1.22.0
Returns:
pandas.DataFrame:
Expand All @@ -1605,6 +1636,9 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
if dtypes is None:
dtypes = {}

if not bqstorage_client and use_bqstorage_api:
bqstorage_client = self.client._create_bqstorage_client()

if bqstorage_client and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
Expand Down Expand Up @@ -1651,11 +1685,15 @@ class _EmptyRowIterator(object):
pages = ()
total_rows = 0

def to_arrow(self, progress_bar_type=None):
def to_arrow(
self, progress_bar_type=None, bqstorage_client=None, use_bqstorage_api=False
):
"""[Beta] Create an empty class:`pyarrow.Table`.
Args:
progress_bar_type (Optional[str]): Ignored. Added for compatibility with RowIterator.
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
use_bqstorage_api (bool): Ignored. Added for compatibility with RowIterator.
Returns:
pyarrow.Table: An empty :class:`pyarrow.Table`.
Expand All @@ -1664,13 +1702,20 @@ def to_arrow(self, progress_bar_type=None):
raise ValueError(_NO_PYARROW_ERROR)
return pyarrow.Table.from_arrays(())

def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
def to_dataframe(
self,
bqstorage_client=None,
dtypes=None,
progress_bar_type=None,
use_bqstorage_api=False,
):
"""Create an empty dataframe.
Args:
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
dtypes (Any): Ignored. Added for compatibility with RowIterator.
progress_bar_type (Any): Ignored. Added for compatibility with RowIterator.
use_bqstorage_api (bool): Ignored. Added for compatibility with RowIterator.
Returns:
pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
Expand Down
33 changes: 33 additions & 0 deletions bigquery/samples/download_public_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def download_public_data(client):

    # [START bigquery_pandas_public_data]
    # TODO(developer): Import the client library.
    # from google.cloud import bigquery

    # TODO(developer): Construct a BigQuery client object.
    # client = bigquery.Client()

    # TODO(developer): Set table_id to the fully-qualified table ID in standard
    # SQL format, including the project ID and dataset ID.
    table_id = "bigquery-public-data.usa_names.usa_1910_current"

    # Passing use_bqstorage_api=True routes the download through the BigQuery
    # Storage API, which speeds up downloads of large tables.
    rows = client.list_rows(table_id)
    dataframe = rows.to_dataframe(use_bqstorage_api=True)

    print(dataframe.info())
    # [END bigquery_pandas_public_data]
34 changes: 34 additions & 0 deletions bigquery/samples/download_public_data_sandbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def download_public_data_sandbox(client):

    # [START bigquery_pandas_public_data_sandbox]
    # TODO(developer): Import the client library.
    # from google.cloud import bigquery

    # TODO(developer): Construct a BigQuery client object.
    # client = bigquery.Client()

    # `SELECT *` is an anti-pattern in BigQuery because it is cheaper and
    # faster to use the BigQuery Storage API directly, but BigQuery Sandbox
    # users can only use the BigQuery Storage API to download query results.
    query_string = "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_current`"

    # Passing use_bqstorage_api=True routes the download through the BigQuery
    # Storage API, which speeds up downloads of large result sets.
    query_job = client.query(query_string)
    dataframe = query_job.to_dataframe(use_bqstorage_api=True)

    print(dataframe.info())
    # [END bigquery_pandas_public_data_sandbox]
24 changes: 24 additions & 0 deletions bigquery/samples/tests/test_download_public_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .. import download_public_data


def test_download_public_data(capsys, client):
    """Run the sample and check that the expected columns appear in its output."""
    download_public_data.download_public_data(client)

    captured, _ = capsys.readouterr()
    for column_name in ("year", "gender", "name"):
        assert column_name in captured
24 changes: 24 additions & 0 deletions bigquery/samples/tests/test_download_public_data_sandbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .. import download_public_data_sandbox


def test_download_public_data_sandbox(capsys, client):
    """Run the sandbox sample and check that the expected columns are printed."""
    download_public_data_sandbox.download_public_data_sandbox(client)

    captured, _ = capsys.readouterr()
    for column_name in ("year", "gender", "name"):
        assert column_name in captured

0 comments on commit 8d098de

Please sign in to comment.