Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(bigquery): to_dataframe uses faster to_arrow + to_pandas when pyarrow is available #10027

Merged
7 commits merged on Jan 15, 2020
63 changes: 40 additions & 23 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,17 @@ def to_arrow(
if pyarrow is None:
raise ValueError(_NO_PYARROW_ERROR)

if (
bqstorage_client or create_bqstorage_client
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
bqstorage_client = None

owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
Expand Down Expand Up @@ -1707,33 +1718,39 @@ def to_dataframe(
create_bqstorage_client = False
bqstorage_client = None

owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
bqstorage_client = self.client._create_bqstorage_client()

try:
progress_bar = self._get_progress_bar(progress_bar_type)
if pyarrow is not None:
# If pyarrow is available, calling to_arrow and then converting the result
# to a pandas dataframe is about 2x faster. This is because pandas.concat
# almost always copies the data, whereas pyarrow.Table.from_batches +
# to_pandas can usually avoid a copy.
record_batch = self.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)
df = record_batch.to_pandas()
for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])
return df

frames = []
for frame in self.to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)
# The bqstorage_client is only used if pyarrow is available, so the
# rest of this method only needs to account for tabledata.list.
progress_bar = self._get_progress_bar(progress_bar_type)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))
frames = []
for frame in self.to_dataframe_iterable(dtypes=dtypes):
frames.append(frame)

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client.transport.channel.close()
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()

# Avoid concatting an empty list.
if not frames:
Expand Down
7 changes: 6 additions & 1 deletion bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -2369,7 +2369,12 @@ def test_nested_table_to_dataframe(self):
row = df.iloc[0]
# verify the row content
self.assertEqual(row["string_col"], "Some value")
self.assertEqual(row["record_col"], record)
expected_keys = tuple(sorted(record.keys()))
row_keys = tuple(sorted(row["record_col"].keys()))
self.assertEqual(row_keys, expected_keys)
# pyarrow encodes the embedded repeated column as a numpy array, which
# can't be compared directly with assertEqual, so convert it to a list.
self.assertEqual(list(row["record_col"]["nested_repeated"]), [0, 1, 2])
# verify that nested data can be accessed with indices/keys
self.assertEqual(row["record_col"]["nested_repeated"][0], 0)
self.assertEqual(
Expand Down
Loading