diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py new file mode 100644 index 000000000000..eeb65e0b9766 --- /dev/null +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -0,0 +1,152 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared helper functions for connecting BigQuery and pandas.""" + +try: + import pyarrow + import pyarrow.parquet +except ImportError: # pragma: NO COVER + pyarrow = None + +from google.cloud.bigquery import schema + + +STRUCT_TYPES = ("RECORD", "STRUCT") + + +def pyarrow_datetime(): + return pyarrow.timestamp("us", tz=None) + + +def pyarrow_numeric(): + return pyarrow.decimal128(38, 9) + + +def pyarrow_time(): + return pyarrow.time64("us") + + +def pyarrow_timestamp(): + return pyarrow.timestamp("us", tz="UTC") + + +if pyarrow: + BQ_TO_ARROW_SCALARS = { + "BOOL": pyarrow.bool_, + "BOOLEAN": pyarrow.bool_, + "BYTES": pyarrow.binary, + "DATE": pyarrow.date32, + "DATETIME": pyarrow_datetime, + "FLOAT": pyarrow.float64, + "FLOAT64": pyarrow.float64, + "GEOGRAPHY": pyarrow.string, + "INT64": pyarrow.int64, + "INTEGER": pyarrow.int64, + "NUMERIC": pyarrow_numeric, + "STRING": pyarrow.string, + "TIME": pyarrow_time, + "TIMESTAMP": pyarrow_timestamp, + } +else: # pragma: NO COVER + BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER + + +def bq_to_arrow_struct_data_type(field): + arrow_fields = [] + for subfield in field.fields: + arrow_subfield = bq_to_arrow_field(subfield) + if arrow_subfield: + arrow_fields.append(arrow_subfield) + else: + # Could not determine a subfield type. Fallback to type + # inference. + return None + return pyarrow.struct(arrow_fields) + + +def bq_to_arrow_data_type(field): + """Return the Arrow data type, corresponding to a given BigQuery column. + + Returns None if default Arrow type inspection should be used. + """ + if field.mode is not None and field.mode.upper() == "REPEATED": + inner_type = bq_to_arrow_data_type( + schema.SchemaField(field.name, field.field_type) + ) + if inner_type: + return pyarrow.list_(inner_type) + return None + + if field.field_type.upper() in STRUCT_TYPES: + return bq_to_arrow_struct_data_type(field) + + data_type_constructor = BQ_TO_ARROW_SCALARS.get(field.field_type.upper()) + if data_type_constructor is None: + return None + return data_type_constructor() + + +def bq_to_arrow_field(bq_field): + """Return the Arrow field, corresponding to a given BigQuery column. + + Returns None if the Arrow type cannot be determined. 
+ """ + arrow_type = bq_to_arrow_data_type(bq_field) + if arrow_type: + is_nullable = bq_field.mode.upper() == "NULLABLE" + return pyarrow.field(bq_field.name, arrow_type, nullable=is_nullable) + return None + + +def bq_to_arrow_array(series, bq_field): + arrow_type = bq_to_arrow_data_type(bq_field) + if bq_field.mode.upper() == "REPEATED": + return pyarrow.ListArray.from_pandas(series, type=arrow_type) + if bq_field.field_type.upper() in STRUCT_TYPES: + return pyarrow.StructArray.from_pandas(series, type=arrow_type) + return pyarrow.array(series, type=arrow_type) + + +def to_parquet(dataframe, bq_schema, filepath): + """Write dataframe as a Parquet file, according to the desired BQ schema. + + This function requires the :mod:`pyarrow` package. Arrow is used as an + intermediate format. + + Args: + dataframe (pandas.DataFrame): + DataFrame to convert to convert to Parquet file. + bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]): + Desired BigQuery schema. Number of columns must match number of + columns in the DataFrame. + filepath (str): + Path to write Parquet file to. + """ + if pyarrow is None: + raise ValueError("pyarrow is required for BigQuery schema conversion.") + + if len(bq_schema) != len(dataframe.columns): + raise ValueError( + "Number of columns in schema must match number of columns in dataframe." + ) + + arrow_arrays = [] + arrow_names = [] + for bq_field in bq_schema: + arrow_names.append(bq_field.name) + arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field)) + + arrow_table = pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names) + pyarrow.parquet.write_table(arrow_table, filepath) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index f61c18f11bd4..78d718aa6a2a 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -44,6 +44,7 @@ from google.cloud.bigquery._helpers import _record_field_to_json from google.cloud.bigquery._helpers import _str_or_none from google.cloud.bigquery._http import Connection +from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import DatasetListItem from google.cloud.bigquery.dataset import DatasetReference @@ -1271,9 +1272,16 @@ def load_table_from_dataframe( project (str, optional): Project ID of the project of where to run the job. Defaults to the client's project. - job_config (google.cloud.bigquery.job.LoadJobConfig, optional): + job_config (~google.cloud.bigquery.job.LoadJobConfig, optional): Extra configuration options for the job. + To override the default pandas data type conversions, supply + a value for + :attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with + column names matching those of the dataframe. The BigQuery + schema is used to determine the correct data type conversion. + Indexes are not loaded. Requires the :mod:`pyarrow` library. + Returns: google.cloud.bigquery.job.LoadJob: A new load job. 
@@ -1296,7 +1304,10 @@ def load_table_from_dataframe( os.close(tmpfd) try: - dataframe.to_parquet(tmppath) + if job_config.schema: + _pandas_helpers.to_parquet(dataframe, job_config.schema, tmppath) + else: + dataframe.to_parquet(tmppath) with open(tmppath, "rb") as parquet_file: return self.load_table_from_file( diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index cceca192b8f7..2b4aa84b8faf 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -27,6 +27,7 @@ import six import pytest +import pytz try: from google.cloud import bigquery_storage_v1beta1 @@ -36,6 +37,10 @@ import pandas except ImportError: # pragma: NO COVER pandas = None +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None try: import IPython from IPython.utils import io @@ -622,6 +627,159 @@ def test_load_table_from_local_avro_file_then_dump_table(self): sorted(row_tuples, key=by_wavelength), sorted(ROWS, key=by_wavelength) ) + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_nulls(self): + """Test that a DataFrame with null columns can be uploaded if a + BigQuery schema is specified. + + See: https://github.com/googleapis/google-cloud-python/issues/7370 + """ + # Schema with all scalar types. + scalars_schema = ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("bytes_col", "BYTES"), + bigquery.SchemaField("date_col", "DATE"), + bigquery.SchemaField("dt_col", "DATETIME"), + bigquery.SchemaField("float_col", "FLOAT"), + bigquery.SchemaField("geo_col", "GEOGRAPHY"), + bigquery.SchemaField("int_col", "INTEGER"), + bigquery.SchemaField("num_col", "NUMERIC"), + bigquery.SchemaField("str_col", "STRING"), + bigquery.SchemaField("time_col", "TIME"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + ) + table_schema = scalars_schema + ( + # TODO: Array columns can't be read due to NULLABLE versus REPEATED + # mode mismatch. See: + # https://issuetracker.google.com/133415569#comment3 + # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"), + # TODO: Support writing StructArrays to Parquet. See: + # https://jira.apache.org/jira/browse/ARROW-2587 + # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema), + ) + num_rows = 100 + nulls = [None] * num_rows + dataframe = pandas.DataFrame( + { + "bool_col": nulls, + "bytes_col": nulls, + "date_col": nulls, + "dt_col": nulls, + "float_col": nulls, + "geo_col": nulls, + "int_col": nulls, + "num_col": nulls, + "str_col": nulls, + "time_col": nulls, + "ts_col": nulls, + } + ) + + dataset_id = _make_dataset_id("bq_load_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.load_table_from_dataframe_w_nulls".format( + Config.CLIENT.project, dataset_id + ) + + # Create the table before loading so that schema mismatch errors are + # identified. 
+ table = retry_403(Config.CLIENT.create_table)( + Table(table_id, schema=table_schema) + ) + self.to_delete.insert(0, table) + + job_config = bigquery.LoadJobConfig(schema=table_schema) + load_job = Config.CLIENT.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = Config.CLIENT.get_table(table) + self.assertEqual(tuple(table.schema), table_schema) + self.assertEqual(table.num_rows, num_rows) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_explicit_schema(self): + # Schema with all scalar types. + scalars_schema = ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("bytes_col", "BYTES"), + bigquery.SchemaField("date_col", "DATE"), + bigquery.SchemaField("dt_col", "DATETIME"), + bigquery.SchemaField("float_col", "FLOAT"), + bigquery.SchemaField("geo_col", "GEOGRAPHY"), + bigquery.SchemaField("int_col", "INTEGER"), + bigquery.SchemaField("num_col", "NUMERIC"), + bigquery.SchemaField("str_col", "STRING"), + bigquery.SchemaField("time_col", "TIME"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + ) + table_schema = scalars_schema + ( + # TODO: Array columns can't be read due to NULLABLE versus REPEATED + # mode mismatch. See: + # https://issuetracker.google.com/133415569#comment3 + # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"), + # TODO: Support writing StructArrays to Parquet. See: + # https://jira.apache.org/jira/browse/ARROW-2587 + # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema), + ) + dataframe = pandas.DataFrame( + { + "bool_col": [True, None, False], + "bytes_col": [b"abc", None, b"def"], + "date_col": [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)], + "dt_col": [ + datetime.datetime(1, 1, 1, 0, 0, 0), + None, + datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), + ], + "float_col": [float("-inf"), float("nan"), float("inf")], + "geo_col": [ + "POINT(30 10)", + None, + "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))", + ], + "int_col": [-9223372036854775808, None, 9223372036854775807], + "num_col": [ + decimal.Decimal("-99999999999999999999999999999.999999999"), + None, + decimal.Decimal("99999999999999999999999999999.999999999"), + ], + "str_col": ["abc", None, "def"], + "time_col": [ + datetime.time(0, 0, 0), + None, + datetime.time(23, 59, 59, 999999), + ], + "ts_col": [ + datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + None, + datetime.datetime( + 9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc + ), + ], + }, + dtype="object", + ) + + dataset_id = _make_dataset_id("bq_load_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema".format( + Config.CLIENT.project, dataset_id + ) + + job_config = bigquery.LoadJobConfig(schema=table_schema) + load_job = Config.CLIENT.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = Config.CLIENT.get_table(table_id) + self.assertEqual(tuple(table.schema), table_schema) + self.assertEqual(table.num_rows, 3) + def test_load_avro_from_uri_then_dump_table(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import SourceFormat diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py new file mode 100644 index 000000000000..f04f95307806 --- /dev/null +++ b/bigquery/tests/unit/test__pandas_helpers.py @@ -0,0 +1,458 @@ +# Copyright 
2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import decimal +import functools + +try: + import pandas +except ImportError: # pragma: NO COVER + pandas = None +try: + import pyarrow + import pyarrow.types +except ImportError: # pragma: NO COVER + pyarrow = None +import pytest + +from google.cloud.bigquery import schema + + +@pytest.fixture +def module_under_test(): + from google.cloud.bigquery import _pandas_helpers + + return _pandas_helpers + + +def is_none(value): + return value is None + + +def is_datetime(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime-type + return all_( + pyarrow.types.is_timestamp, + lambda type_: type_.unit == "us", + lambda type_: type_.tz is None, + )(type_) + + +def is_numeric(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type + return all_( + pyarrow.types.is_decimal, + lambda type_: type_.precision == 38, + lambda type_: type_.scale == 9, + )(type_) + + +def is_timestamp(type_): + # See: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp-type + return all_( + pyarrow.types.is_timestamp, + lambda type_: type_.unit == "us", + lambda type_: type_.tz == "UTC", + )(type_) + + +def do_all(functions, value): + return all((func(value) for func in functions)) + + +def all_(*functions): + return functools.partial(do_all, functions) + + +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_is_datetime(): + assert is_datetime(pyarrow.timestamp("us", tz=None)) + assert not is_datetime(pyarrow.timestamp("ms", tz=None)) + assert not is_datetime(pyarrow.timestamp("us", tz="UTC")) + assert not is_datetime(pyarrow.string()) + + +def test_do_all(): + assert do_all((lambda _: True, lambda _: True), None) + assert not do_all((lambda _: True, lambda _: False), None) + assert not do_all((lambda _: False,), None) + + +def test_all_(): + assert all_(lambda _: True, lambda _: True)(None) + assert not all_(lambda _: True, lambda _: False)(None) + + +@pytest.mark.parametrize( + "bq_type,bq_mode,is_correct_type", + [ + ("STRING", "NULLABLE", pyarrow.types.is_string), + ("STRING", None, pyarrow.types.is_string), + ("string", "NULLABLE", pyarrow.types.is_string), + ("StRiNg", "NULLABLE", pyarrow.types.is_string), + ("BYTES", "NULLABLE", pyarrow.types.is_binary), + ("INTEGER", "NULLABLE", pyarrow.types.is_int64), + ("INT64", "NULLABLE", pyarrow.types.is_int64), + ("FLOAT", "NULLABLE", pyarrow.types.is_float64), + ("FLOAT64", "NULLABLE", pyarrow.types.is_float64), + ("NUMERIC", "NULLABLE", is_numeric), + ("BOOLEAN", "NULLABLE", pyarrow.types.is_boolean), + ("BOOL", "NULLABLE", pyarrow.types.is_boolean), + ("TIMESTAMP", "NULLABLE", is_timestamp), + ("DATE", "NULLABLE", pyarrow.types.is_date32), + ("TIME", "NULLABLE", pyarrow.types.is_time64), + ("DATETIME", "NULLABLE", is_datetime), + ("GEOGRAPHY", "NULLABLE", pyarrow.types.is_string), + ("UNKNOWN_TYPE", "NULLABLE", is_none), + # Use pyarrow.list_(item_type)
for repeated (array) fields. + ( + "STRING", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "STRING", + "repeated", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "STRING", + "RePeAtEd", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ( + "BYTES", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_binary(type_.value_type), + ), + ), + ( + "INTEGER", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_int64(type_.value_type), + ), + ), + ( + "INT64", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_int64(type_.value_type), + ), + ), + ( + "FLOAT", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_float64(type_.value_type), + ), + ), + ( + "FLOAT64", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_float64(type_.value_type), + ), + ), + ( + "NUMERIC", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_numeric(type_.value_type)), + ), + ( + "BOOLEAN", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_boolean(type_.value_type), + ), + ), + ( + "BOOL", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_boolean(type_.value_type), + ), + ), + ( + "TIMESTAMP", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_timestamp(type_.value_type)), + ), + ( + "DATE", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_date32(type_.value_type), + ), + ), + ( + "TIME", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_time64(type_.value_type), + ), + ), + ( + "DATETIME", + "REPEATED", + all_(pyarrow.types.is_list, lambda type_: is_datetime(type_.value_type)), + ), + ( + "GEOGRAPHY", + "REPEATED", + all_( + pyarrow.types.is_list, + lambda type_: pyarrow.types.is_string(type_.value_type), + ), + ), + ("RECORD", "REPEATED", is_none), + ("UNKNOWN_TYPE", "REPEATED", is_none), + ], +) +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_bq_to_arrow_data_type(module_under_test, bq_type, bq_mode, is_correct_type): + field = schema.SchemaField("ignored_name", bq_type, mode=bq_mode) + actual = module_under_test.bq_to_arrow_data_type(field) + assert is_correct_type(actual) + + +@pytest.mark.parametrize("bq_type", ["RECORD", "record", "STRUCT", "struct"]) +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_bq_to_arrow_data_type_w_struct(module_under_test, bq_type): + fields = ( + schema.SchemaField("field01", "STRING"), + schema.SchemaField("field02", "BYTES"), + schema.SchemaField("field03", "INTEGER"), + schema.SchemaField("field04", "INT64"), + schema.SchemaField("field05", "FLOAT"), + schema.SchemaField("field06", "FLOAT64"), + schema.SchemaField("field07", "NUMERIC"), + schema.SchemaField("field08", "BOOLEAN"), + schema.SchemaField("field09", "BOOL"), + schema.SchemaField("field10", "TIMESTAMP"), + schema.SchemaField("field11", "DATE"), + schema.SchemaField("field12", "TIME"), + schema.SchemaField("field13", "DATETIME"), + schema.SchemaField("field14", "GEOGRAPHY"), + ) + field = schema.SchemaField("ignored_name", bq_type, mode="NULLABLE", fields=fields) + actual = module_under_test.bq_to_arrow_data_type(field) + expected = pyarrow.struct( + ( + pyarrow.field("field01", pyarrow.string()),
pyarrow.field("field02", pyarrow.binary()), + pyarrow.field("field03", pyarrow.int64()), + pyarrow.field("field04", pyarrow.int64()), + pyarrow.field("field05", pyarrow.float64()), + pyarrow.field("field06", pyarrow.float64()), + pyarrow.field("field07", module_under_test.pyarrow_numeric()), + pyarrow.field("field08", pyarrow.bool_()), + pyarrow.field("field09", pyarrow.bool_()), + pyarrow.field("field10", module_under_test.pyarrow_timestamp()), + pyarrow.field("field11", pyarrow.date32()), + pyarrow.field("field12", module_under_test.pyarrow_time()), + pyarrow.field("field13", module_under_test.pyarrow_datetime()), + pyarrow.field("field14", pyarrow.string()), + ) + ) + assert pyarrow.types.is_struct(actual) + assert actual.num_children == len(fields) + assert actual.equals(expected) + + +@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`") +def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test): + fields = ( + schema.SchemaField("field1", "STRING"), + schema.SchemaField("field2", "INTEGER"), + # Don't know what to convert UNKNOWN_TYPE to, let type inference work, + # instead. + schema.SchemaField("field3", "UNKNOWN_TYPE"), + ) + field = schema.SchemaField("ignored_name", "RECORD", mode="NULLABLE", fields=fields) + actual = module_under_test.bq_to_arrow_data_type(field) + assert actual is None + + +@pytest.mark.parametrize( + "bq_type,rows", + [ + ("STRING", ["abc", None, "def", None]), + ("BYTES", [b"abc", None, b"def", None]), + ("INTEGER", [123, None, 456, None]), + ("INT64", [-9223372036854775808, None, 9223372036854775807, 123]), + ("FLOAT", [1.25, None, 3.5, None]), + ( + "NUMERIC", + [ + decimal.Decimal("-99999999999999999999999999999.999999999"), + None, + decimal.Decimal("99999999999999999999999999999.999999999"), + decimal.Decimal("999.123456789"), + ], + ), + ("BOOLEAN", [True, None, False, None]), + ("BOOL", [False, None, True, None]), + # TODO: Once https://issues.apache.org/jira/browse/ARROW-5450 is + # resolved, test with TIMESTAMP column. Conversion from pyarrow + # TimestampArray to list of Python objects fails with OverflowError: + # Python int too large to convert to C long. + # + # ( + # "TIMESTAMP", + # [ + # datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + # None, + # datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc), + # datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + # ], + # ), + ( + "DATE", + [ + datetime.date(1, 1, 1), + None, + datetime.date(9999, 12, 31), + datetime.date(1970, 1, 1), + ], + ), + ( + "TIME", + [ + datetime.time(0, 0, 0), + None, + datetime.time(23, 59, 59, 999999), + datetime.time(12, 0, 0), + ], + ), + # TODO: Once https://issues.apache.org/jira/browse/ARROW-5450 is + # resolved, test with DATETIME column. Conversion from pyarrow + # TimestampArray to list of Python objects fails with OverflowError: + # Python int too large to convert to C long. 
+ # + # ( + # "DATETIME", + # [ + # datetime.datetime(1, 1, 1, 0, 0, 0), + # None, + # datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), + # datetime.datetime(1970, 1, 1, 0, 0, 0), + # ], + # ), + ( + "GEOGRAPHY", + [ + "POINT(30 10)", + None, + "LINESTRING (30 10, 10 30, 40 40)", + "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))", + ], + ), + ], +) +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_bq_to_arrow_array_w_nullable_scalars(module_under_test, bq_type, rows): + series = pandas.Series(rows, dtype="object") + bq_field = schema.SchemaField("field_name", bq_type) + arrow_array = module_under_test.bq_to_arrow_array(series, bq_field) + roundtrip = arrow_array.to_pylist() + assert rows == roundtrip + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_bq_to_arrow_array_w_arrays(module_under_test): + rows = [[1, 2, 3], [], [4, 5, 6]] + series = pandas.Series(rows, dtype="object") + bq_field = schema.SchemaField("field_name", "INTEGER", mode="REPEATED") + arrow_array = module_under_test.bq_to_arrow_array(series, bq_field) + roundtrip = arrow_array.to_pylist() + assert rows == roundtrip + + +@pytest.mark.parametrize("bq_type", ["RECORD", "record", "STRUCT", "struct"]) +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_bq_to_arrow_array_w_structs(module_under_test, bq_type): + rows = [ + {"int_col": 123, "string_col": "abc"}, + None, + {"int_col": 456, "string_col": "def"}, + ] + series = pandas.Series(rows, dtype="object") + bq_field = schema.SchemaField( + "field_name", + bq_type, + fields=( + schema.SchemaField("int_col", "INTEGER"), + schema.SchemaField("string_col", "STRING"), + ), + ) + arrow_array = module_under_test.bq_to_arrow_array(series, bq_field) + roundtrip = arrow_array.to_pylist() + assert rows == roundtrip + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_bq_to_arrow_array_w_special_floats(module_under_test): + bq_field = schema.SchemaField("field_name", "FLOAT64") + rows = [float("-inf"), float("nan"), float("inf"), None] + series = pandas.Series(rows, dtype="object") + arrow_array = module_under_test.bq_to_arrow_array(series, bq_field) + roundtrip = arrow_array.to_pylist() + assert len(rows) == len(roundtrip) + assert roundtrip[0] == float("-inf") + assert roundtrip[1] != roundtrip[1] # NaN doesn't equal itself.
+ assert roundtrip[2] == float("inf") + assert roundtrip[3] is None + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +def test_to_parquet_without_pyarrow(module_under_test, monkeypatch): + monkeypatch.setattr(module_under_test, "pyarrow", None) + with pytest.raises(ValueError) as exc: + module_under_test.to_parquet(pandas.DataFrame(), (), None) + assert "pyarrow is required" in str(exc.value) + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +def test_to_parquet_w_missing_columns(module_under_test, monkeypatch): + with pytest.raises(ValueError) as exc: + module_under_test.to_parquet( + pandas.DataFrame(), (schema.SchemaField("not_found", "STRING"),), None + ) + assert "columns in schema must match" in str(exc.value) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 13889f90d7e8..dd98f2bcce64 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5000,6 +5000,49 @@ def test_load_table_from_dataframe_w_custom_job_config(self): assert sent_config is job_config assert sent_config.source_format == job.SourceFormat.PARQUET + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_load_table_from_dataframe_w_nulls(self): + """Test that a DataFrame with null columns can be uploaded if a + BigQuery schema is specified. + + See: https://github.com/googleapis/google-cloud-python/issues/7370 + """ + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + + client = self._make_client() + records = [{"name": None, "age": None}, {"name": None, "age": None}] + dataframe = pandas.DataFrame(records) + schema = [SchemaField("name", "STRING"), SchemaField("age", "INTEGER")] + job_config = job.LoadJobConfig(schema=schema) + + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + with load_patch as load_table_from_file: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + job_id=mock.ANY, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config is job_config + assert sent_config.source_format == job.SourceFormat.PARQUET + # Low-level tests @classmethod
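To round out the tests above, a minimal sketch of the BigQuery-to-Arrow mapping that _pandas_helpers implements (the field names here are placeholders; the printed type representations follow the BQ_TO_ARROW_SCALARS table and the unit tests, though their exact formatting may vary between pyarrow versions):

    from google.cloud.bigquery import _pandas_helpers, schema

    # TIMESTAMP maps to a microsecond-precision, UTC-zoned Arrow timestamp.
    print(_pandas_helpers.bq_to_arrow_data_type(schema.SchemaField("created", "TIMESTAMP")))
    # timestamp[us, tz=UTC]

    # REPEATED fields become Arrow list types wrapping the scalar Arrow type.
    print(_pandas_helpers.bq_to_arrow_data_type(schema.SchemaField("tags", "STRING", mode="REPEATED")))
    # list<item: string>

    # Unknown types map to None so that pyarrow's own type inference is used.
    print(_pandas_helpers.bq_to_arrow_data_type(schema.SchemaField("x", "UNKNOWN_TYPE")))
    # None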