Support compression arg in dataframe_to_parquet()
plamut committed Aug 6, 2019
1 parent f8170e9 commit 6e6797a
Showing 4 changed files with 78 additions and 7 deletions.
9 changes: 7 additions & 2 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -208,7 +208,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
    return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)


def dataframe_to_parquet(dataframe, bq_schema, filepath):
def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
"""Write dataframe as a Parquet file, according to the desired BQ schema.
This function requires the :mod:`pyarrow` package. Arrow is used as an
@@ -222,12 +222,17 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath):
            columns in the DataFrame.
        filepath (str):
            Path to write Parquet file to.
        parquet_compression (str):
            (optional) The compression codec used by the
            ``pyarrow.parquet.write_table`` serializing method. Defaults to
            "SNAPPY".
            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
    """
    if pyarrow is None:
        raise ValueError("pyarrow is required for BigQuery schema conversion.")

    arrow_table = dataframe_to_arrow(dataframe, bq_schema)
    pyarrow.parquet.write_table(arrow_table, filepath)
    pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)


def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
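For context, a minimal usage sketch of the updated helper, assuming pandas and pyarrow are installed. The dataframe, schema, and output path below are illustrative only, and `_pandas_helpers` is a private module that is normally invoked through the client rather than directly:

import pandas

from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery import schema

# Illustrative dataframe and a matching BigQuery schema.
dataframe = pandas.DataFrame({"name": ["foo", "bar"], "age": [10, 20]})
bq_schema = (
    schema.SchemaField("name", "STRING"),
    schema.SchemaField("age", "INTEGER"),
)

# The default stays "SNAPPY"; any codec accepted by
# pyarrow.parquet.write_table() can be passed through.
_pandas_helpers.dataframe_to_parquet(
    dataframe, bq_schema, "/tmp/data.parquet", parquet_compression="GZIP"
)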
21 changes: 16 additions & 5 deletions bigquery/google/cloud/bigquery/client.py
@@ -1494,11 +1494,16 @@ def load_table_from_dataframe(
                Indexes are not loaded. Requires the :mod:`pyarrow` library.
            parquet_compression (str):
                [Beta] The compression method to use when temporarily
                serializing ``dataframe`` to a parquet file.

                If ``pyarrow`` and job config schema are used, the argument
                is directly passed as the ``compression`` argument to the
                underlying ``pyarrow.parquet.write_table()`` method (the
                default value "snappy" gets converted to uppercase).
                https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table

                If either ``pyarrow`` or job config schema are missing, the
                argument is directly passed as the ``compression`` argument
                to the underlying ``DataFrame.to_parquet()`` method.
                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
@@ -1525,8 +1530,14 @@ def load_table_from_dataframe(

        try:
            if pyarrow and job_config.schema:
                if parquet_compression == "snappy":  # adjust the default value
                    parquet_compression = parquet_compression.upper()

                _pandas_helpers.dataframe_to_parquet(
                    dataframe, job_config.schema, tmppath
                    dataframe,
                    job_config.schema,
                    tmppath,
                    parquet_compression=parquet_compression,
                )
            else:
                if job_config.schema:
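As the docstring above describes, the compression value reaches a different serializer depending on whether pyarrow is installed and a schema is set on the job config. A hedged end-to-end sketch of the public API follows; the project, dataset, and table names are placeholders, and application default credentials are assumed:

import pandas

from google.cloud import bigquery

client = bigquery.Client()  # assumes application default credentials
table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")

dataframe = pandas.DataFrame(
    [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
)
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("age", "INTEGER"),
    ]
)

# With pyarrow installed and a schema on the job config, the value is forwarded
# to pyarrow.parquet.write_table(compression=...); without pyarrow or without a
# schema it is forwarded to DataFrame.to_parquet(compression=...) instead.
load_job = client.load_table_from_dataframe(
    dataframe, table_ref, job_config=job_config, parquet_compression="gzip"
)
load_job.result()  # wait for the load job to finish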
22 changes: 22 additions & 0 deletions bigquery/tests/unit/test__pandas_helpers.py
@@ -17,6 +17,8 @@
import functools
import warnings

import mock

try:
    import pandas
except ImportError:  # pragma: NO COVER
@@ -613,3 +615,23 @@ def test_dataframe_to_parquet_w_missing_columns(module_under_test, monkeypatch):
            pandas.DataFrame(), (schema.SchemaField("not_found", "STRING"),), None
        )
    assert "columns in schema must match" in str(exc_context.value)


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
def test_dataframe_to_parquet_compression_method(module_under_test):
    bq_schema = (schema.SchemaField("field00", "STRING"),)
    dataframe = pandas.DataFrame({"field00": ["foo", "bar"]})

    write_table_patch = mock.patch.object(
        module_under_test.pyarrow.parquet, "write_table", autospec=True
    )

    with write_table_patch as fake_write_table:
        module_under_test.dataframe_to_parquet(
            dataframe, bq_schema, None, parquet_compression="ZSTD"
        )

    call_args = fake_write_table.call_args
    assert call_args is not None
    assert call_args.kwargs.get("compression") == "ZSTD"
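The assertion style above reads keyword arguments off the recorded call via `call_args.kwargs`, which is available on `unittest.mock` call objects in Python 3.8+ (and recent versions of the `mock` backport). A small standalone sketch of the same pattern, using a hypothetical `serialize` function purely for illustration:

from unittest import mock


def serialize(data, path, compression="SNAPPY"):
    """Hypothetical stand-in for a serializer such as pyarrow.parquet.write_table."""


with mock.patch(__name__ + ".serialize", autospec=True) as fake_serialize:
    serialize([1, 2, 3], "/tmp/out.parquet", compression="ZSTD")

call_args = fake_serialize.call_args
# .kwargs works on Python 3.8+ (and recent `mock` backports); on older
# versions the same dict is accessible as call_args[1].
assert call_args is not None
assert call_args.kwargs.get("compression") == "ZSTD"
assert call_args[1].get("compression") == "ZSTD"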
33 changes: 33 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -5375,6 +5375,39 @@ def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
        assert sent_config.source_format == job.SourceFormat.PARQUET
        assert tuple(sent_config.schema) == schema

    @unittest.skipIf(pandas is None, "Requires `pandas`")
    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
    def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
        from google.cloud.bigquery import job
        from google.cloud.bigquery.schema import SchemaField

        client = self._make_client()
        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
        dataframe = pandas.DataFrame(records)
        schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
        job_config = job.LoadJobConfig(schema=schema)

        load_patch = mock.patch(
            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
        )
        to_parquet_patch = mock.patch(
            "google.cloud.bigquery.client._pandas_helpers.dataframe_to_parquet",
            autospec=True,
        )

        with load_patch, to_parquet_patch as fake_to_parquet:
            client.load_table_from_dataframe(
                dataframe,
                self.TABLE_REF,
                job_config=job_config,
                location=self.LOCATION,
                parquet_compression="LZ4",
            )

        call_args = fake_to_parquet.call_args
        assert call_args is not None
        assert call_args.kwargs.get("parquet_compression") == "LZ4"

    @unittest.skipIf(pandas is None, "Requires `pandas`")
    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
    def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self):
