BigQuery: Allow choice of compression when loading from dataframe #8938

Merged · 3 commits · Aug 6, 2019
Changes from 1 commit
12 changes: 11 additions & 1 deletion bigquery/google/cloud/bigquery/client.py
@@ -1449,6 +1449,7 @@ def load_table_from_dataframe(
location=None,
project=None,
job_config=None,
parquet_compression="snappy",
):
"""Upload the contents of a table from a pandas DataFrame.

@@ -1491,6 +1492,15 @@
column names matching those of the dataframe. The BigQuery
schema is used to determine the correct data type conversion.
Indexes are not loaded. Requires the :mod:`pyarrow` library.
parquet_compression (str):
The compression method to use if intermediately serializing
``dataframe`` to a parquet file. Must be one of {"snappy",
"gzip", "brotli"}, or ``None`` for no compression. Defaults
to "snappy".

The argument is directly passed as the ``compression`` argument
to the underlying ``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet

Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -1527,7 +1537,7 @@
PendingDeprecationWarning,
stacklevel=2,
)
dataframe.to_parquet(tmppath)
dataframe.to_parquet(tmppath, compression=parquet_compression)
Contributor commented:
We do need to modify _pandas_helpers.dataframe_to_parquet as well. See:

pyarrow.parquet.write_table(arrow_table, filepath)
The underlying pyarrow.parquet.write_table function also takes a compression argument.

Long-term, I expect the _pandas_helpers.dataframe_to_parquet function to get used more often than the dataframe.to_parquet method. We'll want to start fetching the table schema if not provided and use that for pandas to BigQuery type conversions #8142.
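For illustration, a minimal sketch of what threading the new argument through _pandas_helpers might look like, assuming the dataframe_to_parquet(dataframe, bq_schema, filepath) shape quoted above and the existing dataframe_to_arrow helper; this is not part of this commit:

def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
    """Write a DataFrame to parquet, using the BigQuery schema for type conversion."""
    if pyarrow is None:
        raise ValueError("pyarrow is required for BigQuery schema conversion.")

    # Convert to an Arrow table first so the BigQuery schema drives the type
    # conversion, then let pyarrow apply the compression codec when writing.
    arrow_table = dataframe_to_arrow(dataframe, bq_schema)
    pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)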

Contributor Author @plamut commented on Aug 6, 2019:
There is a mismatch between the two methods: pyarrow's accepts a richer range of compression methods.

Having either two different compression parameters, or a single parameter that accepts different values depending on the context, could confuse end users, so I will only allow the compression methods supported by both.

However, if there are use cases that need specific support for LZO, LZ4, or ZSTD as well, please let me know. There probably aren't, since the compression method has not been exposed at all to date.

It's good that we are marking the parameter as beta, as I can see how this can change in the future. 👍

Update:
Changed my mind after realizing that we should probably document the underlying serialization methods and link to their original docs. Since we are already exposing that detail, it makes less sense to try to hide the compression options behind the lowest common denominator.


with open(tmppath, "rb") as parquet_file:
return self.load_table_from_file(
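For reference, a usage sketch of the new parameter from the caller's side; the dataset and table names below are placeholders, not taken from this PR:

from google.cloud import bigquery
import pandas

client = bigquery.Client()
table_ref = client.dataset("my_dataset").table("my_table")  # placeholder names

dataframe = pandas.DataFrame(
    [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
)

# Use gzip for the intermediate parquet file instead of the default "snappy";
# pass parquet_compression=None to disable compression entirely.
load_job = client.load_table_from_dataframe(
    dataframe, table_ref, parquet_compression="gzip"
)
load_job.result()  # wait for the load job to finish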
27 changes: 27 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -5375,6 +5375,33 @@ def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
assert sent_config.source_format == job.SourceFormat.PARQUET
assert tuple(sent_config.schema) == schema

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
client = self._make_client()
records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
dataframe = pandas.DataFrame(records)

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)
pyarrow_patch = mock.patch("google.cloud.bigquery.client.pyarrow", None)
to_parquet_patch = mock.patch.object(
dataframe, "to_parquet", wraps=dataframe.to_parquet
)

with load_patch, pyarrow_patch, to_parquet_patch as to_parquet_spy:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
location=self.LOCATION,
parquet_compression="gzip",
)

call_args = to_parquet_spy.call_args
assert call_args is not None
assert call_args.kwargs.get("compression") == "gzip"

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nulls(self):
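As a possible follow-up (not part of this commit), a companion test for the pyarrow code path could mirror the test above. The sketch below assumes _pandas_helpers.dataframe_to_parquet gains a parquet_compression keyword as discussed in the review thread, that the client routes through it when pyarrow and an explicit schema are available, and that SchemaField and job.LoadJobConfig are available as in the surrounding tests:

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_pyarrow_custom_compression(self):
    # Sketch only: exercises the (hypothetically extended) dataframe_to_parquet
    # path, which the client is expected to take when pyarrow and a schema exist.
    client = self._make_client()
    records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
    dataframe = pandas.DataFrame(records)
    schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
    job_config = job.LoadJobConfig(schema=schema)

    load_patch = mock.patch(
        "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
    )
    to_parquet_patch = mock.patch(
        "google.cloud.bigquery.client._pandas_helpers.dataframe_to_parquet",
        autospec=True,
    )

    with load_patch, to_parquet_patch as to_parquet_spy:
        client.load_table_from_dataframe(
            dataframe,
            self.TABLE_REF,
            job_config=job_config,
            location=self.LOCATION,
            parquet_compression="gzip",
        )

    call_args = to_parquet_spy.call_args
    assert call_args is not None
    # The keyword name below mirrors the public parameter; the name actually
    # used internally is an assumption.
    assert call_args[1].get("parquet_compression") == "gzip"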