
BigQuery: Allow choice of compression when loading from dataframe #8938

Merged: 3 commits, Aug 6, 2019
9 changes: 7 additions & 2 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -208,7 +208,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)


def dataframe_to_parquet(dataframe, bq_schema, filepath):
def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
"""Write dataframe as a Parquet file, according to the desired BQ schema.

This function requires the :mod:`pyarrow` package. Arrow is used as an
@@ -222,12 +222,17 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath):
columns in the DataFrame.
filepath (str):
Path to write Parquet file to.
parquet_compression (str):
(optional) The compression codec to use by the
``pyarrow.parquet.write_table`` serializing method. Defaults to
"SNAPPY".
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
"""
if pyarrow is None:
raise ValueError("pyarrow is required for BigQuery schema conversion.")

arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)


def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
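For context, a minimal usage sketch of the updated helper (the DataFrame contents and output path are made up for illustration; assumes pandas and pyarrow are installed, and imports the private helper module directly purely to show the new keyword):

import pandas

from google.cloud.bigquery import _pandas_helpers, schema

# A tiny DataFrame matching a single-column BigQuery schema.
dataframe = pandas.DataFrame({"name": ["Monty", "Python"]})
bq_schema = (schema.SchemaField("name", "STRING"),)

# parquet_compression is forwarded to pyarrow.parquet.write_table(compression=...);
# it defaults to "SNAPPY" when omitted.
_pandas_helpers.dataframe_to_parquet(
    dataframe, bq_schema, "/tmp/names.parquet", parquet_compression="GZIP"
)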
25 changes: 23 additions & 2 deletions bigquery/google/cloud/bigquery/client.py
@@ -1449,6 +1449,7 @@ def load_table_from_dataframe(
location=None,
project=None,
job_config=None,
parquet_compression="snappy",
):
"""Upload the contents of a table from a pandas DataFrame.

@@ -1491,6 +1492,20 @@
column names matching those of the dataframe. The BigQuery
schema is used to determine the correct data type conversion.
Indexes are not loaded. Requires the :mod:`pyarrow` library.
parquet_compression (str):
[Beta] The compression method to use when temporarily
serializing ``dataframe`` to a Parquet file.

If ``pyarrow`` and the job config schema are used, the argument
is directly passed as the ``compression`` argument to the
underlying ``pyarrow.parquet.write_table()`` method (the
default value "snappy" gets converted to uppercase).
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table

If either ``pyarrow`` or the job config schema is missing, the
argument is directly passed as the ``compression`` argument
to the underlying ``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet

Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -1515,8 +1530,14 @@

try:
if pyarrow and job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()

_pandas_helpers.dataframe_to_parquet(
dataframe, job_config.schema, tmppath
dataframe,
job_config.schema,
tmppath,
parquet_compression=parquet_compression,
)
else:
if job_config.schema:
Expand All @@ -1527,7 +1548,7 @@ def load_table_from_dataframe(
PendingDeprecationWarning,
stacklevel=2,
)
dataframe.to_parquet(tmppath)
dataframe.to_parquet(tmppath, compression=parquet_compression)
Contributor:

We do need to modify _pandas_helpers.dataframe_to_parquet as well. See:

pyarrow.parquet.write_table(arrow_table, filepath)
The underlying pyarrow.parquet.write_table function also takes a compression argument.

Long-term, I expect the _pandas_helpers.dataframe_to_parquet function to get used more often than the dataframe.to_parquet method. We'll want to start fetching the table schema if not provided and use that for pandas to BigQuery type conversions #8142.

Contributor Author (@plamut), Aug 6, 2019:

There is a mismatch between the two methods; pyarrow's accepts a richer range of compression methods.

Having either two different compression parameters, or a single parameter that accepts different values depending on the context, could be confusing to end users, so I will only allow the compression methods supported by both.

However, if there are use cases that would need specific support for LZO, LZ4, and ZSTD as well, please let me know. There probably aren't, since to date the compression method has not been exposed anyway.

It's good that we are marking the parameter as beta, as I can see how this can change in the future. 👍

Update:
Changed my mind after realizing that we probably should document the underlying serialization methods and link to their original docs. Since we are already exposing that detail, it makes less sense to try hiding compression options behind the lowest common denominator.


with open(tmppath, "rb") as parquet_file:
return self.load_table_from_file(
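For context, a hedged sketch of how a caller might use the new keyword through the public client method (the project, dataset, and table IDs are placeholders, and a real invocation needs valid credentials):

import pandas

from google.cloud import bigquery

client = bigquery.Client()
table_ref = bigquery.DatasetReference("my-project", "my_dataset").table("my_table")

dataframe = pandas.DataFrame(
    [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
)
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("age", "INTEGER"),
    ]
)

# With pyarrow installed and a schema on the job config, "gzip" is handed to
# pyarrow.parquet.write_table(compression=...); otherwise it is passed to
# DataFrame.to_parquet(compression=...) unchanged.
load_job = client.load_table_from_dataframe(
    dataframe,
    table_ref,
    job_config=job_config,
    parquet_compression="gzip",
)
load_job.result()  # block until the load job completes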
22 changes: 22 additions & 0 deletions bigquery/tests/unit/test__pandas_helpers.py
@@ -17,6 +17,8 @@
import functools
import warnings

import mock

try:
import pandas
except ImportError: # pragma: NO COVER
@@ -613,3 +615,23 @@ def test_dataframe_to_parquet_w_missing_columns(module_under_test, monkeypatch):
pandas.DataFrame(), (schema.SchemaField("not_found", "STRING"),), None
)
assert "columns in schema must match" in str(exc_context.value)


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
def test_dataframe_to_parquet_compression_method(module_under_test):
bq_schema = (schema.SchemaField("field00", "STRING"),)
dataframe = pandas.DataFrame({"field00": ["foo", "bar"]})

write_table_patch = mock.patch.object(
module_under_test.pyarrow.parquet, "write_table", autospec=True
)

with write_table_patch as fake_write_table:
module_under_test.dataframe_to_parquet(
dataframe, bq_schema, None, parquet_compression="ZSTD"
)

call_args = fake_write_table.call_args
assert call_args is not None
assert call_args.kwargs.get("compression") == "ZSTD"
60 changes: 60 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -5375,6 +5375,66 @@ def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
assert sent_config.source_format == job.SourceFormat.PARQUET
assert tuple(sent_config.schema) == schema

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()
records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
dataframe = pandas.DataFrame(records)
schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
job_config = job.LoadJobConfig(schema=schema)

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)
to_parquet_patch = mock.patch(
"google.cloud.bigquery.client._pandas_helpers.dataframe_to_parquet",
autospec=True,
)

with load_patch, to_parquet_patch as fake_to_parquet:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
job_config=job_config,
location=self.LOCATION,
parquet_compression="LZ4",
)

call_args = fake_to_parquet.call_args
assert call_args is not None
assert call_args.kwargs.get("parquet_compression") == "LZ4"

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
client = self._make_client()
records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
dataframe = pandas.DataFrame(records)

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)
pyarrow_patch = mock.patch("google.cloud.bigquery.client.pyarrow", None)
to_parquet_patch = mock.patch.object(
dataframe, "to_parquet", wraps=dataframe.to_parquet
)

with load_patch, pyarrow_patch, to_parquet_patch as to_parquet_spy:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
location=self.LOCATION,
parquet_compression="gzip",
)

call_args = to_parquet_spy.call_args
assert call_args is not None
assert call_args.kwargs.get("compression") == "gzip"

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nulls(self):