Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery: Add support for BigQuery Storage API Arrow format in to_dataframe and to_arrow. #8551

Merged
merged 20 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 56 additions & 21 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Shared helper functions for connecting BigQuery and pandas."""

import concurrent.futures
import functools
import warnings

from six.moves import queue
Expand Down Expand Up @@ -74,6 +75,8 @@ def pyarrow_timestamp():


if pyarrow:
# This dictionary is duplicated in bigquery_storage/tests/unit/test_reader.py
# When modifying it be sure to update it there as well.
BQ_TO_ARROW_SCALARS = {
"BOOL": pyarrow.bool_,
"BOOLEAN": pyarrow.bool_,
Expand Down Expand Up @@ -269,25 +272,27 @@ def download_dataframe_tabledata_list(pages, schema, dtypes):
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)


def _download_dataframe_bqstorage_stream(
download_state,
bqstorage_client,
column_names,
dtypes,
session,
stream,
worker_queue,
def _bqstorage_page_to_arrow(page):
    """Convert a single BigQuery Storage API page to its Arrow form.

    Args:
        page: A page object exposing a ``to_arrow()`` method.

    Returns:
        Whatever ``page.to_arrow()`` returns for this page.
    """
    arrow_item = page.to_arrow()
    return arrow_item


def _bqstorage_page_to_dataframe(column_names, dtypes, page):
    """Convert a single BigQuery Storage API page to a DataFrame.

    Args:
        column_names: Column names, in the order the result should have.
        dtypes: Forwarded unchanged as the ``dtypes`` keyword argument of
            ``page.to_dataframe()``.
        page: A page object exposing a ``to_dataframe()`` method.

    Returns:
        The frame built from ``page``, indexed by ``column_names``.
    """
    frame = page.to_dataframe(dtypes=dtypes)
    # Some versions of google-cloud-bigquery-storage do not preserve column
    # order in page.to_dataframe(), so select the columns by name to put
    # them back in the requested order.
    return frame[column_names]


def _download_table_bqstorage_stream(
download_state, bqstorage_client, session, stream, worker_queue, page_to_item
):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)

for page in rowstream.pages:
if download_state.done:
return
# page.to_dataframe() does not preserve column order in some versions
# of google-cloud-bigquery-storage. Access by column name to rearrange.
frame = page.to_dataframe(dtypes=dtypes)[column_names]
worker_queue.put(frame)
item = page_to_item(page)
worker_queue.put(item)


def _nowait(futures):
Expand All @@ -304,14 +309,13 @@ def _nowait(futures):
return done, not_done


def download_dataframe_bqstorage(
def _download_table_bqstorage(
project_id,
table,
bqstorage_client,
column_names,
dtypes,
preserve_order=False,
selected_fields=None,
page_to_item=None,
):
"""Use (faster, but billable) BQ Storage API to download a table, one converted page at a time."""
if "$" in table.table_id:
Expand All @@ -333,14 +337,13 @@ def download_dataframe_bqstorage(
session = bqstorage_client.create_read_session(
table.to_bqstorage(),
"projects/{}".format(project_id),
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
read_options=read_options,
requested_streams=requested_streams,
)

# Avoid reading rows from an empty table. pandas.concat will fail on an
# empty list.
# Avoid reading rows from an empty table.
if not session.streams:
yield pandas.DataFrame(columns=column_names)
return

total_streams = len(session.streams)
Expand All @@ -360,14 +363,13 @@ def download_dataframe_bqstorage(
# See: https://github.com/googleapis/google-cloud-python/pull/7698
not_done = [
pool.submit(
_download_dataframe_bqstorage_stream,
_download_table_bqstorage_stream,
download_state,
bqstorage_client,
column_names,
dtypes,
session,
stream,
worker_queue,
page_to_item,
)
for stream in session.streams
]
Expand Down Expand Up @@ -410,3 +412,36 @@ def download_dataframe_bqstorage(
# Shutdown all background threads, now that they should know to
# exit early.
pool.shutdown(wait=True)


def download_arrow_bqstorage(
    project_id, table, bqstorage_client, preserve_order=False, selected_fields=None
):
    """Download a table with the BigQuery Storage API as Arrow items.

    Thin wrapper over ``_download_table_bqstorage`` that converts every
    downloaded page with ``_bqstorage_page_to_arrow``.

    Args:
        project_id: Forwarded to ``_download_table_bqstorage``.
        table: Forwarded to ``_download_table_bqstorage``.
        bqstorage_client: Forwarded to ``_download_table_bqstorage``.
        preserve_order: Forwarded to ``_download_table_bqstorage``.
        selected_fields: Forwarded to ``_download_table_bqstorage``.

    Returns:
        The result of ``_download_table_bqstorage`` for these arguments.
    """
    download_options = {
        "preserve_order": preserve_order,
        "selected_fields": selected_fields,
        "page_to_item": _bqstorage_page_to_arrow,
    }
    return _download_table_bqstorage(
        project_id, table, bqstorage_client, **download_options
    )


def download_dataframe_bqstorage(
    project_id,
    table,
    bqstorage_client,
    column_names,
    dtypes,
    preserve_order=False,
    selected_fields=None,
):
    """Download a table with the BigQuery Storage API as DataFrames.

    Thin wrapper over ``_download_table_bqstorage`` that converts every
    downloaded page with ``_bqstorage_page_to_dataframe``.

    Args:
        project_id: Forwarded to ``_download_table_bqstorage``.
        table: Forwarded to ``_download_table_bqstorage``.
        bqstorage_client: Forwarded to ``_download_table_bqstorage``.
        column_names: Column names handed to the page converter.
        dtypes: dtypes handed to the page converter.
        preserve_order: Forwarded to ``_download_table_bqstorage``.
        selected_fields: Forwarded to ``_download_table_bqstorage``.

    Returns:
        The result of ``_download_table_bqstorage`` for these arguments.
    """

    def _page_to_frame(page):
        # Bind the column order and dtypes so the shared downloader only
        # has to pass the page itself.
        return _bqstorage_page_to_dataframe(column_names, dtypes, page)

    return _download_table_bqstorage(
        project_id,
        table,
        bqstorage_client,
        preserve_order=preserve_order,
        selected_fields=selected_fields,
        page_to_item=_page_to_frame,
    )
130 changes: 87 additions & 43 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import copy
import datetime
import functools
import operator
import warnings

Expand Down Expand Up @@ -1403,14 +1404,52 @@ def _get_progress_bar(self, progress_bar_type):
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
return None

def _to_arrow_iterable(self):
def _to_page_iterable(
    self, bqstorage_download, tabledata_list_download, bqstorage_client=None
):
    """Yield downloaded items, preferring the BigQuery Storage API.

    Args:
        bqstorage_download: Zero-argument callable returning an iterable of
            items downloaded with the BigQuery Storage API. Only called
            when ``bqstorage_client`` is not ``None``.
        tabledata_list_download: Zero-argument callable returning an
            iterable of items downloaded with the tabledata.list API.
        bqstorage_client: Optional BigQuery Storage API client. When it is
            ``None`` only ``tabledata_list_download`` is used.

    Yields:
        The items produced by whichever download callable is used.

    Raises:
        google.api_core.exceptions.Forbidden: Always re-raised from the
            BigQuery Storage download.
        google.api_core.exceptions.GoogleAPICallError: Re-raised when the
            BigQuery Storage download fails after at least one item has
            already been yielded.
    """
    if bqstorage_client is not None:
        yielded_any = False
        try:
            # Iterate over the stream so that read errors are raised (and
            # the method can then fallback to tabledata.list).
            for item in bqstorage_download():
                yielded_any = True
                yield item
            return
        except google.api_core.exceptions.Forbidden:
            # Don't hide errors such as insufficient permissions to create
            # a read session, or the API is not enabled. Both of those are
            # clearly problems if the developer has explicitly asked for
            # BigQuery Storage API support.
            raise
        except google.api_core.exceptions.GoogleAPICallError:
            # There is a known issue with reading from small anonymous
            # query results tables, so some errors are expected. Rather
            # than throw those errors, try reading the table again, but
            # with the tabledata.list API.
            if yielded_any:
                # Items were already handed to the caller. Restarting from
                # tabledata.list would yield those rows a second time, so
                # surface the error instead of returning duplicated data.
                raise

    for item in tabledata_list_download():
        yield item

def _to_arrow_iterable(self, bqstorage_client=None):
"""Create an iterable of arrow RecordBatches, to process the table as a stream."""
for record_batch in _pandas_helpers.download_arrow_tabledata_list(
iter(self.pages), self.schema
):
yield record_batch
bqstorage_download = functools.partial(
_pandas_helpers.download_arrow_bqstorage,
self._project,
self._table,
bqstorage_client,
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_arrow_tabledata_list, iter(self.pages), self.schema
)
return self._to_page_iterable(
bqstorage_download,
tabledata_list_download,
bqstorage_client=bqstorage_client,
)

def to_arrow(self, progress_bar_type=None):
def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
"""[Beta] Create a :class:`pyarrow.Table` by loading all pages of a
table or query.

