Use job_config.schema for data type conversion if specified in load_table_from_dataframe. #8105
Changes from all commits: d6c2ab5, 46c3a12, 58e59ea, aa38e42, 6b22c34, cf40257
@@ -0,0 +1,152 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared helper functions for connecting BigQuery and pandas."""

try:
    import pyarrow
    import pyarrow.parquet
except ImportError:  # pragma: NO COVER
    pyarrow = None

from google.cloud.bigquery import schema


STRUCT_TYPES = ("RECORD", "STRUCT")


def pyarrow_datetime():
    return pyarrow.timestamp("us", tz=None)


def pyarrow_numeric():
    return pyarrow.decimal128(38, 9)


def pyarrow_time():
    return pyarrow.time64("us")


def pyarrow_timestamp():
    return pyarrow.timestamp("us", tz="UTC")


if pyarrow:
    BQ_TO_ARROW_SCALARS = {
        "BOOL": pyarrow.bool_,
        "BOOLEAN": pyarrow.bool_,
        "BYTES": pyarrow.binary,
        "DATE": pyarrow.date32,
        "DATETIME": pyarrow_datetime,
        "FLOAT": pyarrow.float64,
        "FLOAT64": pyarrow.float64,
        "GEOGRAPHY": pyarrow.string,
        "INT64": pyarrow.int64,
        "INTEGER": pyarrow.int64,
        "NUMERIC": pyarrow_numeric,
        "STRING": pyarrow.string,
        "TIME": pyarrow_time,
        "TIMESTAMP": pyarrow_timestamp,
    }
else:  # pragma: NO COVER
    BQ_TO_ARROW_SCALARS = {}  # pragma: NO COVER


def bq_to_arrow_struct_data_type(field):
    arrow_fields = []
    for subfield in field.fields:
        arrow_subfield = bq_to_arrow_field(subfield)
        if arrow_subfield:
            arrow_fields.append(arrow_subfield)
        else:
            # Could not determine a subfield type. Fallback to type
            # inference.
            return None
    return pyarrow.struct(arrow_fields)


def bq_to_arrow_data_type(field):
    """Return the Arrow data type, corresponding to a given BigQuery column.

    Returns None if default Arrow type inspection should be used.
    """
    if field.mode is not None and field.mode.upper() == "REPEATED":
        inner_type = bq_to_arrow_data_type(
            schema.SchemaField(field.name, field.field_type)
        )
        if inner_type:
            return pyarrow.list_(inner_type)
        return None

    if field.field_type.upper() in STRUCT_TYPES:
        return bq_to_arrow_struct_data_type(field)

    data_type_constructor = BQ_TO_ARROW_SCALARS.get(field.field_type.upper())
    if data_type_constructor is None:
        return None
    return data_type_constructor()


def bq_to_arrow_field(bq_field):
    """Return the Arrow field, corresponding to a given BigQuery column.

    Returns None if the Arrow type cannot be determined.
    """
    arrow_type = bq_to_arrow_data_type(bq_field)
    if arrow_type:
        is_nullable = bq_field.mode.upper() == "NULLABLE"
        return pyarrow.field(bq_field.name, arrow_type, nullable=is_nullable)
    return None


def bq_to_arrow_array(series, bq_field):
    arrow_type = bq_to_arrow_data_type(bq_field)
    if bq_field.mode.upper() == "REPEATED":
        return pyarrow.ListArray.from_pandas(series, type=arrow_type)
    if bq_field.field_type.upper() in STRUCT_TYPES:
        return pyarrow.StructArray.from_pandas(series, type=arrow_type)
    return pyarrow.array(series, type=arrow_type)


def to_parquet(dataframe, bq_schema, filepath):
    """Write dataframe as a Parquet file, according to the desired BQ schema.

    This function requires the :mod:`pyarrow` package. Arrow is used as an
    intermediate format.

    Args:
        dataframe (pandas.DataFrame):
            DataFrame to convert to Parquet file.
        bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
            Desired BigQuery schema. Number of columns must match number of
            columns in the DataFrame.
        filepath (str):
            Path to write Parquet file to.
    """
    if pyarrow is None:
        raise ValueError("pyarrow is required for BigQuery schema conversion.")

    if len(bq_schema) != len(dataframe.columns):
        raise ValueError(
            "Number of columns in schema must match number of columns in dataframe."
        )

    arrow_arrays = []
    arrow_names = []
    for bq_field in bq_schema:
        arrow_names.append(bq_field.name)
        arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field))

    arrow_table = pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
    pyarrow.parquet.write_table(arrow_table, filepath)

Review comment (on the column-count check): Note from chat: Maybe we want to allow the bq_schema to be used as an override? Any unmentioned columns get the default pandas type inference. This is how pandas-gbq works. The schema argument is more used as an override for when a particular column is ambiguous.
Reply: On second thought, let's leave this as-is and fixup later. Filed #8140 as a feature request.
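
For orientation, a minimal usage sketch of the new helpers (not part of the diff; it assumes pandas and pyarrow are installed, and the output path is a placeholder):

import pandas

from google.cloud.bigquery import SchemaField
from google.cloud.bigquery import _pandas_helpers

bq_schema = (
    SchemaField("name", "STRING"),
    SchemaField("miles", "FLOAT64"),
)
# Each BigQuery column maps to an Arrow field, e.g. FLOAT64 -> float64.
print(_pandas_helpers.bq_to_arrow_field(bq_schema[1]))
# Write a DataFrame to Parquet, letting the BigQuery schema pick the types.
dataframe = pandas.DataFrame({"name": ["foo", "bar"], "miles": [1.5, None]})
_pandas_helpers.to_parquet(dataframe, bq_schema, "/tmp/example.parquet")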
@@ -44,6 +44,7 @@
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._http import Connection
+from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference

@@ -1271,9 +1272,16 @@ def load_table_from_dataframe(
            project (str, optional):
                Project ID of the project of where to run the job. Defaults
                to the client's project.
-            job_config (google.cloud.bigquery.job.LoadJobConfig, optional):
+            job_config (~google.cloud.bigquery.job.LoadJobConfig, optional):
                Extra configuration options for the job.
+
+                To override the default pandas data type conversions, supply
+                a value for
+                :attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with
+                column names matching those of the dataframe. The BigQuery
+                schema is used to determine the correct data type conversion.
+                Indexes are not loaded. Requires the :mod:`pyarrow` library.

        Returns:
            google.cloud.bigquery.job.LoadJob: A new load job.

@@ -1296,7 +1304,10 @@ def load_table_from_dataframe(
        os.close(tmpfd)

        try:
-            dataframe.to_parquet(tmppath)
+            if job_config.schema:
+                _pandas_helpers.to_parquet(dataframe, job_config.schema, tmppath)
+            else:
+                dataframe.to_parquet(tmppath)

            with open(tmppath, "rb") as parquet_file:
                return self.load_table_from_file(

Review comment (on the new job_config.schema branch): Note from chat: if schema isn't populated, we might want to call get_table and use the table's schema if the table already exists and we're appending to it. (This is what pandas-gbq does.)
Reply: Ditto. Filed #8142. I think this would make a good feature, but shouldn't block this PR.
@@ -27,6 +27,7 @@

import six
+import pytest
import pytz

try:
    from google.cloud import bigquery_storage_v1beta1

@@ -36,6 +37,10 @@
    import pandas
except ImportError:  # pragma: NO COVER
    pandas = None
+try:
+    import pyarrow
+except ImportError:  # pragma: NO COVER
+    pyarrow = None
try:
    import IPython
    from IPython.utils import io

@@ -622,6 +627,159 @@ def test_load_table_from_local_avro_file_then_dump_table(self):
            sorted(row_tuples, key=by_wavelength), sorted(ROWS, key=by_wavelength)
        )

    @unittest.skipIf(pandas is None, "Requires `pandas`")
    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
    def test_load_table_from_dataframe_w_nulls(self):
        """Test that a DataFrame with null columns can be uploaded if a
        BigQuery schema is specified.

        See: https://github.com/googleapis/google-cloud-python/issues/7370
        """
        # Schema with all scalar types.
        scalars_schema = (
            bigquery.SchemaField("bool_col", "BOOLEAN"),
            bigquery.SchemaField("bytes_col", "BYTES"),
            bigquery.SchemaField("date_col", "DATE"),
            bigquery.SchemaField("dt_col", "DATETIME"),
            bigquery.SchemaField("float_col", "FLOAT"),
            bigquery.SchemaField("geo_col", "GEOGRAPHY"),
            bigquery.SchemaField("int_col", "INTEGER"),
            bigquery.SchemaField("num_col", "NUMERIC"),
            bigquery.SchemaField("str_col", "STRING"),
            bigquery.SchemaField("time_col", "TIME"),
            bigquery.SchemaField("ts_col", "TIMESTAMP"),
        )
        table_schema = scalars_schema + (
            # TODO: Array columns can't be read due to NULLABLE versus REPEATED
            # mode mismatch. See:
            # https://issuetracker.google.com/133415569#comment3
            # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"),
            # TODO: Support writing StructArrays to Parquet. See:
            # https://jira.apache.org/jira/browse/ARROW-2587
            # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
        )
        num_rows = 100
        nulls = [None] * num_rows
        dataframe = pandas.DataFrame(
            {
                "bool_col": nulls,
                "bytes_col": nulls,
                "date_col": nulls,
                "dt_col": nulls,
                "float_col": nulls,
                "geo_col": nulls,
                "int_col": nulls,
                "num_col": nulls,
                "str_col": nulls,
                "time_col": nulls,
                "ts_col": nulls,
            }
        )

Review comment (on the all-null sample data): Nit: I would suggest putting in non-null values for the sample data to make the test more complete.
Reply: The bug actually only shows up when the whole column contains nulls, because when at least one non-null value is present, pandas auto-detect code works correctly. I do include non-nulls in the unit tests.

        dataset_id = _make_dataset_id("bq_load_test")
        self.temp_dataset(dataset_id)
        table_id = "{}.{}.load_table_from_dataframe_w_nulls".format(
            Config.CLIENT.project, dataset_id
        )

        # Create the table before loading so that schema mismatch errors are
        # identified.
        table = retry_403(Config.CLIENT.create_table)(
            Table(table_id, schema=table_schema)
        )
        self.to_delete.insert(0, table)

        job_config = bigquery.LoadJobConfig(schema=table_schema)
        load_job = Config.CLIENT.load_table_from_dataframe(
            dataframe, table_id, job_config=job_config
        )
        load_job.result()

        table = Config.CLIENT.get_table(table)
        self.assertEqual(tuple(table.schema), table_schema)
        self.assertEqual(table.num_rows, num_rows)

Review comment (on self.to_delete.insert(0, table)): Is there a reason why we prepend the table ref to to_delete instead of appending it?
Reply: So that the table gets deleted before the dataset does.

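An aside on the review exchange above about all-null sample data: a minimal sketch (assuming pandas and pyarrow are installed) of why an entirely-null column needs an explicit type:

import pandas
import pyarrow

all_nulls = pandas.Series([None, None, None])
# With no values to inspect, pyarrow infers the "null" type for the column.
print(pyarrow.array(all_nulls).type)  # null
# With an explicit type, such as one derived from the BigQuery schema, the
# same column becomes a typed array of nulls.
print(pyarrow.array(all_nulls, type=pyarrow.bool_()).type)  # bool
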
    @unittest.skipIf(pandas is None, "Requires `pandas`")
    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
    def test_load_table_from_dataframe_w_explicit_schema(self):
        # Schema with all scalar types.
        scalars_schema = (
            bigquery.SchemaField("bool_col", "BOOLEAN"),
            bigquery.SchemaField("bytes_col", "BYTES"),
            bigquery.SchemaField("date_col", "DATE"),
            bigquery.SchemaField("dt_col", "DATETIME"),
            bigquery.SchemaField("float_col", "FLOAT"),
            bigquery.SchemaField("geo_col", "GEOGRAPHY"),
            bigquery.SchemaField("int_col", "INTEGER"),
            bigquery.SchemaField("num_col", "NUMERIC"),
            bigquery.SchemaField("str_col", "STRING"),
            bigquery.SchemaField("time_col", "TIME"),
            bigquery.SchemaField("ts_col", "TIMESTAMP"),
        )
        table_schema = scalars_schema + (
            # TODO: Array columns can't be read due to NULLABLE versus REPEATED
            # mode mismatch. See:
            # https://issuetracker.google.com/133415569#comment3
            # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"),
            # TODO: Support writing StructArrays to Parquet. See:
            # https://jira.apache.org/jira/browse/ARROW-2587
            # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
        )
        dataframe = pandas.DataFrame(
            {
                "bool_col": [True, None, False],
                "bytes_col": [b"abc", None, b"def"],
                "date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
                "dt_col": [
                    datetime.datetime(1, 1, 1, 0, 0, 0),
                    None,
                    datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
                ],
                "float_col": [float("-inf"), float("nan"), float("inf")],
                "geo_col": [
                    "POINT(30 10)",
                    None,
                    "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
                ],
                "int_col": [-9223372036854775808, None, 9223372036854775807],
                "num_col": [
                    decimal.Decimal("-99999999999999999999999999999.999999999"),
                    None,
                    decimal.Decimal("99999999999999999999999999999.999999999"),
                ],
                "str_col": ["abc", None, "def"],
                "time_col": [
                    datetime.time(0, 0, 0),
                    None,
                    datetime.time(23, 59, 59, 999999),
                ],
                "ts_col": [
                    datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
                    None,
                    datetime.datetime(
                        9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
                    ),
                ],
            },
            dtype="object",
        )

        dataset_id = _make_dataset_id("bq_load_test")
        self.temp_dataset(dataset_id)
        table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema".format(
            Config.CLIENT.project, dataset_id
        )

        job_config = bigquery.LoadJobConfig(schema=table_schema)
        load_job = Config.CLIENT.load_table_from_dataframe(
            dataframe, table_id, job_config=job_config
        )
        load_job.result()

        table = Config.CLIENT.get_table(table_id)
        self.assertEqual(tuple(table.schema), table_schema)
        self.assertEqual(table.num_rows, 3)

    def test_load_avro_from_uri_then_dump_table(self):
        from google.cloud.bigquery.job import CreateDisposition
        from google.cloud.bigquery.job import SourceFormat

Review comment (on BQ_TO_ARROW_SCALARS): Is there a list somewhere that defines BQ types? I wonder if we can add an assertion here that BQ_TO_ARROW_SCALARS.keys() == BQ_TYPES.keys(), so we have a better guarantee that all types are accounted for.
Reply: Not yet. There's an open FR at #7632. I've been hesitant to add such a list, since it's yet another thing to keep in sync manually, but I agree it'd be useful for cases such as this.
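
If such a list existed, the suggested guard could look roughly like the sketch below; LEGACY_SQL_TYPES is a hypothetical stand-in for the canonical type list requested in #7632, not something the library provides:

from google.cloud.bigquery import _pandas_helpers

# Hypothetical: the library does not define this list today (see #7632).
LEGACY_SQL_TYPES = frozenset(
    [
        "BOOL", "BOOLEAN", "BYTES", "DATE", "DATETIME", "FLOAT", "FLOAT64",
        "GEOGRAPHY", "INT64", "INTEGER", "NUMERIC", "STRING", "TIME", "TIMESTAMP",
    ]
)


def test_bq_to_arrow_scalars_covers_known_types():
    # Fail if a known BigQuery scalar type has no Arrow mapping.
    missing = LEGACY_SQL_TYPES - set(_pandas_helpers.BQ_TO_ARROW_SCALARS)
    assert not missing, "No Arrow mapping for: {}".format(sorted(missing))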