fix: support ARRAY data type when loading from DataFrame with Parquet #980
Changes from 18 commits
@@ -27,19 +27,11 @@
 import json
 import math
 import os
-import packaging.version
 import tempfile
 from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union
 import uuid
 import warnings

-try:
-    import pyarrow
-
-    _PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
-except ImportError:  # pragma: NO COVER
-    pyarrow = None
-
 from google import resumable_media  # type: ignore
 from google.resumable_media.requests import MultipartUpload
 from google.resumable_media.requests import ResumableUpload
@@ -103,6 +95,10 @@
 from google.cloud.bigquery.table import TableListItem
 from google.cloud.bigquery.table import TableReference
 from google.cloud.bigquery.table import RowIterator
+from google.cloud.bigquery.format_options import ParquetOptions
+from google.cloud.bigquery import _helpers
+
+pyarrow = _helpers.PYARROW_VERSIONS.try_import()


 _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024  # 100 MB
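The `_helpers.PYARROW_VERSIONS` object introduced above is defined in `google/cloud/bigquery/_helpers.py`, which is outside this diff. As a reading aid, here is a minimal sketch of the shape such a helper could take, assuming it centralizes the version parsing and bad-version set that this PR deletes from `client.py`; the real implementation may differ:

import packaging.version

# Mirrors the _PYARROW_BAD_VERSIONS constant removed from client.py below
# (pyarrow 2.0.0 can corrupt data; see issue #781).
_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])


class PyarrowVersions:
    """Sketch: lazily import pyarrow and expose version checks."""

    def __init__(self):
        self._installed_version = None

    @property
    def installed_version(self):
        if self._installed_version is None:
            import pyarrow

            self._installed_version = packaging.version.parse(pyarrow.__version__)
        return self._installed_version

    @property
    def is_bad_version(self):
        return self.installed_version in _BAD_VERSIONS

    @property
    def use_compliant_nested_type(self):
        # pyarrow.parquet.write_table() grew use_compliant_nested_type in 4.0.0.
        return self.installed_version.major >= 4

    def try_import(self):
        try:
            import pyarrow

            return pyarrow
        except ImportError:  # pragma: NO COVER
            return None


PYARROW_VERSIONS = PyarrowVersions()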
@@ -128,8 +124,6 @@
 # https://github.com/googleapis/python-bigquery/issues/438
 _MIN_GET_QUERY_RESULTS_TIMEOUT = 120

-# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
-_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])

 TIMEOUT_HEADER = "X-Server-Timeout"
@@ -2469,10 +2463,10 @@ def load_table_from_dataframe(
             They are supported when using the PARQUET source format, but
             due to the way they are encoded in the ``parquet`` file,
             a mismatch with the existing table schema can occur, so
-            100% compatibility cannot be guaranteed for REPEATED fields when
+            REPEATED fields are not properly supported when using ``pyarrow<4.0.0``
             using the parquet format.

-            https://github.com/googleapis/python-bigquery/issues/17
+            https://github.com/googleapis/python-bigquery/issues/19

         Args:
             dataframe (pandas.DataFrame):
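To make the fix concrete, here is an illustrative call that exercises the REPEATED path described above; the project, dataset, and table names are hypothetical:

import pandas
from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.my_dataset.array_demo"  # hypothetical table

# A list-valued column becomes an ARRAY (REPEATED) column in BigQuery.
dataframe = pandas.DataFrame({"id": [1, 2], "tags": [["a", "b"], ["c"]]})

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("id", "INTEGER"),
        bigquery.SchemaField("tags", "STRING", mode="REPEATED"),
    ]
)

load_job = client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
load_job.result()  # wait for the load job to finish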
@@ -2519,18 +2513,18 @@ def load_table_from_dataframe(
             :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
             supported.
         parquet_compression (Optional[str]):
-            [Beta] The compression method to use if intermittently
-            serializing ``dataframe`` to a parquet file.
-
-            The argument is directly passed as the ``compression``
-            argument to the underlying ``pyarrow.parquet.write_table()``
-            method (the default value "snappy" gets converted to uppercase).
-            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
-
-            If the job config schema is missing, the argument is directly
-            passed as the ``compression`` argument to the underlying
-            ``DataFrame.to_parquet()`` method.
-            https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
+            [Beta] The compression method to use if intermittently
+            serializing ``dataframe`` to a parquet file.
+
+            The argument is directly passed as the ``compression``
+            argument to the underlying ``pyarrow.parquet.write_table()``
+            method (the default value "snappy" gets converted to uppercase).
+            https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
+
+            If the job config schema is missing, the argument is directly
+            passed as the ``compression`` argument to the underlying
+            ``DataFrame.to_parquet()`` method.
+            https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
         timeout (Optional[float]):
             The number of seconds to wait for the underlying HTTP transport
             before using ``retry``.
@@ -2562,6 +2556,13 @@ def load_table_from_dataframe(
         if job_config.source_format is None:
            # default value
            job_config.source_format = job.SourceFormat.PARQUET

+        if job_config.parquet_options is None:
+            parquet_options = ParquetOptions()
+            # default value
+            parquet_options.enable_list_inference = True
+            job_config.parquet_options = parquet_options
+
         if job_config.source_format not in supported_formats:
             raise ValueError(
                 "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(

Review thread on the `parquet_options` default (marked resolved by tswast):

"Since CSV is also supported, should we check …"

"Yup, you're right. Probably would be fine. But also best not to be going around editing the JobConfig if not necessary."

"Based on the coverage failure, we need a unit test where …"

"Done."
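For intuition about what list inference does, a small pyarrow-only sketch (assumes pyarrow >= 4.0.0 is installed): writing with the Parquet-compliant LIST layout is what lets the backend's `enable_list_inference` map the column back to a REPEATED field instead of a nested record:

import pyarrow
import pyarrow.parquet

table = pyarrow.table({"tags": [["a", "b"], ["c"]]})

# use_compliant_nested_type=True writes the Parquet-spec three-level LIST
# structure, whose inner field is named "element".
pyarrow.parquet.write_table(
    table, "/tmp/tags.parquet", use_compliant_nested_type=True
)

print(pyarrow.parquet.read_schema("/tmp/tags.parquet"))
# tags: list<element: string>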
@@ -2628,12 +2629,12 @@ def load_table_from_dataframe(
         try:

             if job_config.source_format == job.SourceFormat.PARQUET:
-                if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
+                if _helpers.PYARROW_VERSIONS.is_bad_version:
                     msg = (
                         "Loading dataframe data in PARQUET format with pyarrow "
-                        f"{_PYARROW_VERSION} can result in data corruption. It is "
-                        "therefore *strongly* advised to use a different pyarrow "
-                        "version or a different source format. "
+                        f"{_helpers.PYARROW_VERSIONS.installed_version} can result in data "
+                        "corruption. It is therefore *strongly* advised to use a "
+                        "different pyarrow version or a different source format. "
                         "See: https://github.com/googleapis/python-bigquery/issues/781"
                     )
                     warnings.warn(msg, category=RuntimeWarning)
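If one of the known-bad versions (currently only pyarrow 2.0.0, per issue #781) is installed, the load still proceeds after warning. In a test suite the warning can be promoted to an error with stdlib machinery; `client`, `dataframe`, and `table_id` below are the hypothetical objects from the earlier example:

import warnings

with warnings.catch_warnings():
    # Escalate the data-corruption warning so a test fails loudly rather
    # than silently loading possibly corrupt data.
    warnings.simplefilter("error", category=RuntimeWarning)
    client.load_table_from_dataframe(dataframe, table_id).result()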
@@ -2647,9 +2648,17 @@ def load_table_from_dataframe(
                         job_config.schema,
                         tmppath,
                         parquet_compression=parquet_compression,
+                        parquet_use_compliant_nested_type=True,
                     )
                 else:
-                    dataframe.to_parquet(tmppath, compression=parquet_compression)
+                    dataframe.to_parquet(
+                        tmppath,
+                        engine="pyarrow",
+                        compression=parquet_compression,
+                        **{"use_compliant_nested_type": True}
+                        if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
+                        else {},
+                    )

             else:
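The `**{...} if ... else {}` spread in the `to_parquet()` call above is a conditional-keyword idiom: the flag is only forwarded when the installed pyarrow understands it, so older versions never see an unknown argument. A standalone sketch of the same pattern, with illustrative names:

def write(path, **kwargs):
    # Stand-in for DataFrame.to_parquet(); just echoes what it received.
    print(path, kwargs)

supports_flag = True  # e.g. _helpers.PYARROW_VERSIONS.use_compliant_nested_type

write(
    "/tmp/out.parquet",
    **({"use_compliant_nested_type": True} if supports_flag else {}),
)
# prints: /tmp/out.parquet {'use_compliant_nested_type': True}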
Review comment on this change: 👍