Expand All @@ -1433,6 +1472,18 @@ def to_arrow(self, progress_bar_type=None):
``'tqdm_gui'``
Use the :func:`tqdm.tqdm_gui` function to display a
progress bar as a graphical dialog box.
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
**Beta Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.

This method requires the ``pyarrow`` and
``google-cloud-bigquery-storage`` libraries.

Reading from a specific partition or snapshot is not
currently supported by this method.

Returns:
pyarrow.Table
Expand All @@ -1452,7 +1503,7 @@ def to_arrow(self, progress_bar_type=None):
progress_bar = self._get_progress_bar(progress_bar_type)

record_batches = []
for record_batch in self._to_arrow_iterable():
for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client):
record_batches.append(record_batch)

if progress_bar is not None:
Expand All @@ -1466,47 +1517,40 @@ def to_arrow(self, progress_bar_type=None):
# Indicate that the download has finished.
progress_bar.close()

arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
return pyarrow.Table.from_batches(record_batches, schema=arrow_schema)
if record_batches:
return pyarrow.Table.from_batches(record_batches)
else:
# No records, use schema based on BigQuery schema.
arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
return pyarrow.Table.from_batches(record_batches, schema=arrow_schema)

def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
"""Create an iterable of pandas DataFrames, to process the table as a stream.

See ``to_dataframe`` for argument descriptions.
"""
if bqstorage_client is not None:
column_names = [field.name for field in self._schema]
try:
# Iterate over the stream so that read errors are raised (and
# the method can then fallback to tabledata.list).
for frame in _pandas_helpers.download_dataframe_bqstorage(
self._project,
self._table,
bqstorage_client,
column_names,
dtypes,
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
):
yield frame
return
except google.api_core.exceptions.Forbidden:
# Don't hide errors such as insufficient permissions to create
# a read session, or the API is not enabled. Both of those are
# clearly problems if the developer has explicitly asked for
# BigQuery Storage API support.
raise
except google.api_core.exceptions.GoogleAPICallError:
# There is a known issue with reading from small anonymous
# query results tables, so some errors are expected. Rather
# than throw those errors, try reading the DataFrame again, but
# with the tabledata.list API.
pass

