diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py
index eeb65e0b9766..6a1b2dab910f 100644
--- a/bigquery/google/cloud/bigquery/_pandas_helpers.py
+++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -14,6 +14,8 @@
 """Shared helper functions for connecting BigQuery and pandas."""
 
+import warnings
+
 try:
     import pyarrow
     import pyarrow.parquet
@@ -107,6 +109,8 @@ def bq_to_arrow_field(bq_field):
     if arrow_type:
         is_nullable = bq_field.mode.upper() == "NULLABLE"
         return pyarrow.field(bq_field.name, arrow_type, nullable=is_nullable)
+
+    warnings.warn("Unable to determine type for field '{}'.".format(bq_field.name))
     return None
 
 
@@ -119,11 +123,8 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)
 
 
-def to_parquet(dataframe, bq_schema, filepath):
-    """Write dataframe as a Parquet file, according to the desired BQ schema.
-
-    This function requires the :mod:`pyarrow` package. Arrow is used as an
-    intermediate format.
+def to_arrow(dataframe, bq_schema):
+    """Convert pandas dataframe to Arrow table, using BigQuery schema.
 
     Args:
         dataframe (pandas.DataFrame):
@@ -131,12 +132,12 @@ def to_parquet(dataframe, bq_schema, filepath):
         bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
             Desired BigQuery schema. Number of columns must match number of
             columns in the DataFrame.
-        filepath (str):
-            Path to write Parquet file to.
-    """
-    if pyarrow is None:
-        raise ValueError("pyarrow is required for BigQuery schema conversion.")
 
+    Returns:
+        pyarrow.Table:
+            Table containing dataframe data, with schema derived from
+            BigQuery schema.
+    """
     if len(bq_schema) != len(dataframe.columns):
         raise ValueError(
             "Number of columns in schema must match number of columns in dataframe."
@@ -144,9 +145,36 @@
 
     arrow_arrays = []
     arrow_names = []
+    arrow_fields = []
     for bq_field in bq_schema:
+        arrow_fields.append(bq_to_arrow_field(bq_field))
         arrow_names.append(bq_field.name)
         arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field))
 
-    arrow_table = pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
+    if all((field is not None for field in arrow_fields)):
+        return pyarrow.Table.from_arrays(
+            arrow_arrays, schema=pyarrow.schema(arrow_fields)
+        )
+    return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
+
+
+def to_parquet(dataframe, bq_schema, filepath):
+    """Write dataframe as a Parquet file, according to the desired BQ schema.
+
+    This function requires the :mod:`pyarrow` package. Arrow is used as an
+    intermediate format.
+
+    Args:
+        dataframe (pandas.DataFrame):
+            DataFrame to convert to Parquet file.
+        bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+            Desired BigQuery schema. Number of columns must match number of
+            columns in the DataFrame.
+        filepath (str):
+            Path to write Parquet file to.
+    """
+    if pyarrow is None:
+        raise ValueError("pyarrow is required for BigQuery schema conversion.")
+
+    arrow_table = to_arrow(dataframe, bq_schema)
     pyarrow.parquet.write_table(arrow_table, filepath)
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index d04bf7c1854b..eba4c3b6adef 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -699,6 +699,45 @@ def test_load_table_from_dataframe_w_nulls(self):
         self.assertEqual(tuple(table.schema), table_schema)
         self.assertEqual(table.num_rows, num_rows)
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_required(self):
+        """Test that a DataFrame with required columns can be uploaded if a
+        BigQuery schema is specified.
+
+        See: https://github.com/googleapis/google-cloud-python/issues/8093
+        """
+        table_schema = (
+            bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
+        )
+
+        records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
+        dataframe = pandas.DataFrame(records)
+        job_config = bigquery.LoadJobConfig(schema=table_schema)
+        dataset_id = _make_dataset_id("bq_load_test")
+        self.temp_dataset(dataset_id)
+        table_id = "{}.{}.load_table_from_dataframe_w_required".format(
+            Config.CLIENT.project, dataset_id
+        )
+
+        # Create the table before loading so that schema mismatch errors are
+        # identified.
+        table = retry_403(Config.CLIENT.create_table)(
+            Table(table_id, schema=table_schema)
+        )
+        self.to_delete.insert(0, table)
+
+        job_config = bigquery.LoadJobConfig(schema=table_schema)
+        load_job = Config.CLIENT.load_table_from_dataframe(
+            dataframe, table_id, job_config=job_config
+        )
+        load_job.result()
+
+        table = Config.CLIENT.get_table(table)
+        self.assertEqual(tuple(table.schema), table_schema)
+        self.assertEqual(table.num_rows, 2)
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_w_explicit_schema(self):
diff --git a/bigquery/tests/unit/test__pandas_helpers.py b/bigquery/tests/unit/test__pandas_helpers.py
index f04f95307806..40b4548dae28 100644
--- a/bigquery/tests/unit/test__pandas_helpers.py
+++ b/bigquery/tests/unit/test__pandas_helpers.py
@@ -15,6 +15,7 @@
 import datetime
 import decimal
 import functools
+import warnings
 
 try:
     import pandas
@@ -26,6 +27,7 @@
 except ImportError:  # pragma: NO COVER
     pyarrow = None
 import pytest
+import pytz
 
 from google.cloud.bigquery import schema
 
@@ -373,7 +375,7 @@ def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test):
         (
             "GEOGRAPHY",
             [
-                "POINT(30, 10)",
+                "POINT(30 10)",
                 None,
                 "LINESTRING (30 10, 10 30, 40 40)",
                 "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
@@ -440,6 +442,94 @@ def test_bq_to_arrow_array_w_special_floats(module_under_test):
     assert roundtrip[3] is None
 
 
+@pytest.mark.skipIf(pandas is None, "Requires `pandas`")
+@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`")
+def test_to_arrow_w_required_fields(module_under_test):
+    bq_schema = (
+        schema.SchemaField("field01", "STRING", mode="REQUIRED"),
+        schema.SchemaField("field02", "BYTES", mode="REQUIRED"),
+        schema.SchemaField("field03", "INTEGER", mode="REQUIRED"),
+        schema.SchemaField("field04", "INT64", mode="REQUIRED"),
+        schema.SchemaField("field05", "FLOAT", mode="REQUIRED"),
+        schema.SchemaField("field06", "FLOAT64", mode="REQUIRED"),
+        schema.SchemaField("field07", "NUMERIC", mode="REQUIRED"),
+        schema.SchemaField("field08", "BOOLEAN", mode="REQUIRED"),
+        schema.SchemaField("field09", "BOOL", mode="REQUIRED"),
+        schema.SchemaField("field10", "TIMESTAMP", mode="REQUIRED"),
+        schema.SchemaField("field11", "DATE", mode="REQUIRED"),
+        schema.SchemaField("field12", "TIME", mode="REQUIRED"),
+        schema.SchemaField("field13", "DATETIME", mode="REQUIRED"),
+        schema.SchemaField("field14", "GEOGRAPHY", mode="REQUIRED"),
+    )
+    dataframe = pandas.DataFrame(
+        {
+            "field01": ["hello", "world"],
+            "field02": [b"abd", b"efg"],
+            "field03": [1, 2],
+            "field04": [3, 4],
+            "field05": [1.25, 9.75],
+            "field06": [-1.75, -3.5],
+            "field07": [decimal.Decimal("1.2345"), decimal.Decimal("6.7891")],
+            "field08": [True, False],
+            "field09": [False, True],
+            "field10": [
+                datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+                datetime.datetime(2012, 12, 21, 9, 7, 42, tzinfo=pytz.utc),
+            ],
+            "field11": [datetime.date(9999, 12, 31), datetime.date(1970, 1, 1)],
+            "field12": [datetime.time(23, 59, 59, 999999), datetime.time(12, 0, 0)],
+            "field13": [
+                datetime.datetime(1970, 1, 1, 0, 0, 0),
+                datetime.datetime(2012, 12, 21, 9, 7, 42),
+            ],
+            "field14": [
+                "POINT(30 10)",
+                "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
+            ],
+        }
+    )
+
+    arrow_table = module_under_test.to_arrow(dataframe, bq_schema)
+    arrow_schema = arrow_table.schema
+
+    assert len(arrow_schema) == len(bq_schema)
+    for arrow_field in arrow_schema:
+        assert not arrow_field.nullable
+
+
+@pytest.mark.skipIf(pandas is None, "Requires `pandas`")
+@pytest.mark.skipIf(pyarrow is None, "Requires `pyarrow`")
+def test_to_arrow_w_unknown_type(module_under_test):
+    bq_schema = (
+        schema.SchemaField("field00", "UNKNOWN_TYPE"),
+        schema.SchemaField("field01", "STRING"),
+        schema.SchemaField("field02", "BYTES"),
+        schema.SchemaField("field03", "INTEGER"),
+    )
+    dataframe = pandas.DataFrame(
+        {
+            "field00": ["whoami", "whatami"],
+            "field01": ["hello", "world"],
+            "field02": [b"abd", b"efg"],
+            "field03": [1, 2],
+        }
+    )
+
+    with warnings.catch_warnings(record=True) as warned:
+        arrow_table = module_under_test.to_arrow(dataframe, bq_schema)
+    arrow_schema = arrow_table.schema
+
+    assert len(warned) == 1
+    warning = warned[0]
+    assert "field00" in str(warning)
+
+    assert len(arrow_schema) == len(bq_schema)
+    assert arrow_schema[0].name == "field00"
+    assert arrow_schema[1].name == "field01"
+    assert arrow_schema[2].name == "field02"
+    assert arrow_schema[3].name == "field03"
+
+
 @pytest.mark.skipIf(pandas is None, "Requires `pandas`")
 def test_to_parquet_without_pyarrow(module_under_test, monkeypatch):
     monkeypatch.setattr(module_under_test, "pyarrow", None)
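
For reference, a minimal sketch (not part of the patch) of how the new to_arrow helper behaves once this change is applied: a REQUIRED BigQuery field should yield a non-nullable Arrow field. Note that _pandas_helpers is a private module, and the schema, field names, and values below are illustrative only, mirroring the new unit tests.

import pandas

from google.cloud import bigquery
from google.cloud.bigquery import _pandas_helpers

# Illustrative schema and data; REQUIRED columns map to non-nullable Arrow fields.
bq_schema = (
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
)
dataframe = pandas.DataFrame({"name": ["Chip", "Dale"], "age": [2, 3]})

arrow_table = _pandas_helpers.to_arrow(dataframe, bq_schema)
print([(field.name, field.nullable) for field in arrow_table.schema])
# Expected: [('name', False), ('age', False)]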