BigQuery: Add client.insert_rows_from_dataframe() method #9162

Merged
merged 3 commits, Sep 12, 2019
54 changes: 54 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
@@ -15,6 +15,7 @@
"""Client for interacting with the Google BigQuery API."""

from __future__ import absolute_import
from __future__ import division

try:
    from collections import abc as collections_abc
@@ -25,7 +26,9 @@
import functools
import gzip
import io
import itertools
import json
import math
import os
import tempfile
import uuid
@@ -2080,6 +2083,57 @@ def insert_rows(self, table, rows, selected_fields=None, **kwargs):

        return self.insert_rows_json(table, json_rows, **kwargs)

    def insert_rows_from_dataframe(
        self, table, dataframe, selected_fields=None, chunk_size=500, **kwargs
    ):
        """Insert rows into a table from a dataframe via the streaming API.

        Args:
            table (Union[ \
                :class:`~google.cloud.bigquery.table.Table`, \
                :class:`~google.cloud.bigquery.table.TableReference`, \
                str, \
            ]):
                The destination table for the row data, or a reference to it.
            dataframe (pandas.DataFrame):
                A :class:`~pandas.DataFrame` containing the data to load.
            selected_fields (Sequence[ \
                :class:`~google.cloud.bigquery.schema.SchemaField`, \
            ]):
                The fields to return. Required if ``table`` is a
                :class:`~google.cloud.bigquery.table.TableReference`.
            chunk_size (int):
                The number of rows to stream in a single chunk. Must be positive.
            kwargs (dict):
                Keyword arguments to
                :meth:`~google.cloud.bigquery.client.Client.insert_rows_json`.

        Returns:
            Sequence[Sequence[Mappings]]:
                A list with insert errors for each insert chunk. Each element
                is a list containing one mapping per row with insert errors:
                the "index" key identifies the row, and the "errors" key
                contains a list of mappings describing one or more problems
                with the row.

        Raises:
            ValueError: if the table's schema is not set
        """
        insert_results = []

        # Ceiling division: the final, possibly partial, chunk is still sent.
        chunk_count = int(math.ceil(len(dataframe) / chunk_size))
        # Lazily convert each dataframe row to a mapping of column name -> value.
        rows_iter = (
            dict(six.moves.zip(dataframe.columns, row))
            for row in dataframe.itertuples(index=False, name=None)
        )

        for _ in range(chunk_count):
            rows_chunk = itertools.islice(rows_iter, chunk_size)
            result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
            insert_results.append(result)

        return insert_results

    def insert_rows_json(
        self,
        table,
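As context for the diff above, here is a minimal usage sketch of the new method; the project, dataset, table, and column names are invented for illustration and are not part of this PR:

import pandas
from google.cloud import bigquery

client = bigquery.Client()
# get_table() returns the table along with its schema, which row serialization needs.
table = client.get_table("my-project.my_dataset.people")

dataframe = pandas.DataFrame(
    [
        {"full_name": "Phred Phlyntstone", "age": 32},
        {"full_name": "Wylma Phlyntstone", "age": 29},
    ]
)

# Rows are streamed in chunks of ``chunk_size`` via insert_rows() / insert_rows_json().
chunk_errors = client.insert_rows_from_dataframe(table, dataframe, chunk_size=500)

# One (possibly empty) list of error mappings per chunk; "index" values refer to
# rows within that chunk.
for errors in chunk_errors:
    assert errors == []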
67 changes: 67 additions & 0 deletions bigquery/tests/system.py
@@ -1951,6 +1951,73 @@ def test_query_results_to_dataframe_w_bqstorage(self):
                if not row[col] is None:
                    self.assertIsInstance(row[col], exp_datatypes[col])

    @unittest.skipIf(pandas is None, "Requires `pandas`")
    def test_insert_rows_from_dataframe(self):
        SF = bigquery.SchemaField
        schema = [
            SF("float_col", "FLOAT", mode="REQUIRED"),
            SF("int_col", "INTEGER", mode="REQUIRED"),
            SF("bool_col", "BOOLEAN", mode="REQUIRED"),
            SF("string_col", "STRING", mode="NULLABLE"),
        ]

        dataframe = pandas.DataFrame(
            [
                {
                    "float_col": 1.11,
                    "bool_col": True,
                    "string_col": "my string",
                    "int_col": 10,
                },
                {
                    "float_col": 2.22,
                    "bool_col": False,
                    "string_col": "another string",
                    "int_col": 20,
                },
                {
                    "float_col": 3.33,
                    "bool_col": False,
                    "string_col": "another string",
                    "int_col": 30,
                },
                {
                    "float_col": 4.44,
                    "bool_col": True,
                    "string_col": "another string",
                    "int_col": 40,
                },
                {
                    "float_col": 5.55,
                    "bool_col": False,
                    "string_col": "another string",
                    "int_col": 50,
                },
            ]
        )

        table_id = "test_table"
        dataset = self.temp_dataset(_make_dataset_id("issue_7553"))
        table_arg = Table(dataset.table(table_id), schema=schema)
        table = retry_403(Config.CLIENT.create_table)(table_arg)
        self.to_delete.insert(0, table)

        Config.CLIENT.insert_rows_from_dataframe(table, dataframe, chunk_size=3)

        retry = RetryResult(_has_rows, max_tries=8)
        rows = retry(self._fetch_single_page)(table)

        sorted_rows = sorted(rows, key=operator.attrgetter("int_col"))
        row_tuples = [r.values() for r in sorted_rows]
        expected = [tuple(data_row) for data_row in dataframe.itertuples(index=False)]

        assert len(row_tuples) == len(expected)

        for row, expected_row in zip(row_tuples, expected):
            six.assertCountEqual(
                self, row, expected_row
            )  # column order does not matter

    def test_insert_rows_nested_nested(self):
        # See #2951
        SF = bigquery.SchemaField
124 changes: 124 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -4472,6 +4472,130 @@ def test_insert_rows_w_numeric(self):
            data=sent,
        )

    @unittest.skipIf(pandas is None, "Requires `pandas`")
    def test_insert_rows_from_dataframe(self):
        from google.cloud.bigquery.table import SchemaField
        from google.cloud.bigquery.table import Table

        API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
            self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
        )

        dataframe = pandas.DataFrame(
            [
                {"name": u"Little One", "age": 10, "adult": False},
                {"name": u"Young Gun", "age": 20, "adult": True},
                {"name": u"Dad", "age": 30, "adult": True},
                {"name": u"Stranger", "age": 40, "adult": True},
            ]
        )

        # create client
        creds = _make_credentials()
        http = object()
        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
        conn = client._connection = make_connection({}, {})

        # create table
        schema = [
            SchemaField("name", "STRING", mode="REQUIRED"),
            SchemaField("age", "INTEGER", mode="REQUIRED"),
            SchemaField("adult", "BOOLEAN", mode="REQUIRED"),
        ]
        table = Table(self.TABLE_REF, schema=schema)

        # Patch uuid4 so the generated insertIds are deterministic ("0" .. "3").
        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
            error_info = client.insert_rows_from_dataframe(
                table, dataframe, chunk_size=3
            )

        self.assertEqual(len(error_info), 2)
        for chunk_errors in error_info:
            assert chunk_errors == []

        EXPECTED_SENT_DATA = [
            {
                "rows": [
                    {
                        "insertId": "0",
                        "json": {"name": "Little One", "age": "10", "adult": "false"},
                    },
                    {
                        "insertId": "1",
                        "json": {"name": "Young Gun", "age": "20", "adult": "true"},
                    },
                    {
                        "insertId": "2",
                        "json": {"name": "Dad", "age": "30", "adult": "true"},
                    },
                ]
            },
            {
                "rows": [
                    {
                        "insertId": "3",
                        "json": {"name": "Stranger", "age": "40", "adult": "true"},
                    }
                ]
            },
        ]

        actual_calls = conn.api_request.call_args_list

        for call, expected_data in six.moves.zip_longest(
            actual_calls, EXPECTED_SENT_DATA
        ):
            expected_call = mock.call(method="POST", path=API_PATH, data=expected_data)
            assert call == expected_call

    @unittest.skipIf(pandas is None, "Requires `pandas`")
    def test_insert_rows_from_dataframe_many_columns(self):
        from google.cloud.bigquery.table import SchemaField
        from google.cloud.bigquery.table import Table

        API_PATH = "/projects/{}/datasets/{}/tables/{}/insertAll".format(
            self.PROJECT, self.DS_ID, self.TABLE_REF.table_id
        )
        N_COLUMNS = 256  # should be >= 256

        dataframe = pandas.DataFrame(
            [{"foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)}]
        )

        # create client
        creds = _make_credentials()
        http = object()
        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
        conn = client._connection = make_connection({}, {})

        # create table
        schema = [SchemaField("foo_{}".format(i), "STRING") for i in range(N_COLUMNS)]
        table = Table(self.TABLE_REF, schema=schema)

        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))):
            error_info = client.insert_rows_from_dataframe(
                table, dataframe, chunk_size=3
            )

        assert len(error_info) == 1
        assert error_info[0] == []

        EXPECTED_SENT_DATA = {
            "rows": [
                {
                    "insertId": "0",
                    "json": {
                        "foo_{}".format(i): "bar_{}".format(i) for i in range(N_COLUMNS)
                    },
                }
            ]
        }
        expected_call = mock.call(method="POST", path=API_PATH, data=EXPECTED_SENT_DATA)

        actual_calls = conn.api_request.call_args_list
        assert len(actual_calls) == 1
        assert actual_calls[0] == expected_call

    def test_insert_rows_json(self):
        from google.cloud.bigquery.table import Table, SchemaField
        from google.cloud.bigquery.dataset import DatasetReference