for frame in _pandas_helpers.download_dataframe_tabledata_list(
iter(self.pages), self.schema, dtypes
):
yield frame
column_names = [field.name for field in self._schema]
bqstorage_download = functools.partial(
_pandas_helpers.download_dataframe_bqstorage,
self._project,
self._table,
bqstorage_client,
column_names,
dtypes,
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_dataframe_tabledata_list,
iter(self.pages),
self.schema,
dtypes,
)
return self._to_page_iterable(
bqstorage_download,
tabledata_list_download,
bqstorage_client=bqstorage_client,
)

def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
"""Create a pandas DataFrame by loading all pages of a query.
Expand All @@ -1519,7 +1563,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.

This method requires the ``fastavro`` and
This method requires the ``pyarrow`` and
``google-cloud-bigquery-storage`` libraries.

Reading from a specific partition or snapshot is not
Expand Down
6 changes: 4 additions & 2 deletions bigquery/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
]
extras = {
"bqstorage": [
"google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev",
"fastavro>=0.21.2",
"google-cloud-bigquery-storage >= 0.6.0, <2.0.0dev",
# Bad Linux release for 0.14.0.
# https://issues.apache.org/jira/browse/ARROW-5868
"pyarrow>=0.13.0, != 0.14.0",
],
"pandas": ["pandas>=0.17.1"],
# Exclude PyArrow dependency from Windows Python 2.7.
Expand Down
Loading