Marshal row data correctly in 'Table.insert_data()' (#3426)
* Move '_row{,s}_from_json' next to scalar '_from_json' helpers.

* Add converter helpers for row data scalars.

* Convert row data using helpers.

Closes #2957.
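
A minimal sketch (not part of the commit) of the marshalling this change produces, assuming the STRING/INTEGER/TIMESTAMP schema used in the PR's tests: the client now sends INTEGER cells as strings and TIMESTAMP cells as floating-point seconds since the epoch, while STRING cells pass through unchanged.

    # Sketch only: expected 'tabledata.insertAll' JSON cell values after this
    # change, assuming a schema of (full_name STRING, age INTEGER,
    # joined TIMESTAMP).
    import datetime

    row = ('Phred Phlyntstone', 32,
           datetime.datetime(2016, 12, 20, 15, 58, 27, 339328))

    marshalled = {
        'full_name': row[0],    # STRING: passed through unchanged
        'age': str(row[1]),     # INTEGER: sent as the string '32'
        # TIMESTAMP: float seconds since the epoch -> 1482249507.339328
        'joined': (row[2] - datetime.datetime(1970, 1, 1)).total_seconds(),
    }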
tseaver authored May 18, 2017
1 parent 3de5f33 commit 9220dac
Showing 4 changed files with 115 additions and 54 deletions.
bigquery/google/cloud/bigquery/_helpers.py (57 additions, 35 deletions)
@@ -21,6 +21,7 @@
 from google.cloud._helpers import UTC
 from google.cloud._helpers import _date_from_iso8601_date
 from google.cloud._helpers import _datetime_from_microseconds
+from google.cloud._helpers import _microseconds_from_datetime
 from google.cloud._helpers import _RFC3339_NO_FRACTION
 from google.cloud._helpers import _time_from_iso8601_time_naive
 from google.cloud._helpers import _to_bytes
@@ -122,6 +123,38 @@ def _record_from_json(value, field):
 }
 
 
+def _row_from_json(row, schema):
+    """Convert JSON row data to row with appropriate types.
+
+    Note: ``row['f']`` and ``schema`` are presumed to be of the same length.
+
+    :type row: dict
+    :param row: A JSON response row to be converted.
+
+    :type schema: tuple
+    :param schema: A tuple of
+                   :class:`~google.cloud.bigquery.schema.SchemaField`.
+
+    :rtype: tuple
+    :returns: A tuple of data converted to native types.
+    """
+    row_data = []
+    for field, cell in zip(schema, row['f']):
+        converter = _CELLDATA_FROM_JSON[field.field_type]
+        if field.mode == 'REPEATED':
+            row_data.append([converter(item['v'], field)
+                             for item in cell['v']])
+        else:
+            row_data.append(converter(cell['v'], field))
+
+    return tuple(row_data)
+
+
+def _rows_from_json(rows, schema):
+    """Convert JSON row data to rows with appropriate types."""
+    return [_row_from_json(row, schema) for row in rows]
+
+
 def _int_to_json(value):
     """Coerce 'value' to a JSON-compatible representation."""
     if isinstance(value, int):
@@ -148,8 +181,11 @@ def _bytes_to_json(value):
     return value
 
 
-def _timestamp_to_json(value):
-    """Coerce 'value' to a JSON-compatible representation."""
+def _timestamp_to_json_parameter(value):
+    """Coerce 'value' to a JSON-compatible representation.
+
+    This version returns the string representation used in query parameters.
+    """
     if isinstance(value, datetime.datetime):
         if value.tzinfo not in (None, UTC):
             # Convert to UTC and remove the time zone info.
@@ -159,6 +195,16 @@ def _timestamp_to_json(value):
     return value
 
 
+def _timestamp_to_json_row(value):
+    """Coerce 'value' to a JSON-compatible representation.
+
+    This version returns the floating-point seconds value used in row data.
+    """
+    if isinstance(value, datetime.datetime):
+        value = _microseconds_from_datetime(value) * 1e-6
+    return value
+
+
 def _datetime_to_json(value):
     """Coerce 'value' to a JSON-compatible representation."""
     if isinstance(value, datetime.datetime):
@@ -180,49 +226,25 @@ def _time_to_json(value):
     return value
 
 
-_SCALAR_VALUE_TO_JSON = {
+# Converters used for scalar values marshalled as row data.
+_SCALAR_VALUE_TO_JSON_ROW = {
     'INTEGER': _int_to_json,
     'INT64': _int_to_json,
     'FLOAT': _float_to_json,
     'FLOAT64': _float_to_json,
     'BOOLEAN': _bool_to_json,
     'BOOL': _bool_to_json,
     'BYTES': _bytes_to_json,
-    'TIMESTAMP': _timestamp_to_json,
+    'TIMESTAMP': _timestamp_to_json_row,
     'DATETIME': _datetime_to_json,
     'DATE': _date_to_json,
     'TIME': _time_to_json,
 }
 
 
-def _row_from_json(row, schema):
-    """Convert JSON row data to row with appropriate types.
-
-    :type row: dict
-    :param row: A JSON response row to be converted.
-
-    :type schema: tuple
-    :param schema: A tuple of
-                   :class:`~google.cloud.bigquery.schema.SchemaField`.
-
-    :rtype: tuple
-    :returns: A tuple of data converted to native types.
-    """
-    row_data = []
-    for field, cell in zip(schema, row['f']):
-        converter = _CELLDATA_FROM_JSON[field.field_type]
-        if field.mode == 'REPEATED':
-            row_data.append([converter(item['v'], field)
-                             for item in cell['v']])
-        else:
-            row_data.append(converter(cell['v'], field))
-
-    return tuple(row_data)
-
-
-def _rows_from_json(rows, schema):
-    """Convert JSON row data to rows with appropriate types."""
-    return [_row_from_json(row, schema) for row in rows]
+# Converters used for scalar values marshalled as query parameters.
+_SCALAR_VALUE_TO_JSON_PARAM = _SCALAR_VALUE_TO_JSON_ROW.copy()
+_SCALAR_VALUE_TO_JSON_PARAM['TIMESTAMP'] = _timestamp_to_json_parameter
 
 
 class _ConfigurationProperty(object):
@@ -420,7 +442,7 @@ def to_api_repr(self):
         :returns: JSON mapping
         """
         value = self.value
-        converter = _SCALAR_VALUE_TO_JSON.get(self.type_)
+        converter = _SCALAR_VALUE_TO_JSON_PARAM.get(self.type_)
         if converter is not None:
             value = converter(value)
         resource = {
@@ -506,7 +528,7 @@ def to_api_repr(self):
             a_values = [repr_['parameterValue'] for repr_ in reprs]
         else:
             a_type = {'type': self.array_type}
-            converter = _SCALAR_VALUE_TO_JSON.get(self.array_type)
+            converter = _SCALAR_VALUE_TO_JSON_PARAM.get(self.array_type)
             if converter is not None:
                 values = [converter(value) for value in values]
             a_values = [{'value': value} for value in values]
@@ -600,7 +622,7 @@ def to_api_repr(self):
                 values[name] = repr_['parameterValue']
             else:
                 s_types[name] = {'name': name, 'type': {'type': type_}}
-                converter = _SCALAR_VALUE_TO_JSON.get(type_)
+                converter = _SCALAR_VALUE_TO_JSON_PARAM.get(type_)
                 if converter is not None:
                     value = converter(value)
                 values[name] = {'value': value}
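The two converter maps defined in this file diverge only on TIMESTAMP. A short usage sketch (illustrative; the expected values follow this commit's unit tests below):

    # Same datetime, two wire formats (converter maps as defined above).
    import datetime
    from google.cloud.bigquery._helpers import (
        _SCALAR_VALUE_TO_JSON_PARAM, _SCALAR_VALUE_TO_JSON_ROW)

    when = datetime.datetime(2016, 12, 20, 15, 58, 27, 339328)

    # Row data wants floating-point seconds since the epoch.
    _SCALAR_VALUE_TO_JSON_ROW['TIMESTAMP'](when)
    # -> 1482249507.339328

    # Query parameters want a string with an explicit UTC offset.
    _SCALAR_VALUE_TO_JSON_PARAM['TIMESTAMP'](when)
    # -> '2016-12-20 15:58:27.339328+00:00'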
bigquery/google/cloud/bigquery/table.py (8 additions, 14 deletions)
@@ -22,18 +22,18 @@
 import six
 
 from google.cloud._helpers import _datetime_from_microseconds
-from google.cloud._helpers import _microseconds_from_datetime
 from google.cloud._helpers import _millis_from_datetime
 from google.cloud.exceptions import NotFound
 from google.cloud.exceptions import make_exception
+from google.cloud.iterator import HTTPIterator
 from google.cloud.streaming.exceptions import HttpError
 from google.cloud.streaming.http_wrapper import Request
 from google.cloud.streaming.http_wrapper import make_api_request
 from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
 from google.cloud.streaming.transfer import Upload
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery._helpers import _row_from_json
-from google.cloud.iterator import HTTPIterator
+from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW
 
 
 _TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'"
@@ -673,6 +673,9 @@ def fetch_data(self, max_results=None, page_token=None, client=None):
                  (this is distinct from the total number of rows in the
                  current page: ``iterator.page.num_items``).
        """
+        if len(self._schema) == 0:
+            raise ValueError(_TABLE_HAS_NO_SCHEMA)
+
         client = self._require_client(client)
         path = '%s/data' % (self.path,)
         iterator = HTTPIterator(client=client, path=path,
@@ -741,11 +744,9 @@ def insert_data(self,
             row_info = {}
 
             for field, value in zip(self._schema, row):
-                if field.field_type == 'TIMESTAMP':
-                    # BigQuery stores TIMESTAMP data internally as a
-                    # UNIX timestamp with microsecond precision.
-                    # Specifies the number of seconds since the epoch.
-                    value = _convert_timestamp(value)
+                converter = _SCALAR_VALUE_TO_JSON_ROW.get(field.field_type)
+                if converter is not None:  # STRING doesn't need converting
+                    value = converter(value)
                 row_info[field.name] = value
 
             info = {'json': row_info}
@@ -1131,10 +1132,3 @@ class _UrlBuilder(object):
     def __init__(self):
         self.query_params = {}
         self._relative_path = ''
-
-
-def _convert_timestamp(value):
-    """Helper for :meth:`Table.insert_data`."""
-    if isinstance(value, datetime.datetime):
-        value = _microseconds_from_datetime(value) * 1e-6
-    return value
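
Note that the new `insert_data()` loop relies on `.get()` returning `None` for types with no converter entry, so STRING cells fall through untouched. A quick sketch of that dispatch (assertions mirror the helpers in this diff):

    # Per-type dispatch as used by the new insert_data() loop.
    from google.cloud.bigquery._helpers import _SCALAR_VALUE_TO_JSON_ROW

    assert _SCALAR_VALUE_TO_JSON_ROW.get('STRING') is None       # pass through
    assert _SCALAR_VALUE_TO_JSON_ROW['INTEGER'](32) == '32'      # int -> str
    assert _SCALAR_VALUE_TO_JSON_ROW['BOOLEAN'](True) == 'true'  # bool -> str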
bigquery/tests/unit/test__helpers.py (26 additions, 3 deletions)
@@ -561,12 +561,12 @@ def test_w_bytes(self):
         self.assertEqual(converted, expected)
 
 
-class Test_timestamp_to_json(unittest.TestCase):
+class Test_timestamp_to_json_parameter(unittest.TestCase):
 
     def _call_fut(self, value):
-        from google.cloud.bigquery._helpers import _timestamp_to_json
+        from google.cloud.bigquery._helpers import _timestamp_to_json_parameter
 
-        return _timestamp_to_json(value)
+        return _timestamp_to_json_parameter(value)
 
     def test_w_float(self):
         self.assertEqual(self._call_fut(1.234567), 1.234567)
Expand Down Expand Up @@ -604,6 +604,29 @@ def test_w_datetime_w_utc_zone(self):
self.assertEqual(self._call_fut(when), ZULU)


class Test_timestamp_to_json_row(unittest.TestCase):

def _call_fut(self, value):
from google.cloud.bigquery._helpers import _timestamp_to_json_row

return _timestamp_to_json_row(value)

def test_w_float(self):
self.assertEqual(self._call_fut(1.234567), 1.234567)

def test_w_string(self):
ZULU = '2016-12-20 15:58:27.339328+00:00'
self.assertEqual(self._call_fut(ZULU), ZULU)

def test_w_datetime(self):
import datetime
from google.cloud._helpers import _microseconds_from_datetime

when = datetime.datetime(2016, 12, 20, 15, 58, 27, 339328)
self.assertEqual(
self._call_fut(when), _microseconds_from_datetime(when) / 1e6)


class Test_datetime_to_json(unittest.TestCase):

def _call_fut(self, value):
bigquery/tests/unit/test_table.py (24 additions, 2 deletions)
@@ -1043,6 +1043,24 @@ def test_delete_w_alternate_client(self):
         self.assertEqual(req['method'], 'DELETE')
         self.assertEqual(req['path'], '/%s' % PATH)
 
+    def test_fetch_data_wo_schema(self):
+        from google.cloud.bigquery.table import _TABLE_HAS_NO_SCHEMA
+
+        client = _Client(project=self.PROJECT)
+        dataset = _Dataset(client)
+        table = self._make_one(self.TABLE_NAME, dataset=dataset)
+        ROWS = [
+            ('Phred Phlyntstone', 32),
+            ('Bharney Rhubble', 33),
+            ('Wylma Phlyntstone', 29),
+            ('Bhettye Rhubble', 27),
+        ]
+
+        with self.assertRaises(ValueError) as exc:
+            table.fetch_data()
+
+        self.assertEqual(exc.exception.args, (_TABLE_HAS_NO_SCHEMA,))
+
     def test_fetch_data_w_bound_client(self):
         import datetime
         import six
@@ -1355,7 +1373,7 @@ def _row_data(row):
             if isinstance(row[2], datetime.datetime):
                 joined = _microseconds_from_datetime(joined) * 1e-6
             return {'full_name': row[0],
-                    'age': row[1],
+                    'age': str(row[1]),
                     'joined': joined}
 
         SENT = {
@@ -1404,7 +1422,11 @@ def test_insert_data_w_alternate_client(self):
         ]
 
         def _row_data(row):
-            return {'full_name': row[0], 'age': row[1], 'voter': row[2]}
+            return {
+                'full_name': row[0],
+                'age': str(row[1]),
+                'voter': row[2] and 'true' or 'false',
+            }
 
         SENT = {
             'skipInvalidRows': True,
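One idiom in the last hunk deserves a note: `row[2] and 'true' or 'false'` is the old and/or conditional trick, safe here because 'true' is truthy; it matches what the `_bool_to_json` helper produces for BOOLEAN cells.

    # The and/or idiom from the test, next to its modern equivalent.
    for voter in (True, False):
        assert (voter and 'true' or 'false') == ('true' if voter else 'false')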
