Skip to content

Commit

Permalink
fix: error inserting DataFrame with REPEATED field (#925)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim Swast <swast@google.com>
  • Loading branch information
plamut and tswast authored Aug 31, 2021
1 parent 8448922 commit 656d2fa
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 15 deletions.
8 changes: 7 additions & 1 deletion google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,13 @@ def dataframe_to_json_generator(dataframe):
output = {}
for column, value in zip(dataframe.columns, row):
# Omit NaN values.
if pandas.isna(value):
is_nan = pandas.isna(value)

# isna() can also return an array-like of bools, but the latter's boolean
# value is ambiguous, hence an extra check. An array-like value is *not*
# considered a NaN, however.
if isinstance(is_nan, bool) and is_nan:
continue
output[column] = value

yield output
63 changes: 49 additions & 14 deletions tests/unit/test__pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,41 @@ def test_dataframe_to_json_generator(module_under_test):
assert list(rows) == expected


def test_dataframe_to_json_generator_repeated_field(module_under_test):
    """Rows whose cells hold lists must come out of the generator unchanged.

    Builds a two-row DataFrame in which ``repeated_col`` holds a list per row
    (including ``pandas.NA`` and ``None`` items) and checks that
    ``dataframe_to_json_generator`` yields both columns for both rows.
    """
    minimum_pandas = str(PANDAS_MINIUM_VERSION)
    pytest.importorskip(
        "pandas",
        minversion=minimum_pandas,
        reason=(
            f"Requires `pandas version >= {PANDAS_MINIUM_VERSION}` "
            "which introduces pandas.NA"
        ),
    )

    # One record per row; each "repeated_col" cell is itself a list.
    first_record = collections.OrderedDict(
        [("repeated_col", [pandas.NA, 2, None, 4]), ("not_repeated_col", "first")]
    )
    second_record = collections.OrderedDict(
        [
            ("repeated_col", ["a", "b", mock.sentinel.foo, "d"]),
            ("not_repeated_col", "second"),
        ]
    )
    dataframe = pandas.DataFrame([first_record, second_record])

    produced = list(module_under_test.dataframe_to_json_generator(dataframe))

    # The list-valued cells are expected verbatim, NA/None items included.
    assert produced == [
        {"repeated_col": [pandas.NA, 2, None, 4], "not_repeated_col": "first"},
        {
            "repeated_col": ["a", "b", mock.sentinel.foo, "d"],
            "not_repeated_col": "second",
        },
    ]


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_list_columns_and_indexes_with_named_index(module_under_test):
df_data = collections.OrderedDict(
Expand Down Expand Up @@ -882,7 +917,7 @@ def test_list_columns_and_indexes_with_multiindex(module_under_test):
def test_dataframe_to_bq_schema_dict_sequence(module_under_test):
df_data = collections.OrderedDict(
[
("str_column", [u"hello", u"world"]),
("str_column", ["hello", "world"]),
("int_column", [42, 8]),
("bool_column", [True, False]),
]
Expand Down Expand Up @@ -1070,7 +1105,7 @@ def test_dataframe_to_arrow_dict_sequence_schema(module_under_test):
]

dataframe = pandas.DataFrame(
{"field01": [u"hello", u"world"], "field02": [True, False]}
{"field01": ["hello", "world"], "field02": [True, False]}
)

arrow_table = module_under_test.dataframe_to_arrow(dataframe, dict_schema)
Expand Down Expand Up @@ -1139,8 +1174,8 @@ def test_dataframe_to_parquet_compression_method(module_under_test):
def test_dataframe_to_bq_schema_fallback_needed_wo_pyarrow(module_under_test):
dataframe = pandas.DataFrame(
data=[
{"id": 10, "status": u"FOO", "execution_date": datetime.date(2019, 5, 10)},
{"id": 20, "status": u"BAR", "created_at": datetime.date(2018, 9, 12)},
{"id": 10, "status": "FOO", "execution_date": datetime.date(2019, 5, 10)},
{"id": 20, "status": "BAR", "created_at": datetime.date(2018, 9, 12)},
]
)

Expand All @@ -1167,8 +1202,8 @@ def test_dataframe_to_bq_schema_fallback_needed_wo_pyarrow(module_under_test):
def test_dataframe_to_bq_schema_fallback_needed_w_pyarrow(module_under_test):
dataframe = pandas.DataFrame(
data=[
{"id": 10, "status": u"FOO", "created_at": datetime.date(2019, 5, 10)},
{"id": 20, "status": u"BAR", "created_at": datetime.date(2018, 9, 12)},
{"id": 10, "status": "FOO", "created_at": datetime.date(2019, 5, 10)},
{"id": 20, "status": "BAR", "created_at": datetime.date(2018, 9, 12)},
]
)

Expand Down Expand Up @@ -1197,8 +1232,8 @@ def test_dataframe_to_bq_schema_fallback_needed_w_pyarrow(module_under_test):
def test_dataframe_to_bq_schema_pyarrow_fallback_fails(module_under_test):
dataframe = pandas.DataFrame(
data=[
{"struct_field": {"one": 2}, "status": u"FOO"},
{"struct_field": {"two": u"222"}, "status": u"BAR"},
{"struct_field": {"one": 2}, "status": "FOO"},
{"struct_field": {"two": "222"}, "status": "BAR"},
]
)

Expand Down Expand Up @@ -1252,7 +1287,7 @@ def test_augment_schema_type_detection_succeeds(module_under_test):
"timestamp_field": datetime.datetime(2005, 5, 31, 14, 25, 55),
"date_field": datetime.date(2005, 5, 31),
"bytes_field": b"some bytes",
"string_field": u"some characters",
"string_field": "some characters",
"numeric_field": decimal.Decimal("123.456"),
"bignumeric_field": decimal.Decimal("{d38}.{d38}".format(d38="9" * 38)),
}
Expand Down Expand Up @@ -1312,13 +1347,13 @@ def test_augment_schema_type_detection_fails(module_under_test):
dataframe = pandas.DataFrame(
data=[
{
"status": u"FOO",
"status": "FOO",
"struct_field": {"one": 1},
"struct_field_2": {"foo": u"123"},
"struct_field_2": {"foo": "123"},
},
{
"status": u"BAR",
"struct_field": {"two": u"111"},
"status": "BAR",
"struct_field": {"two": "111"},
"struct_field_2": {"bar": 27},
},
]
Expand Down Expand Up @@ -1351,7 +1386,7 @@ def test_dataframe_to_parquet_dict_sequence_schema(module_under_test):
]

dataframe = pandas.DataFrame(
{"field01": [u"hello", u"world"], "field02": [True, False]}
{"field01": ["hello", "world"], "field02": [True, False]}
)

write_table_patch = mock.patch.object(
Expand Down

0 comments on commit 656d2fa

Please sign in to comment.