From 9553d8d9d7685d5c86de015f025e0f56dc48955a Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 21 Jun 2019 14:45:19 -0700 Subject: [PATCH 01/17] BQ Storage: Add basic arrow stream parser --- .../google/cloud/bigquery/_pandas_helpers.py | 2 + .../cloud/bigquery_storage_v1beta1/reader.py | 89 +++++++++++++++++-- bigquery_storage/noxfile.py | 2 +- bigquery_storage/setup.py | 1 + bigquery_storage/tests/unit/test_reader.py | 76 +++++++++++++--- 5 files changed, 149 insertions(+), 21 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 5261c2b99efd..21eabac3fa91 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -75,6 +75,8 @@ def pyarrow_timestamp(): if pyarrow: + # This dictionary is duplicated in bigquery_storage/test/unite/test_reader.py + # When modifying it be sure to update it there as well. BQ_TO_ARROW_SCALARS = { "BOOL": pyarrow.bool_, "BOOLEAN": pyarrow.bool_, diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py index ac45d7022d5d..e66bd728868c 100644 --- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py +++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py @@ -29,6 +29,11 @@ pandas = None import six +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None + from google.cloud.bigquery_storage_v1beta1 import types @@ -37,6 +42,9 @@ "fastavro is required to parse ReadRowResponse messages with Avro bytes." ) _PANDAS_REQUIRED = "pandas is required to create a DataFrame" +_PYARROW_REQUIRED = ( + "pyarrow is required to parse ReadRowResponse messages with Arrow bytes." +) class ReadRowsStream(object): @@ -152,9 +160,6 @@ def rows(self, read_session): Iterable[Mapping]: A sequence of rows, represented as dictionaries. """ - if fastavro is None: - raise ImportError(_FASTAVRO_REQUIRED) - return ReadRowsIterable(self, read_session) def to_dataframe(self, read_session, dtypes=None): @@ -186,8 +191,6 @@ def to_dataframe(self, read_session, dtypes=None): pandas.DataFrame: A data frame of all rows in the stream. """ - if fastavro is None: - raise ImportError(_FASTAVRO_REQUIRED) if pandas is None: raise ImportError(_PANDAS_REQUIRED) @@ -212,6 +215,7 @@ def __init__(self, reader, read_session): self._status = None self._reader = reader self._read_session = read_session + self._stream_parser = _StreamParser.from_read_session(self._read_session) @property def total_rows(self): @@ -231,10 +235,9 @@ def pages(self): """ # Each page is an iterator of rows. But also has num_items, remaining, # and to_dataframe. 
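Illustrative only, not part of the patch: the comment above introduces the page objects yielded by this iterable. A minimal consumption sketch, assuming a `client` (BigQueryStorageClient) and a `session` returned by `create_read_session`:

    from google.cloud import bigquery_storage_v1beta1

    position = bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
    rows = client.read_rows(position).rows(session)
    for page in rows.pages:
        page.num_items               # number of rows carried by this message
        page.remaining               # rows not yet consumed from this page
        frame = page.to_dataframe()  # requires pandas
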
- stream_parser = _StreamParser(self._read_session) for message in self._reader: self._status = message.status - yield ReadRowsPage(stream_parser, message) + yield ReadRowsPage(self._stream_parser, message) def __iter__(self): """Iterator for each row in all pages.""" @@ -355,16 +358,39 @@ def to_dataframe(self, dtypes=None): class _StreamParser(object): + def to_dataframe(self, message, dtypes=None): + raise NotImplementedError("Not implemented.") + + def to_rows(self, message): + raise NotImplementedError("Not implemented.") + + @staticmethod + def from_read_session(read_session): + schema_type = read_session.WhichOneof("schema") + if schema_type == "avro_schema": + return _AvroStreamParser(read_session) + elif schema_type == "arrow_schema": + return _ArrowStreamParser(read_session) + else: + raise TypeError( + "Unsupported schema type in read_session: {0}".format(schema_type) + ) + + +class _AvroStreamParser(_StreamParser): """Helper to parse Avro messages into useful representations.""" def __init__(self, read_session): - """Construct a _StreamParser. + """Construct an _AvroStreamParser. Args: read_session (google.cloud.bigquery_storage_v1beta1.types.ReadSession): A read session. This is required because it contains the schema used in the stream messages. """ + if fastavro is None: + raise ImportError(_FASTAVRO_REQUIRED) + self._read_session = read_session self._avro_schema_json = None self._fastavro_schema = None @@ -447,6 +473,53 @@ def to_rows(self, message): break # Finished with message +class _ArrowStreamParser(_StreamParser): + def __init__(self, read_session): + if pyarrow is None: + raise ImportError(_PYARROW_REQUIRED) + + self._read_session = read_session + self._schema = None + + def to_rows(self, message): + record_batch = self._parse_arrow_message(message) + + # Iterate through each column simultaneously, and make a dict from the + # row values + for row in zip(*record_batch.columns): + yield dict(zip(self._column_names, row)) + + def to_dataframe(self, message, dtypes=None): + record_batch = self._parse_arrow_message(message) + + if dtypes is None: + dtypes = {} + + df = record_batch.to_pandas() + + for column in dtypes: + df[column] = pandas.Series(df[column], dtype=dtypes[column]) + + return df + + def _parse_arrow_message(self, message): + self._parse_arrow_schema() + + return pyarrow.read_record_batch( + pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch), + self._schema, + ) + + def _parse_arrow_schema(self): + if self._schema: + return + + self._schema = pyarrow.read_schema( + pyarrow.py_buffer(self._read_session.arrow_schema.serialized_schema) + ) + self._column_names = [field.name for field in self._schema] + + def _copy_stream_position(position): """Copy a StreamPosition. diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py index 3840ad8d6638..32cce79a68df 100644 --- a/bigquery_storage/noxfile.py +++ b/bigquery_storage/noxfile.py @@ -37,7 +37,7 @@ def default(session): session.install('mock', 'pytest', 'pytest-cov') for local_dep in LOCAL_DEPS: session.install('-e', local_dep) - session.install('-e', '.[pandas,fastavro]') + session.install('-e', '.[pandas,fastavro,pyarrow]') # Run py.test against the unit tests. 
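Illustrative only, not part of the patch: the _ArrowStreamParser introduced earlier in this patch assumes the Arrow schema is serialized once in the read session, while each ReadRowsResponse carries a schemaless serialized record batch. A minimal roundtrip sketch using the same pyarrow calls the patch targets (pyarrow 0.13-era top-level functions; newer releases expose them as pyarrow.ipc.read_schema and pyarrow.ipc.read_record_batch):

    import pyarrow

    batch = pyarrow.RecordBatch.from_arrays(
        [pyarrow.array([1, 2, 3], type=pyarrow.int64())], ["int_col"]
    )
    schema = batch.schema

    # Analogous to ReadSession.arrow_schema.serialized_schema and
    # ReadRowsResponse.arrow_record_batch.serialized_record_batch.
    serialized_schema = schema.serialize().to_pybytes()
    serialized_batch = batch.serialize().to_pybytes()

    parsed_schema = pyarrow.read_schema(pyarrow.py_buffer(serialized_schema))
    parsed_batch = pyarrow.read_record_batch(
        pyarrow.py_buffer(serialized_batch), parsed_schema
    )
    assert parsed_batch.num_rows == 3
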
session.run( diff --git a/bigquery_storage/setup.py b/bigquery_storage/setup.py index 8471b55485d1..bfdd6d3cabbd 100644 --- a/bigquery_storage/setup.py +++ b/bigquery_storage/setup.py @@ -31,6 +31,7 @@ extras = { 'pandas': 'pandas>=0.17.1', 'fastavro': 'fastavro>=0.21.2', + 'pyarrow': 'pyarrow>=0.13.0', } package_root = os.path.abspath(os.path.dirname(__file__)) diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index a39309b55de5..3376caba7bcf 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -20,6 +20,7 @@ import json import fastavro +import pyarrow import mock import pandas import pandas.testing @@ -44,6 +45,20 @@ "time": {"type": "long", "logicalType": "time-micros"}, "timestamp": {"type": "long", "logicalType": "timestamp-micros"}, } +# This dictionary is duplicated in bigquery/google/cloud/bigquery/_pandas_helpers.py +# When modifying it be sure to update it there as well. +BQ_TO_ARROW_TYPES = { + "int64": pyarrow.int64(), + "float64": pyarrow.float64(), + "bool": pyarrow.bool_(), + "numeric": pyarrow.decimal128(38, 9), + "string": pyarrow.utf8(), + "bytes": pyarrow.binary(), + "date": pyarrow.date32(), # int32 days since epoch + "datetime": pyarrow.timestamp("us"), + "time": pyarrow.time64("us"), + "timestamp": pyarrow.timestamp("us", tz="UTC"), +} SCALAR_COLUMNS = [ {"name": "int_col", "type": "int64"}, {"name": "float_col", "type": "float64"}, @@ -143,11 +158,17 @@ def _avro_blocks_w_deadline(avro_blocks): raise google.api_core.exceptions.DeadlineExceeded("test: timeout, don't reconnect") -def _generate_read_session(avro_schema_json): +def _generate_avro_read_session(avro_schema_json): schema = json.dumps(avro_schema_json) return bigquery_storage_v1beta1.types.ReadSession(avro_schema={"schema": schema}) +def _generate_arrow_read_session(arrow_schema): + return bigquery_storage_v1beta1.types.ReadSession( + arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()} + ) + + def _bq_to_avro_schema(bq_columns): fields = [] avro_schema = {"type": "record", "name": "__root__", "fields": fields} @@ -166,6 +187,18 @@ def _bq_to_avro_schema(bq_columns): return avro_schema +def _bq_to_arrow_schema(bq_columns): + def bq_col_as_field(column): + doc = column.get("description") + name = column["name"] + type_ = BQ_TO_ARROW_TYPES[column["type"]] + mode = column.get("mode", "nullable").lower() + + return pyarrow.field(name, type_, mode == "nullable", {"description": doc}) + + return pyarrow.schema(bq_col_as_field(c) for c in bq_columns) + + def _get_avro_bytes(rows, avro_schema): avro_file = six.BytesIO() for row in rows: @@ -173,12 +206,31 @@ def _get_avro_bytes(rows, avro_schema): return avro_file.getvalue() -def test_rows_raises_import_error(mut, class_under_test, mock_client, monkeypatch): +def test_avro_rows_raises_import_error(mut, class_under_test, mock_client, monkeypatch): monkeypatch.setattr(mut, "fastavro", None) reader = class_under_test( [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} ) - read_session = bigquery_storage_v1beta1.types.ReadSession() + + bq_columns = [{"name": "int_col", "type": "int64"}] + avro_schema = _bq_to_avro_schema(bq_columns) + read_session = _generate_avro_read_session(avro_schema) + + with pytest.raises(ImportError): + reader.rows(read_session) + + +def test_pyarrow_rows_raises_import_error( + mut, class_under_test, mock_client, monkeypatch +): + monkeypatch.setattr(mut, "pyarrow", None) + reader = class_under_test( + 
[], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + + bq_columns = [{"name": "int_col", "type": "int64"}] + arrow_schema = _bq_to_arrow_schema(bq_columns) + read_session = _generate_arrow_read_session(arrow_schema) with pytest.raises(ImportError): reader.rows(read_session) @@ -187,7 +239,7 @@ def test_rows_raises_import_error(mut, class_under_test, mock_client, monkeypatc def test_rows_w_empty_stream(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) reader = class_under_test( [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} ) @@ -199,7 +251,7 @@ def test_rows_w_empty_stream(class_under_test, mock_client): def test_rows_w_scalars(class_under_test, mock_client): avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) avro_blocks = _bq_to_avro_blocks(SCALAR_BLOCKS, avro_schema) reader = class_under_test( @@ -214,7 +266,7 @@ def test_rows_w_scalars(class_under_test, mock_client): def test_rows_w_timeout(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) bq_blocks_1 = [ [{"int_col": 123}, {"int_col": 234}], [{"int_col": 345}, {"int_col": 456}], @@ -248,7 +300,7 @@ def test_rows_w_timeout(class_under_test, mock_client): def test_rows_w_reconnect(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) bq_blocks_1 = [ [{"int_col": 123}, {"int_col": 234}], [{"int_col": 345}, {"int_col": 456}], @@ -295,7 +347,7 @@ def test_rows_w_reconnect(class_under_test, mock_client): def test_rows_w_reconnect_by_page(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) bq_blocks_1 = [ [{"int_col": 123}, {"int_col": 234}], [{"int_col": 345}, {"int_col": 456}], @@ -358,7 +410,7 @@ def test_to_dataframe_no_pandas_raises_import_error( ): monkeypatch.setattr(mut, "pandas", None) avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) avro_blocks = _bq_to_avro_blocks(SCALAR_BLOCKS, avro_schema) reader = class_under_test( @@ -390,7 +442,7 @@ def test_to_dataframe_no_fastavro_raises_import_error( def test_to_dataframe_w_scalars(class_under_test): avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) avro_blocks = _bq_to_avro_blocks(SCALAR_BLOCKS, avro_schema) reader = class_under_test( @@ -427,7 +479,7 @@ def test_to_dataframe_w_dtypes(class_under_test): {"name": "lilfloat", "type": "float64"}, ] ) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) blocks = [ [{"bigfloat": 1.25, "lilfloat": 30.5}, {"bigfloat": 2.5, "lilfloat": 21.125}], [{"bigfloat": 3.75, "lilfloat": 11.0}], 
@@ -458,7 +510,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client): {"name": "bool_col", "type": "bool"}, ] avro_schema = _bq_to_avro_schema(bq_columns) - read_session = _generate_read_session(avro_schema) + read_session = _generate_avro_read_session(avro_schema) block_1 = [{"int_col": 123, "bool_col": True}, {"int_col": 234, "bool_col": False}] block_2 = [{"int_col": 345, "bool_col": True}, {"int_col": 456, "bool_col": False}] block_3 = [{"int_col": 567, "bool_col": True}, {"int_col": 789, "bool_col": False}] From 48735626a2ba9534d2672c7bec108ab1bf2371b3 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Thu, 27 Jun 2019 15:36:51 -0700 Subject: [PATCH 02/17] BQ Storage: Add tests for to_dataframe with arrow data --- bigquery_storage/tests/unit/test_reader.py | 118 ++++++++++++++++++++- 1 file changed, 115 insertions(+), 3 deletions(-) diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index 3376caba7bcf..94f33019433a 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -146,6 +146,28 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json): return avro_blocks +def _bq_to_arrow_batches(bq_blocks, arrow_schema): + arrow_batches = [] + for block in bq_blocks: + arrays = [] + for name in arrow_schema.names: + arrays.append( + pyarrow.array( + (row[name] for row in block), + type=arrow_schema.field_by_name(name).type, + size=len(block), + ) + ) + record_batch = pyarrow.RecordBatch.from_arrays(arrays, arrow_schema) + + response = bigquery_storage_v1beta1.types.ReadRowsResponse() + response.arrow_record_batch.serialized_record_batch = ( + record_batch.serialize().to_pybytes() + ) + arrow_batches.append(response) + return arrow_batches + + def _avro_blocks_w_unavailable(avro_blocks): for block in avro_blocks: yield block @@ -236,6 +258,18 @@ def test_pyarrow_rows_raises_import_error( reader.rows(read_session) +def test_rows_no_schema_set_raises_type_error( + mut, class_under_test, mock_client, monkeypatch +): + reader = class_under_test( + [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + read_session = bigquery_storage_v1beta1.types.ReadSession() + + with pytest.raises(TypeError): + reader.rows(read_session) + + def test_rows_w_empty_stream(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) @@ -249,6 +283,19 @@ def test_rows_w_empty_stream(class_under_test, mock_client): assert tuple(got) == () +def test_rows_w_empty_stream_arrow(class_under_test, mock_client): + bq_columns = [{"name": "int_col", "type": "int64"}] + arrow_schema = _bq_to_arrow_schema(bq_columns) + read_session = _generate_arrow_read_session(arrow_schema) + reader = class_under_test( + [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + + got = reader.rows(read_session) + assert got.total_rows is None + assert tuple(got) == () + + def test_rows_w_scalars(class_under_test, mock_client): avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS) read_session = _generate_avro_read_session(avro_schema) @@ -263,6 +310,20 @@ def test_rows_w_scalars(class_under_test, mock_client): assert got == expected +def test_rows_w_scalars_arrow(class_under_test, mock_client): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + + reader = class_under_test( + arrow_batches, mock_client, 
bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + got = tuple(reader.rows(read_session)) + + expected = tuple(itertools.chain.from_iterable(SCALAR_BLOCKS)) + assert got == expected + + def test_rows_w_timeout(class_under_test, mock_client): bq_columns = [{"name": "int_col", "type": "int64"}] avro_schema = _bq_to_avro_schema(bq_columns) @@ -427,16 +488,15 @@ def test_to_dataframe_no_pandas_raises_import_error( next(reader.rows(read_session).pages).to_dataframe() -def test_to_dataframe_no_fastavro_raises_import_error( +def test_to_dataframe_no_schema_set_raises_type_error( mut, class_under_test, mock_client, monkeypatch ): - monkeypatch.setattr(mut, "fastavro", None) reader = class_under_test( [], mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} ) read_session = bigquery_storage_v1beta1.types.ReadSession() - with pytest.raises(ImportError): + with pytest.raises(TypeError): reader.to_dataframe(read_session) @@ -472,6 +532,26 @@ def test_to_dataframe_w_scalars(class_under_test): ) +def test_to_dataframe_w_scalars_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + + reader = class_under_test( + arrow_batches, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + got = reader.to_dataframe(read_session) + + expected = pandas.DataFrame( + list(itertools.chain.from_iterable(SCALAR_BLOCKS)), columns=SCALAR_COLUMN_NAMES + ) + + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + expected.reset_index(drop=True), + ) + + def test_to_dataframe_w_dtypes(class_under_test): avro_schema = _bq_to_avro_schema( [ @@ -504,6 +584,38 @@ def test_to_dataframe_w_dtypes(class_under_test): ) +def test_to_dataframe_w_dtypes_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema( + [ + {"name": "bigfloat", "type": "float64"}, + {"name": "lilfloat", "type": "float64"}, + ] + ) + read_session = _generate_arrow_read_session(arrow_schema) + blocks = [ + [{"bigfloat": 1.25, "lilfloat": 30.5}, {"bigfloat": 2.5, "lilfloat": 21.125}], + [{"bigfloat": 3.75, "lilfloat": 11.0}], + ] + arrow_batches = _bq_to_arrow_batches(blocks, arrow_schema) + + reader = class_under_test( + arrow_batches, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"}) + + expected = pandas.DataFrame( + { + "bigfloat": [1.25, 2.5, 3.75], + "lilfloat": pandas.Series([30.5, 21.125, 11.0], dtype="float16"), + }, + columns=["bigfloat", "lilfloat"], + ) + pandas.testing.assert_frame_equal( + got.reset_index(drop=True), # reset_index to ignore row labels + expected.reset_index(drop=True), + ) + + def test_to_dataframe_by_page(class_under_test, mock_client): bq_columns = [ {"name": "int_col", "type": "int64"}, From 45bd1e90ef4708c187660d3f669014903c00071c Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Mon, 1 Jul 2019 10:32:29 -0700 Subject: [PATCH 03/17] Use Arrow format in client.list_rows(..).to_dataframe(..) 
with BQ Storage client --- bigquery/google/cloud/bigquery/_pandas_helpers.py | 1 + bigquery/google/cloud/bigquery/table.py | 2 +- bigquery/setup.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 21eabac3fa91..0f021dc84d05 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -293,6 +293,7 @@ def download_dataframe_bqstorage( session = bqstorage_client.create_read_session( table.to_bqstorage(), "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.proto.storage_pb2.DataFormat.ARROW, read_options=read_options, requested_streams=requested_streams, ) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 7af3bc6f48b4..c25af2079519 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1444,7 +1444,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non supplied, use the faster BigQuery Storage API to fetch rows from BigQuery. This API is a billable API. - This method requires the ``fastavro`` and + This method requires the ``pyarrow`` and ``google-cloud-bigquery-storage`` libraries. Reading from a specific partition or snapshot is not diff --git a/bigquery/setup.py b/bigquery/setup.py index 8592a232ecb3..62af0c3f05f6 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -37,7 +37,7 @@ extras = { "bqstorage": [ "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev", - "fastavro>=0.21.2", + "pyarrow >= 0.4.1", ], "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. From 06f990e4f3ce25550e21fd36d90768ed3db8f970 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 10 Jul 2019 11:42:06 -0500 Subject: [PATCH 04/17] Add system test for arrow wire format. --- bigquery_storage/tests/system/test_system.py | 36 +++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/bigquery_storage/tests/system/test_system.py b/bigquery_storage/tests/system/test_system.py index 3e86a7fc2263..6a86cffa016f 100644 --- a/bigquery_storage/tests/system/test_system.py +++ b/bigquery_storage/tests/system/test_system.py @@ -67,7 +67,7 @@ def test_read_rows_full_table(client, project_id, small_table_reference): assert len(block.avro_rows.serialized_binary_rows) > 0 -def test_read_rows_to_dataframe(client, project_id): +def test_read_rows_to_dataframe_w_avro(client, project_id): table_ref = bigquery_storage_v1beta1.types.TableReference() table_ref.project_id = "bigquery-public-data" table_ref.dataset_id = "new_york_citibike" @@ -75,6 +75,40 @@ def test_read_rows_to_dataframe(client, project_id): session = client.create_read_session( table_ref, "projects/{}".format(project_id), requested_streams=1 ) + schema_type = session.WhichOneof("schema") + assert schema_type == "avro_schema" + + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + frame = client.read_rows(stream_pos).to_dataframe( + session, dtypes={"latitude": numpy.float16} + ) + + # Station ID is a required field (no nulls), so the datatype should always + # be integer. 
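Illustrative only, not part of the patch: the dtypes mapping passed to to_dataframe above overrides the dtype column by column. Per the reader implementation in this series, it amounts roughly to the following post-processing (pandas and numpy assumed available):

    import numpy
    import pandas

    frame = pandas.DataFrame({"station_id": [72, 79], "latitude": [40.767, 40.719]})
    dtypes = {"latitude": numpy.float16}
    for column in dtypes:
        frame[column] = pandas.Series(frame[column], dtype=dtypes[column])
    assert frame["latitude"].dtype.name == "float16"
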
+ assert frame.station_id.dtype.name == "int64" + assert frame.latitude.dtype.name == "float16" + assert frame.longitude.dtype.name == "float64" + assert frame["name"].str.startswith("Central Park").any() + + +def test_read_rows_to_dataframe_w_arrow(client, project_id): + table_ref = bigquery_storage_v1beta1.types.TableReference() + table_ref.project_id = "bigquery-public-data" + table_ref.dataset_id = "new_york_citibike" + table_ref.table_id = "citibike_stations" + + session = client.create_read_session( + table_ref, + "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, + requested_streams=1 + ) + schema_type = session.WhichOneof("schema") + assert schema_type == "arrow_schema" + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( stream=session.streams[0] ) From 5e7a403313d434e481c2ecd269a546471550e617 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 10 Jul 2019 14:31:27 -0500 Subject: [PATCH 05/17] Add pyarrow to system tests deps. --- bigquery_storage/noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery_storage/noxfile.py b/bigquery_storage/noxfile.py index 32cce79a68df..bb1be8dec998 100644 --- a/bigquery_storage/noxfile.py +++ b/bigquery_storage/noxfile.py @@ -121,7 +121,7 @@ def system(session): session.install('-e', os.path.join('..', 'test_utils')) for local_dep in LOCAL_DEPS: session.install('-e', local_dep) - session.install('-e', '.[pandas,fastavro]') + session.install('-e', '.[fastavro,pandas,pyarrow]') # Run py.test against the system tests. session.run('py.test', '--quiet', 'tests/system/') From 5155ac91f657b31ff0d8c21ba9ed2ec71d013ff4 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 10 Jul 2019 16:11:53 -0500 Subject: [PATCH 06/17] Add to_arrow with BQ Storage API. --- .../google/cloud/bigquery/_pandas_helpers.py | 76 +++++++++++++----- bigquery/google/cloud/bigquery/table.py | 46 ++++++++++- bigquery/tests/unit/test_job.py | 2 + .../cloud/bigquery_storage_v1beta1/reader.py | 80 ++++++++++++++++--- bigquery_storage/tests/system/test_system.py | 2 +- bigquery_storage/tests/unit/test_reader.py | 2 +- 6 files changed, 171 insertions(+), 37 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 39f709765397..d77aa67d5cf5 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -15,6 +15,7 @@ """Shared helper functions for connecting BigQuery and pandas.""" import concurrent.futures +import functools import warnings from six.moves import queue @@ -271,14 +272,18 @@ def download_dataframe_tabledata_list(pages, schema, dtypes): yield _tabledata_list_page_to_dataframe(page, column_names, dtypes) -def _download_dataframe_bqstorage_stream( - download_state, - bqstorage_client, - column_names, - dtypes, - session, - stream, - worker_queue, +def _bqstorage_page_to_arrow(page): + return page.to_arrow() + + +def _bqstorage_page_to_dataframe(column_names, dtypes, page): + # page.to_dataframe() does not preserve column order in some versions + # of google-cloud-bigquery-storage. Access by column name to rearrange. 
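Illustrative only, not part of the patch: selecting by the column_names list, as the return statement below does, produces a copy with the columns in exactly that order, for example:

    import pandas

    frame = pandas.DataFrame({"float_col": [1.5], "int_col": [7]})
    reordered = frame[["int_col", "float_col"]]
    assert list(reordered.columns) == ["int_col", "float_col"]
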
+ return page.to_dataframe(dtypes=dtypes)[column_names] + + +def _download_table_bqstorage_stream( + download_state, bqstorage_client, session, stream, worker_queue, page_to_item ): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) rowstream = bqstorage_client.read_rows(position).rows(session) @@ -286,10 +291,8 @@ def _download_dataframe_bqstorage_stream( for page in rowstream.pages: if download_state.done: return - # page.to_dataframe() does not preserve column order in some versions - # of google-cloud-bigquery-storage. Access by column name to rearrange. - frame = page.to_dataframe(dtypes=dtypes)[column_names] - worker_queue.put(frame) + item = page_to_item(page) + worker_queue.put(item) def _nowait(futures): @@ -306,14 +309,13 @@ def _nowait(futures): return done, not_done -def download_dataframe_bqstorage( +def _download_table_bqstorage( project_id, table, bqstorage_client, - column_names, - dtypes, preserve_order=False, selected_fields=None, + page_to_item=None, ): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" if "$" in table.table_id: @@ -335,15 +337,13 @@ def download_dataframe_bqstorage( session = bqstorage_client.create_read_session( table.to_bqstorage(), "projects/{}".format(project_id), - format_=bigquery_storage_v1beta1.proto.storage_pb2.DataFormat.ARROW, + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=read_options, requested_streams=requested_streams, ) - # Avoid reading rows from an empty table. pandas.concat will fail on an - # empty list. + # Avoid reading rows from an empty table. if not session.streams: - yield pandas.DataFrame(columns=column_names) return total_streams = len(session.streams) @@ -363,14 +363,13 @@ def download_dataframe_bqstorage( # See: https://github.com/googleapis/google-cloud-python/pull/7698 not_done = [ pool.submit( - _download_dataframe_bqstorage_stream, + _download_table_bqstorage_stream, download_state, bqstorage_client, - column_names, - dtypes, session, stream, worker_queue, + page_to_item, ) for stream in session.streams ] @@ -413,3 +412,36 @@ def download_dataframe_bqstorage( # Shutdown all background threads, now that they should know to # exit early. 
pool.shutdown(wait=True) + + +def download_arrow_bqstorage( + project_id, table, bqstorage_client, preserve_order=False, selected_fields=None +): + return _download_table_bqstorage( + project_id, + table, + bqstorage_client, + preserve_order=preserve_order, + selected_fields=selected_fields, + page_to_item=_bqstorage_page_to_arrow, + ) + + +def download_dataframe_bqstorage( + project_id, + table, + bqstorage_client, + column_names, + dtypes, + preserve_order=False, + selected_fields=None, +): + page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) + return _download_table_bqstorage( + project_id, + table, + bqstorage_client, + preserve_order=preserve_order, + selected_fields=selected_fields, + page_to_item=page_to_item, + ) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index c670af164ce8..4999de3b0d8d 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1403,14 +1403,42 @@ def _get_progress_bar(self, progress_bar_type): warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) return None - def _to_arrow_iterable(self): + def _to_arrow_iterable(self, bqstorage_client=None): """Create an iterable of arrow RecordBatches, to process the table as a stream.""" + if bqstorage_client is not None: + column_names = [field.name for field in self._schema] + try: + # Iterate over the stream so that read errors are raised (and + # the method can then fallback to tabledata.list). + for record_batch in _pandas_helpers.download_arrow_bqstorage( + self._project, + self._table, + bqstorage_client, + column_names, + preserve_order=self._preserve_order, + selected_fields=self._selected_fields, + ): + yield record_batch + return + except google.api_core.exceptions.Forbidden: + # Don't hide errors such as insufficient permissions to create + # a read session, or the API is not enabled. Both of those are + # clearly problems if the developer has explicitly asked for + # BigQuery Storage API support. + raise + except google.api_core.exceptions.GoogleAPICallError: + # There is a known issue with reading from small anonymous + # query results tables, so some errors are expected. Rather + # than throw those errors, try reading the DataFrame again, but + # with the tabledata.list API. + pass + for record_batch in _pandas_helpers.download_arrow_tabledata_list( iter(self.pages), self.schema ): yield record_batch - def to_arrow(self, progress_bar_type=None): + def to_arrow(self, progress_bar_type=None, bqstorage_client=None): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1433,6 +1461,18 @@ def to_arrow(self, progress_bar_type=None): ``'tqdm_gui'`` Use the :func:`tqdm.tqdm_gui` function to display a progress bar as a graphical dialog box. + bqstorage_client ( \ + google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \ + ): + **Beta Feature** Optional. A BigQuery Storage API client. If + supplied, use the faster BigQuery Storage API to fetch rows + from BigQuery. This API is a billable API. + + This method requires the ``pyarrow`` and + ``google-cloud-bigquery-storage`` libraries. + + Reading from a specific partition or snapshot is not + currently supported by this method. 
Returns: pyarrow.Table @@ -1452,7 +1492,7 @@ def to_arrow(self, progress_bar_type=None): progress_bar = self._get_progress_bar(progress_bar_type) record_batches = [] - for record_batch in self._to_arrow_iterable(): + for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client): record_batches.append(record_batch) if progress_bar is not None: diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 22809c245d4b..dcc90b2d96a8 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -4897,6 +4897,7 @@ def test_to_dataframe_bqstorage(self): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/{}".format(self.PROJECT), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use default number of streams for best performance. requested_streams=0, @@ -5340,6 +5341,7 @@ def test_to_dataframe_bqstorage_preserve_order(query): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/test-project", + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use a single stream to preserve row order. requested_streams=1, diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py index e66bd728868c..8c73e4443a5a 100644 --- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py +++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py @@ -27,6 +27,10 @@ import pandas except ImportError: # pragma: NO COVER pandas = None +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None import six try: @@ -38,13 +42,12 @@ _STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,) -_FASTAVRO_REQUIRED = ( - "fastavro is required to parse ReadRowResponse messages with Avro bytes." -) + +_AVRO_BYTES_OPERATION = "parse ReadRowResponse messages with Avro bytes" +_ARROW_BYTES_OPERATION = "parse ReadRowResponse messages with Arrow bytes" +_FASTAVRO_REQUIRED = "fastavro is required to {operation}." _PANDAS_REQUIRED = "pandas is required to create a DataFrame" -_PYARROW_REQUIRED = ( - "pyarrow is required to parse ReadRowResponse messages with Arrow bytes." -) +_PYARROW_REQUIRED = "pyarrow is required to {operation}." class ReadRowsStream(object): @@ -121,7 +124,7 @@ def __iter__(self): while True: try: for message in self._wrapped: - rowcount = message.avro_rows.row_count + rowcount = message.row_count self._position.offset += rowcount yield message @@ -162,6 +165,26 @@ def rows(self, read_session): """ return ReadRowsIterable(self, read_session) + def to_arrow(self, read_session): + """Create a :class:`pyarrow.Table` of all rows in the stream. + + This method requires the pyarrow library and a stream using the Arrow + format. + + Args: + read_session ( \ + ~google.cloud.bigquery_storage_v1beta1.types.ReadSession \ + ): + The read session associated with this read rows stream. This + contains the schema, which is required to parse the data + messages. + + Returns: + pyarrow.Table: + A table of all rows in the stream. + """ + return self.rows(read_session).to_arrow() + def to_dataframe(self, read_session, dtypes=None): """Create a :class:`pandas.DataFrame` of all rows in the stream. @@ -245,6 +268,21 @@ def __iter__(self): for row in page: yield row + def to_arrow(self): + """Create a :class:`pyarrow.Table` of all rows in the stream. 
+ + This method requires the pyarrow library and a stream using the Arrow + format. + + Returns: + pyarrow.Table: + A table of all rows in the stream. + """ + record_batches = [] + for page in self.pages: + record_batches.append(page.to_arrow()) + return pyarrow.Table.from_batches(record_batches) + def to_dataframe(self, dtypes=None): """Create a :class:`pandas.DataFrame` of all rows in the stream. @@ -294,8 +332,8 @@ def __init__(self, stream_parser, message): self._stream_parser = stream_parser self._message = message self._iter_rows = None - self._num_items = self._message.avro_rows.row_count - self._remaining = self._message.avro_rows.row_count + self._num_items = self._message.row_count + self._remaining = self._message.row_count def _parse_rows(self): """Parse rows from the message only once.""" @@ -358,6 +396,9 @@ def to_dataframe(self, dtypes=None): class _StreamParser(object): + def to_arrow(self, message): + raise NotImplementedError("Not implemented.") + def to_dataframe(self, message, dtypes=None): raise NotImplementedError("Not implemented.") @@ -396,6 +437,20 @@ def __init__(self, read_session): self._fastavro_schema = None self._column_names = None + def to_arrow(self, message): + """Create an :class:`pyarrow.RecordBatch` of rows in the page. + + Args: + message (google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse): + Protocol buffer from the read rows stream, to convert into an + Arrow record batch. + + Returns: + pyarrow.RecordBatch: + Rows from the message, as an Arrow record batch. + """ + raise NotImplementedError("to_arrow not implemented for Avro streams.") + def to_dataframe(self, message, dtypes=None): """Create a :class:`pandas.DataFrame` of rows in the page. @@ -476,11 +531,16 @@ def to_rows(self, message): class _ArrowStreamParser(_StreamParser): def __init__(self, read_session): if pyarrow is None: - raise ImportError(_PYARROW_REQUIRED) + raise ImportError( + _PYARROW_REQUIRED.format(operation=_ARROW_BYTES_OPERATION) + ) self._read_session = read_session self._schema = None + def to_arrow(self, message): + return self._parse_arrow_message(message) + def to_rows(self, message): record_batch = self._parse_arrow_message(message) diff --git a/bigquery_storage/tests/system/test_system.py b/bigquery_storage/tests/system/test_system.py index 6a86cffa016f..ee4b06cb43ce 100644 --- a/bigquery_storage/tests/system/test_system.py +++ b/bigquery_storage/tests/system/test_system.py @@ -104,7 +104,7 @@ def test_read_rows_to_dataframe_w_arrow(client, project_id): table_ref, "projects/{}".format(project_id), format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, - requested_streams=1 + requested_streams=1, ) schema_type = session.WhichOneof("schema") assert schema_type == "arrow_schema" diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index 94f33019433a..03e8779a8b8f 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -140,7 +140,7 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json): fastavro.schemaless_writer(blockio, avro_schema, row) response = bigquery_storage_v1beta1.types.ReadRowsResponse() - response.avro_rows.row_count = len(block) + response.row_count = len(block) response.avro_rows.serialized_binary_rows = blockio.getvalue() avro_blocks.append(response) return avro_blocks From 5a5edd51740f621412180b2cbb726fea2747f8ef Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 08:44:06 -0500 Subject: [PATCH 07/17] Revert changes to bigquery so 
that bigquery_storage can be released separately. --- .../google/cloud/bigquery/_pandas_helpers.py | 77 +++++-------------- bigquery/google/cloud/bigquery/table.py | 48 +----------- bigquery/setup.py | 2 +- bigquery/tests/unit/test_job.py | 2 - 4 files changed, 26 insertions(+), 103 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index d77aa67d5cf5..5a3a9833b572 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -15,7 +15,6 @@ """Shared helper functions for connecting BigQuery and pandas.""" import concurrent.futures -import functools import warnings from six.moves import queue @@ -75,8 +74,6 @@ def pyarrow_timestamp(): if pyarrow: - # This dictionary is duplicated in bigquery_storage/test/unite/test_reader.py - # When modifying it be sure to update it there as well. BQ_TO_ARROW_SCALARS = { "BOOL": pyarrow.bool_, "BOOLEAN": pyarrow.bool_, @@ -272,18 +269,14 @@ def download_dataframe_tabledata_list(pages, schema, dtypes): yield _tabledata_list_page_to_dataframe(page, column_names, dtypes) -def _bqstorage_page_to_arrow(page): - return page.to_arrow() - - -def _bqstorage_page_to_dataframe(column_names, dtypes, page): - # page.to_dataframe() does not preserve column order in some versions - # of google-cloud-bigquery-storage. Access by column name to rearrange. - return page.to_dataframe(dtypes=dtypes)[column_names] - - -def _download_table_bqstorage_stream( - download_state, bqstorage_client, session, stream, worker_queue, page_to_item +def _download_dataframe_bqstorage_stream( + download_state, + bqstorage_client, + column_names, + dtypes, + session, + stream, + worker_queue, ): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) rowstream = bqstorage_client.read_rows(position).rows(session) @@ -291,8 +284,10 @@ def _download_table_bqstorage_stream( for page in rowstream.pages: if download_state.done: return - item = page_to_item(page) - worker_queue.put(item) + # page.to_dataframe() does not preserve column order in some versions + # of google-cloud-bigquery-storage. Access by column name to rearrange. + frame = page.to_dataframe(dtypes=dtypes)[column_names] + worker_queue.put(frame) def _nowait(futures): @@ -309,13 +304,14 @@ def _nowait(futures): return done, not_done -def _download_table_bqstorage( +def download_dataframe_bqstorage( project_id, table, bqstorage_client, + column_names, + dtypes, preserve_order=False, selected_fields=None, - page_to_item=None, ): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" if "$" in table.table_id: @@ -337,13 +333,14 @@ def _download_table_bqstorage( session = bqstorage_client.create_read_session( table.to_bqstorage(), "projects/{}".format(project_id), - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=read_options, requested_streams=requested_streams, ) - # Avoid reading rows from an empty table. + # Avoid reading rows from an empty table. pandas.concat will fail on an + # empty list. 
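Illustrative only, not part of the patch: the empty-table guard below exists because concatenating zero frames raises, so the generator yields a single empty frame that still carries the expected columns:

    import pandas

    try:
        pandas.concat([])
    except ValueError:
        pass  # "No objects to concatenate"

    empty = pandas.DataFrame(columns=["int_col", "float_col"])
    assert empty.empty
    assert list(empty.columns) == ["int_col", "float_col"]
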
if not session.streams: + yield pandas.DataFrame(columns=column_names) return total_streams = len(session.streams) @@ -363,13 +360,14 @@ def _download_table_bqstorage( # See: https://github.com/googleapis/google-cloud-python/pull/7698 not_done = [ pool.submit( - _download_table_bqstorage_stream, + _download_dataframe_bqstorage_stream, download_state, bqstorage_client, + column_names, + dtypes, session, stream, worker_queue, - page_to_item, ) for stream in session.streams ] @@ -412,36 +410,3 @@ def _download_table_bqstorage( # Shutdown all background threads, now that they should know to # exit early. pool.shutdown(wait=True) - - -def download_arrow_bqstorage( - project_id, table, bqstorage_client, preserve_order=False, selected_fields=None -): - return _download_table_bqstorage( - project_id, - table, - bqstorage_client, - preserve_order=preserve_order, - selected_fields=selected_fields, - page_to_item=_bqstorage_page_to_arrow, - ) - - -def download_dataframe_bqstorage( - project_id, - table, - bqstorage_client, - column_names, - dtypes, - preserve_order=False, - selected_fields=None, -): - page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) - return _download_table_bqstorage( - project_id, - table, - bqstorage_client, - preserve_order=preserve_order, - selected_fields=selected_fields, - page_to_item=page_to_item, - ) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 4999de3b0d8d..8aa7788acdfa 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1403,42 +1403,14 @@ def _get_progress_bar(self, progress_bar_type): warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) return None - def _to_arrow_iterable(self, bqstorage_client=None): + def _to_arrow_iterable(self): """Create an iterable of arrow RecordBatches, to process the table as a stream.""" - if bqstorage_client is not None: - column_names = [field.name for field in self._schema] - try: - # Iterate over the stream so that read errors are raised (and - # the method can then fallback to tabledata.list). - for record_batch in _pandas_helpers.download_arrow_bqstorage( - self._project, - self._table, - bqstorage_client, - column_names, - preserve_order=self._preserve_order, - selected_fields=self._selected_fields, - ): - yield record_batch - return - except google.api_core.exceptions.Forbidden: - # Don't hide errors such as insufficient permissions to create - # a read session, or the API is not enabled. Both of those are - # clearly problems if the developer has explicitly asked for - # BigQuery Storage API support. - raise - except google.api_core.exceptions.GoogleAPICallError: - # There is a known issue with reading from small anonymous - # query results tables, so some errors are expected. Rather - # than throw those errors, try reading the DataFrame again, but - # with the tabledata.list API. - pass - for record_batch in _pandas_helpers.download_arrow_tabledata_list( iter(self.pages), self.schema ): yield record_batch - def to_arrow(self, progress_bar_type=None, bqstorage_client=None): + def to_arrow(self, progress_bar_type=None): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1461,18 +1433,6 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): ``'tqdm_gui'`` Use the :func:`tqdm.tqdm_gui` function to display a progress bar as a graphical dialog box. 
- bqstorage_client ( \ - google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \ - ): - **Beta Feature** Optional. A BigQuery Storage API client. If - supplied, use the faster BigQuery Storage API to fetch rows - from BigQuery. This API is a billable API. - - This method requires the ``pyarrow`` and - ``google-cloud-bigquery-storage`` libraries. - - Reading from a specific partition or snapshot is not - currently supported by this method. Returns: pyarrow.Table @@ -1492,7 +1452,7 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): progress_bar = self._get_progress_bar(progress_bar_type) record_batches = [] - for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client): + for record_batch in self._to_arrow_iterable(): record_batches.append(record_batch) if progress_bar is not None: @@ -1559,7 +1519,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non supplied, use the faster BigQuery Storage API to fetch rows from BigQuery. This API is a billable API. - This method requires the ``pyarrow`` and + This method requires the ``fastavro`` and ``google-cloud-bigquery-storage`` libraries. Reading from a specific partition or snapshot is not diff --git a/bigquery/setup.py b/bigquery/setup.py index 84a0b384816c..5637c0f4fd53 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -37,7 +37,7 @@ extras = { "bqstorage": [ "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev", - "pyarrow >= 0.4.1", + "fastavro>=0.21.2", ], "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index dcc90b2d96a8..22809c245d4b 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -4897,7 +4897,6 @@ def test_to_dataframe_bqstorage(self): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/{}".format(self.PROJECT), - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use default number of streams for best performance. requested_streams=0, @@ -5341,7 +5340,6 @@ def test_to_dataframe_bqstorage_preserve_order(query): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/test-project", - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use a single stream to preserve row order. requested_streams=1, From 4986bd1cdc8f1cc500f3fcac1f09c62f2c527fda Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 09:43:15 -0500 Subject: [PATCH 08/17] Add tests for to_arrow. --- .../cloud/bigquery_storage_v1beta1/reader.py | 9 ++++ bigquery_storage/tests/system/test_system.py | 35 +++++++++++++++ bigquery_storage/tests/unit/test_reader.py | 44 ++++++++++++++++++- 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py index 8c73e4443a5a..5d753e3f0132 100644 --- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py +++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py @@ -367,6 +367,15 @@ def next(self): # Alias needed for Python 2/3 support. __next__ = next + def to_arrow(self): + """Create an :class:`pyarrow.RecordBatch` of rows in the page. + + Returns: + pyarrow.RecordBatch: + Rows from the message, as an Arrow record batch. 
+ """ + return self._stream_parser.to_arrow(self._message) + def to_dataframe(self, dtypes=None): """Create a :class:`pandas.DataFrame` of rows in the page. diff --git a/bigquery_storage/tests/system/test_system.py b/bigquery_storage/tests/system/test_system.py index ee4b06cb43ce..aa5dd5db868f 100644 --- a/bigquery_storage/tests/system/test_system.py +++ b/bigquery_storage/tests/system/test_system.py @@ -18,6 +18,7 @@ import os import numpy +import pyarrow.types import pytest from google.cloud import bigquery_storage_v1beta1 @@ -67,6 +68,40 @@ def test_read_rows_full_table(client, project_id, small_table_reference): assert len(block.avro_rows.serialized_binary_rows) > 0 +def test_read_rows_to_arrow(client, project_id): + table_ref = bigquery_storage_v1beta1.types.TableReference() + table_ref.project_id = "bigquery-public-data" + table_ref.dataset_id = "new_york_citibike" + table_ref.table_id = "citibike_stations" + + read_options = bigquery_storage_v1beta1.types.TableReadOptions() + read_options.selected_fields.append("station_id") + read_options.selected_fields.append("latitude") + read_options.selected_fields.append("longitude") + read_options.selected_fields.append("name") + session = client.create_read_session( + table_ref, + "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, + read_options=read_options, + requested_streams=1, + ) + stream_pos = bigquery_storage_v1beta1.types.StreamPosition( + stream=session.streams[0] + ) + + tbl = client.read_rows(stream_pos).to_arrow(session) + + assert tbl.num_columns == 4 + schema = tbl.schema + # Use field_by_name because the order doesn't currently match that of + # selected_fields. + assert pyarrow.types.is_int64(schema.field_by_name("station_id").type) + assert pyarrow.types.is_float64(schema.field_by_name("latitude").type) + assert pyarrow.types.is_float64(schema.field_by_name("longitude").type) + assert pyarrow.types.is_string(schema.field_by_name("name").type) + + def test_read_rows_to_dataframe_w_avro(client, project_id): table_ref = bigquery_storage_v1beta1.types.TableReference() table_ref.project_id = "bigquery-public-data" diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py index 03e8779a8b8f..748a45608f3a 100644 --- a/bigquery_storage/tests/unit/test_reader.py +++ b/bigquery_storage/tests/unit/test_reader.py @@ -146,7 +146,7 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json): return avro_blocks -def _bq_to_arrow_batches(bq_blocks, arrow_schema): +def _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): arrow_batches = [] for block in bq_blocks: arrays = [] @@ -158,8 +158,13 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema): size=len(block), ) ) - record_batch = pyarrow.RecordBatch.from_arrays(arrays, arrow_schema) + arrow_batches.append(pyarrow.RecordBatch.from_arrays(arrays, arrow_schema)) + return arrow_batches + +def _bq_to_arrow_batches(bq_blocks, arrow_schema): + arrow_batches = [] + for record_batch in _bq_to_arrow_batch_objects(bq_blocks, arrow_schema): response = bigquery_storage_v1beta1.types.ReadRowsResponse() response.arrow_record_batch.serialized_record_batch = ( record_batch.serialize().to_pybytes() @@ -466,6 +471,41 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client): assert page_4.remaining == 0 +def test_to_arrow_no_pyarrow_raises_import_error( + mut, class_under_test, mock_client, monkeypatch +): + monkeypatch.setattr(mut, "pyarrow", None) + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session 
= _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + reader = class_under_test( + arrow_batches, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + + with pytest.raises(ImportError): + reader.to_arrow(read_session) + + with pytest.raises(ImportError): + reader.rows(read_session).to_arrow() + + with pytest.raises(ImportError): + next(reader.rows(read_session).pages).to_arrow() + + +def test_to_arrow_w_scalars_arrow(class_under_test): + arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS) + read_session = _generate_arrow_read_session(arrow_schema) + arrow_batches = _bq_to_arrow_batches(SCALAR_BLOCKS, arrow_schema) + reader = class_under_test( + arrow_batches, mock_client, bigquery_storage_v1beta1.types.StreamPosition(), {} + ) + actual_table = reader.to_arrow(read_session) + expected_table = pyarrow.Table.from_batches( + _bq_to_arrow_batch_objects(SCALAR_BLOCKS, arrow_schema) + ) + assert actual_table == expected_table + + def test_to_dataframe_no_pandas_raises_import_error( mut, class_under_test, mock_client, monkeypatch ): From c2cc2b6ffd9d0a8a8951a38e5f0a6d0e9bffbd08 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 14:02:40 -0500 Subject: [PATCH 09/17] Revert "Revert changes to bigquery so that bigquery_storage can be released" This reverts commit 5a5edd51740f621412180b2cbb726fea2747f8ef. --- .../google/cloud/bigquery/_pandas_helpers.py | 77 ++++++++++++++----- bigquery/google/cloud/bigquery/table.py | 48 +++++++++++- bigquery/setup.py | 2 +- bigquery/tests/unit/test_job.py | 2 + 4 files changed, 103 insertions(+), 26 deletions(-) diff --git a/bigquery/google/cloud/bigquery/_pandas_helpers.py b/bigquery/google/cloud/bigquery/_pandas_helpers.py index 5a3a9833b572..d77aa67d5cf5 100644 --- a/bigquery/google/cloud/bigquery/_pandas_helpers.py +++ b/bigquery/google/cloud/bigquery/_pandas_helpers.py @@ -15,6 +15,7 @@ """Shared helper functions for connecting BigQuery and pandas.""" import concurrent.futures +import functools import warnings from six.moves import queue @@ -74,6 +75,8 @@ def pyarrow_timestamp(): if pyarrow: + # This dictionary is duplicated in bigquery_storage/test/unite/test_reader.py + # When modifying it be sure to update it there as well. BQ_TO_ARROW_SCALARS = { "BOOL": pyarrow.bool_, "BOOLEAN": pyarrow.bool_, @@ -269,14 +272,18 @@ def download_dataframe_tabledata_list(pages, schema, dtypes): yield _tabledata_list_page_to_dataframe(page, column_names, dtypes) -def _download_dataframe_bqstorage_stream( - download_state, - bqstorage_client, - column_names, - dtypes, - session, - stream, - worker_queue, +def _bqstorage_page_to_arrow(page): + return page.to_arrow() + + +def _bqstorage_page_to_dataframe(column_names, dtypes, page): + # page.to_dataframe() does not preserve column order in some versions + # of google-cloud-bigquery-storage. Access by column name to rearrange. + return page.to_dataframe(dtypes=dtypes)[column_names] + + +def _download_table_bqstorage_stream( + download_state, bqstorage_client, session, stream, worker_queue, page_to_item ): position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) rowstream = bqstorage_client.read_rows(position).rows(session) @@ -284,10 +291,8 @@ def _download_dataframe_bqstorage_stream( for page in rowstream.pages: if download_state.done: return - # page.to_dataframe() does not preserve column order in some versions - # of google-cloud-bigquery-storage. Access by column name to rearrange. 
- frame = page.to_dataframe(dtypes=dtypes)[column_names] - worker_queue.put(frame) + item = page_to_item(page) + worker_queue.put(item) def _nowait(futures): @@ -304,14 +309,13 @@ def _nowait(futures): return done, not_done -def download_dataframe_bqstorage( +def _download_table_bqstorage( project_id, table, bqstorage_client, - column_names, - dtypes, preserve_order=False, selected_fields=None, + page_to_item=None, ): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" if "$" in table.table_id: @@ -333,14 +337,13 @@ def download_dataframe_bqstorage( session = bqstorage_client.create_read_session( table.to_bqstorage(), "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=read_options, requested_streams=requested_streams, ) - # Avoid reading rows from an empty table. pandas.concat will fail on an - # empty list. + # Avoid reading rows from an empty table. if not session.streams: - yield pandas.DataFrame(columns=column_names) return total_streams = len(session.streams) @@ -360,14 +363,13 @@ def download_dataframe_bqstorage( # See: https://github.com/googleapis/google-cloud-python/pull/7698 not_done = [ pool.submit( - _download_dataframe_bqstorage_stream, + _download_table_bqstorage_stream, download_state, bqstorage_client, - column_names, - dtypes, session, stream, worker_queue, + page_to_item, ) for stream in session.streams ] @@ -410,3 +412,36 @@ def download_dataframe_bqstorage( # Shutdown all background threads, now that they should know to # exit early. pool.shutdown(wait=True) + + +def download_arrow_bqstorage( + project_id, table, bqstorage_client, preserve_order=False, selected_fields=None +): + return _download_table_bqstorage( + project_id, + table, + bqstorage_client, + preserve_order=preserve_order, + selected_fields=selected_fields, + page_to_item=_bqstorage_page_to_arrow, + ) + + +def download_dataframe_bqstorage( + project_id, + table, + bqstorage_client, + column_names, + dtypes, + preserve_order=False, + selected_fields=None, +): + page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) + return _download_table_bqstorage( + project_id, + table, + bqstorage_client, + preserve_order=preserve_order, + selected_fields=selected_fields, + page_to_item=page_to_item, + ) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 8aa7788acdfa..4999de3b0d8d 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1403,14 +1403,42 @@ def _get_progress_bar(self, progress_bar_type): warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) return None - def _to_arrow_iterable(self): + def _to_arrow_iterable(self, bqstorage_client=None): """Create an iterable of arrow RecordBatches, to process the table as a stream.""" + if bqstorage_client is not None: + column_names = [field.name for field in self._schema] + try: + # Iterate over the stream so that read errors are raised (and + # the method can then fallback to tabledata.list). + for record_batch in _pandas_helpers.download_arrow_bqstorage( + self._project, + self._table, + bqstorage_client, + column_names, + preserve_order=self._preserve_order, + selected_fields=self._selected_fields, + ): + yield record_batch + return + except google.api_core.exceptions.Forbidden: + # Don't hide errors such as insufficient permissions to create + # a read session, or the API is not enabled. 
Both of those are + # clearly problems if the developer has explicitly asked for + # BigQuery Storage API support. + raise + except google.api_core.exceptions.GoogleAPICallError: + # There is a known issue with reading from small anonymous + # query results tables, so some errors are expected. Rather + # than throw those errors, try reading the DataFrame again, but + # with the tabledata.list API. + pass + for record_batch in _pandas_helpers.download_arrow_tabledata_list( iter(self.pages), self.schema ): yield record_batch - def to_arrow(self, progress_bar_type=None): + def to_arrow(self, progress_bar_type=None, bqstorage_client=None): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1433,6 +1461,18 @@ def to_arrow(self, progress_bar_type=None): ``'tqdm_gui'`` Use the :func:`tqdm.tqdm_gui` function to display a progress bar as a graphical dialog box. + bqstorage_client ( \ + google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \ + ): + **Beta Feature** Optional. A BigQuery Storage API client. If + supplied, use the faster BigQuery Storage API to fetch rows + from BigQuery. This API is a billable API. + + This method requires the ``pyarrow`` and + ``google-cloud-bigquery-storage`` libraries. + + Reading from a specific partition or snapshot is not + currently supported by this method. Returns: pyarrow.Table @@ -1452,7 +1492,7 @@ def to_arrow(self, progress_bar_type=None): progress_bar = self._get_progress_bar(progress_bar_type) record_batches = [] - for record_batch in self._to_arrow_iterable(): + for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client): record_batches.append(record_batch) if progress_bar is not None: @@ -1519,7 +1559,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non supplied, use the faster BigQuery Storage API to fetch rows from BigQuery. This API is a billable API. - This method requires the ``fastavro`` and + This method requires the ``pyarrow`` and ``google-cloud-bigquery-storage`` libraries. Reading from a specific partition or snapshot is not diff --git a/bigquery/setup.py b/bigquery/setup.py index 5637c0f4fd53..84a0b384816c 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -37,7 +37,7 @@ extras = { "bqstorage": [ "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev", - "fastavro>=0.21.2", + "pyarrow >= 0.4.1", ], "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 22809c245d4b..dcc90b2d96a8 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -4897,6 +4897,7 @@ def test_to_dataframe_bqstorage(self): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/{}".format(self.PROJECT), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use default number of streams for best performance. requested_streams=0, @@ -5340,6 +5341,7 @@ def test_to_dataframe_bqstorage_preserve_order(query): bqstorage_client.create_read_session.assert_called_once_with( mock.ANY, "projects/test-project", + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, read_options=mock.ANY, # Use a single stream to preserve row order. requested_streams=1, From c18c21ea102af9f6287e86aa38b962fd947fbe2e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 14:33:24 -0500 Subject: [PATCH 10/17] Add system test for to_arrow. 
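For context, a minimal end-to-end sketch of the code path this system test exercises (the project, dataset, and table names below are hypothetical, and default application credentials are assumed):

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    # Fetch the table so list_rows() knows the schema, then download the rows
    # over the BigQuery Storage API and materialize them as a pyarrow.Table.
    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
    table = client.get_table(table_ref)
    arrow_table = client.list_rows(table).to_arrow(bqstorage_client=bqstorage_client)

    print(arrow_table.schema)
    print(arrow_table.num_rows)

If the Storage API read fails with a retryable API error, the iterator quietly falls back to the tabledata.list download path added above; permission errors are still surfaced so a misconfigured bqstorage_client is not hidden.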
--- bigquery/google/cloud/bigquery/table.py | 11 ++-- bigquery/tests/system.py | 73 +++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 4999de3b0d8d..cd826e78c043 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1406,7 +1406,6 @@ def _get_progress_bar(self, progress_bar_type): def _to_arrow_iterable(self, bqstorage_client=None): """Create an iterable of arrow RecordBatches, to process the table as a stream.""" if bqstorage_client is not None: - column_names = [field.name for field in self._schema] try: # Iterate over the stream so that read errors are raised (and # the method can then fallback to tabledata.list). @@ -1414,7 +1413,6 @@ def _to_arrow_iterable(self, bqstorage_client=None): self._project, self._table, bqstorage_client, - column_names, preserve_order=self._preserve_order, selected_fields=self._selected_fields, ): @@ -1506,8 +1504,13 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): # Indicate that the download has finished. progress_bar.close() - arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema) - return pyarrow.Table.from_batches(record_batches, schema=arrow_schema) + if record_batches: + return pyarrow.Table.from_batches(record_batches) + else: + # No records, use schema based on BigQuery schema. + arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema) + return pyarrow.Table.from_batches(record_batches, schema=arrow_schema) + def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): """Create an iterable of pandas DataFrames, to process the table as a stream. diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 2213bc7c88da..92adc0fdc665 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -39,6 +39,7 @@ pandas = None try: import pyarrow + import pyarrow.types except ImportError: # pragma: NO COVER pyarrow = None try: @@ -1959,6 +1960,78 @@ def test_create_table_rows_fetch_nested_schema(self): def _fetch_dataframe(self, query): return Config.CLIENT.query(query).result().to_dataframe() + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_nested_table_to_arrow(self): + from google.cloud.bigquery.job import SourceFormat + from google.cloud.bigquery.job import WriteDisposition + + SF = bigquery.SchemaField + schema = [ + SF("string_col", "STRING", mode="NULLABLE"), + SF( + "record_col", + "RECORD", + mode="NULLABLE", + fields=[ + SF("nested_string", "STRING", mode="NULLABLE"), + SF("nested_repeated", "INTEGER", mode="REPEATED"), + SF( + "nested_record", + "RECORD", + mode="NULLABLE", + fields=[SF("nested_nested_string", "STRING", mode="NULLABLE")], + ), + ], + ), + SF("float_col", "FLOAT", mode="NULLABLE"), + ] + record = { + "nested_string": "another string value", + "nested_repeated": [0, 1, 2], + "nested_record": {"nested_nested_string": "some deep insight"}, + } + to_insert = [ + { + "string_col": "Some value", + "record_col": record, + "float_col": 3.14, + } + ] + rows = [json.dumps(row) for row in to_insert] + body = six.BytesIO("{}\n".format("\n".join(rows)).encode("ascii")) + table_id = "test_table" + dataset = self.temp_dataset(_make_dataset_id("nested_df")) + table = dataset.table(table_id) + self.to_delete.insert(0, table) + job_config = bigquery.LoadJobConfig() + 
job_config.write_disposition = WriteDisposition.WRITE_TRUNCATE + job_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON + job_config.schema = schema + # Load a table using a local JSON file from memory. + Config.CLIENT.load_table_from_file(body, table, job_config=job_config).result() + bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient( + credentials=Config.CLIENT._credentials + ) + + tbl = Config.CLIENT.list_rows(table, selected_fields=schema).to_arrow( + bqstorage_client=bqstorage_client + ) + + self.assertIsInstance(tbl, pyarrow.Table) + self.assertEqual(tbl.num_rows, 1) + self.assertEqual(tbl.num_columns, 3) + # Columns may not appear in the requested order. + self.assertTrue(pyarrow.types.is_float64(tbl.schema.field_by_name("float_col").type)) + self.assertTrue(pyarrow.types.is_string(tbl.schema.field_by_name("string_col").type)) + record_col = tbl.schema.field_by_name("record_col").type + self.assertTrue(pyarrow.types.is_struct(record_col)) + self.assertEqual(record_col.num_children, 3) + self.assertTrue(pyarrow.types.is_string(record_col[0].type)) + self.assertEqual(record_col[0].name, "nested_string") + @unittest.skipIf(pandas is None, "Requires `pandas`") def test_nested_table_to_dataframe(self): from google.cloud.bigquery.job import SourceFormat From f6470622b33db112daa010176198beee89bd2d5a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 14:39:57 -0500 Subject: [PATCH 11/17] Remove parameterized error messages. --- .../cloud/bigquery_storage_v1beta1/reader.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py index 5d753e3f0132..138fae4110eb 100644 --- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py +++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py @@ -43,11 +43,13 @@ _STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,) -_AVRO_BYTES_OPERATION = "parse ReadRowResponse messages with Avro bytes" -_ARROW_BYTES_OPERATION = "parse ReadRowResponse messages with Arrow bytes" -_FASTAVRO_REQUIRED = "fastavro is required to {operation}." +_FASTAVRO_REQUIRED = ( + "fastavro is required to parse ReadRowResponse messages with Avro bytes." +) _PANDAS_REQUIRED = "pandas is required to create a DataFrame" -_PYARROW_REQUIRED = "pyarrow is required to {operation}." +_PYARROW_REQUIRED = ( + "pyarrow is required to parse ReadRowResponse messages with Arrow bytes." +) class ReadRowsStream(object): @@ -540,9 +542,7 @@ def to_rows(self, message): class _ArrowStreamParser(_StreamParser): def __init__(self, read_session): if pyarrow is None: - raise ImportError( - _PYARROW_REQUIRED.format(operation=_ARROW_BYTES_OPERATION) - ) + raise ImportError(_PYARROW_REQUIRED) self._read_session = read_session self._schema = None From 526ac196eca2a238a588729329a8c2b91eef24fe Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 15:00:35 -0500 Subject: [PATCH 12/17] Avoid single-field record types in system tests. 
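As a point of reference for the assertions being adjusted here, this is roughly how a BigQuery RECORD column surfaces on the Arrow side. The snippet below is a standalone, illustrative pyarrow example (not taken from the test suite) showing the struct introspection the test relies on:

    import pyarrow
    import pyarrow.types

    # A RECORD column with a STRING child and a REPEATED INTEGER child maps to
    # a struct whose children are a string field and a list<int64> field.
    record_type = pyarrow.struct(
        [
            pyarrow.field("nested_string", pyarrow.string()),
            pyarrow.field("nested_repeated", pyarrow.list_(pyarrow.int64())),
        ]
    )

    assert pyarrow.types.is_struct(record_type)
    assert record_type[0].name == "nested_string"
    assert pyarrow.types.is_string(record_type[0].type)
    assert record_type[1].name == "nested_repeated"
    assert pyarrow.types.is_list(record_type[1].type)
    assert pyarrow.types.is_int64(record_type[1].type.value_type)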
--- bigquery/tests/system.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 92adc0fdc665..ea885030aaca 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1978,12 +1978,6 @@ def test_nested_table_to_arrow(self): fields=[ SF("nested_string", "STRING", mode="NULLABLE"), SF("nested_repeated", "INTEGER", mode="REPEATED"), - SF( - "nested_record", - "RECORD", - mode="NULLABLE", - fields=[SF("nested_nested_string", "STRING", mode="NULLABLE")], - ), ], ), SF("float_col", "FLOAT", mode="NULLABLE"), @@ -1991,7 +1985,6 @@ def test_nested_table_to_arrow(self): record = { "nested_string": "another string value", "nested_repeated": [0, 1, 2], - "nested_record": {"nested_nested_string": "some deep insight"}, } to_insert = [ { @@ -2028,9 +2021,12 @@ def test_nested_table_to_arrow(self): self.assertTrue(pyarrow.types.is_string(tbl.schema.field_by_name("string_col").type)) record_col = tbl.schema.field_by_name("record_col").type self.assertTrue(pyarrow.types.is_struct(record_col)) - self.assertEqual(record_col.num_children, 3) - self.assertTrue(pyarrow.types.is_string(record_col[0].type)) + self.assertEqual(record_col.num_children, 2) self.assertEqual(record_col[0].name, "nested_string") + self.assertTrue(pyarrow.types.is_string(record_col[0].type)) + self.assertEqual(record_col[1].name, "nested_repeated") + self.assertTrue(pyarrow.types.is_list(record_col[1].type)) + self.assertTrue(pyarrow.types.is_int64(record_col[1].type.value_type)) @unittest.skipIf(pandas is None, "Requires `pandas`") def test_nested_table_to_dataframe(self): From 9091cdc96e7b54e92add2ecc0cfdfa961f4ae4a6 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 16:22:06 -0500 Subject: [PATCH 13/17] Bump minimum google-cloud-bigquery-storage. --- bigquery/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/setup.py b/bigquery/setup.py index 84a0b384816c..f13439692c4c 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -36,7 +36,7 @@ ] extras = { "bqstorage": [ - "google-cloud-bigquery-storage >= 0.4.0, <2.0.0dev", + "google-cloud-bigquery-storage >= 0.6.0, <2.0.0dev", "pyarrow >= 0.4.1", ], "pandas": ["pandas>=0.17.1"], From 0ef0625332237778a729c2716e5fab1736079e70 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 16:28:35 -0500 Subject: [PATCH 14/17] Blacken --- bigquery/google/cloud/bigquery/table.py | 1 - bigquery/tests/system.py | 19 ++++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index cd826e78c043..9125f0eafc35 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1511,7 +1511,6 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None): arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema) return pyarrow.Table.from_batches(record_batches, schema=arrow_schema) - def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): """Create an iterable of pandas DataFrames, to process the table as a stream. 
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index ea885030aaca..f234a431d51f 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1982,16 +1982,9 @@ def test_nested_table_to_arrow(self): ), SF("float_col", "FLOAT", mode="NULLABLE"), ] - record = { - "nested_string": "another string value", - "nested_repeated": [0, 1, 2], - } + record = {"nested_string": "another string value", "nested_repeated": [0, 1, 2]} to_insert = [ - { - "string_col": "Some value", - "record_col": record, - "float_col": 3.14, - } + {"string_col": "Some value", "record_col": record, "float_col": 3.14} ] rows = [json.dumps(row) for row in to_insert] body = six.BytesIO("{}\n".format("\n".join(rows)).encode("ascii")) @@ -2017,8 +2010,12 @@ def test_nested_table_to_arrow(self): self.assertEqual(tbl.num_rows, 1) self.assertEqual(tbl.num_columns, 3) # Columns may not appear in the requested order. - self.assertTrue(pyarrow.types.is_float64(tbl.schema.field_by_name("float_col").type)) - self.assertTrue(pyarrow.types.is_string(tbl.schema.field_by_name("string_col").type)) + self.assertTrue( + pyarrow.types.is_float64(tbl.schema.field_by_name("float_col").type) + ) + self.assertTrue( + pyarrow.types.is_string(tbl.schema.field_by_name("string_col").type) + ) record_col = tbl.schema.field_by_name("record_col").type self.assertTrue(pyarrow.types.is_struct(record_col)) self.assertEqual(record_col.num_children, 2) From 2e1a46d81c1f30bcf487b1264e7d57429169a02d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 17:20:49 -0500 Subject: [PATCH 15/17] Add tests for to_arrow. --- bigquery/google/cloud/bigquery/table.py | 96 +++++++++---------- bigquery/tests/unit/test_table.py | 118 ++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 47 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 9125f0eafc35..3c8ce6cc39d7 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -18,6 +18,7 @@ import copy import datetime +import functools import operator import warnings @@ -1403,20 +1404,15 @@ def _get_progress_bar(self, progress_bar_type): warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) return None - def _to_arrow_iterable(self, bqstorage_client=None): - """Create an iterable of arrow RecordBatches, to process the table as a stream.""" + def _to_page_iterable( + self, bqstorage_download, tabledata_list_download, bqstorage_client=None + ): if bqstorage_client is not None: try: # Iterate over the stream so that read errors are raised (and # the method can then fallback to tabledata.list). - for record_batch in _pandas_helpers.download_arrow_bqstorage( - self._project, - self._table, - bqstorage_client, - preserve_order=self._preserve_order, - selected_fields=self._selected_fields, - ): - yield record_batch + for item in bqstorage_download(): + yield item return except google.api_core.exceptions.Forbidden: # Don't hide errors such as insufficient permissions to create @@ -1431,10 +1427,27 @@ def _to_arrow_iterable(self, bqstorage_client=None): # with the tabledata.list API. 
pass - for record_batch in _pandas_helpers.download_arrow_tabledata_list( - iter(self.pages), self.schema - ): - yield record_batch + for item in tabledata_list_download(): + yield item + + def _to_arrow_iterable(self, bqstorage_client=None): + """Create an iterable of arrow RecordBatches, to process the table as a stream.""" + bqstorage_download = functools.partial( + _pandas_helpers.download_arrow_bqstorage, + self._project, + self._table, + bqstorage_client, + preserve_order=self._preserve_order, + selected_fields=self._selected_fields, + ) + tabledata_list_download = functools.partial( + _pandas_helpers.download_arrow_tabledata_list, iter(self.pages), self.schema + ) + return self._to_page_iterable( + bqstorage_download, + tabledata_list_download, + bqstorage_client=bqstorage_client, + ) def to_arrow(self, progress_bar_type=None, bqstorage_client=None): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a @@ -1516,39 +1529,28 @@ def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): See ``to_dataframe`` for argument descriptions. """ - if bqstorage_client is not None: - column_names = [field.name for field in self._schema] - try: - # Iterate over the stream so that read errors are raised (and - # the method can then fallback to tabledata.list). - for frame in _pandas_helpers.download_dataframe_bqstorage( - self._project, - self._table, - bqstorage_client, - column_names, - dtypes, - preserve_order=self._preserve_order, - selected_fields=self._selected_fields, - ): - yield frame - return - except google.api_core.exceptions.Forbidden: - # Don't hide errors such as insufficient permissions to create - # a read session, or the API is not enabled. Both of those are - # clearly problems if the developer has explicitly asked for - # BigQuery Storage API support. - raise - except google.api_core.exceptions.GoogleAPICallError: - # There is a known issue with reading from small anonymous - # query results tables, so some errors are expected. Rather - # than throw those errors, try reading the DataFrame again, but - # with the tabledata.list API. - pass - - for frame in _pandas_helpers.download_dataframe_tabledata_list( - iter(self.pages), self.schema, dtypes - ): - yield frame + column_names = [field.name for field in self._schema] + bqstorage_download = functools.partial( + _pandas_helpers.download_dataframe_bqstorage, + self._project, + self._table, + bqstorage_client, + column_names, + dtypes, + preserve_order=self._preserve_order, + selected_fields=self._selected_fields, + ) + tabledata_list_download = functools.partial( + _pandas_helpers.download_dataframe_tabledata_list, + iter(self.pages), + self.schema, + dtypes, + ) + return self._to_page_iterable( + bqstorage_download, + tabledata_list_download, + bqstorage_client=bqstorage_client, + ) def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None): """Create a pandas DataFrame by loading all pages of a query. 
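The refactoring above boils down to one pattern: bind the download arguments with functools.partial and hand _to_page_iterable two zero-argument callables, so the BigQuery Storage path can be tried first and tabledata.list used as the fallback. A simplified, self-contained sketch of that pattern (with made-up helper names and a stand-in exception, not the library code itself):

    import functools


    class TransientReadError(Exception):
        """Stand-in for google.api_core.exceptions.GoogleAPICallError."""


    def fast_download(table_name):
        # Pretend the faster (storage-API-style) path fails while reading.
        raise TransientReadError("stream could not be read")


    def slow_download(table_name):
        # Pretend these are pages fetched via the slower list-based path.
        for page in (["row-1", "row-2"], ["row-3"]):
            yield page


    def to_page_iterable(preferred_download, fallback_download):
        try:
            for page in preferred_download():
                yield page
            return
        except TransientReadError:
            # Fall back rather than surfacing the transient error.
            pass
        for page in fallback_download():
            yield page


    preferred = functools.partial(fast_download, "my_dataset.my_table")
    fallback = functools.partial(slow_download, "my_dataset.my_table")
    print(list(to_page_iterable(preferred, fallback)))
    # -> [['row-1', 'row-2'], ['row-3']]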
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index a892dccf9f28..21b0f42b5a25 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1703,6 +1703,124 @@ def test_to_arrow_w_empty_table(self): self.assertEqual(child_field.type.value_type[0].name, "name") self.assertEqual(child_field.type.value_type[1].name, "age") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_arrow_w_bqstorage(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1beta1 import reader + + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + streams = [ + # Use two streams we want to check frames are read from each stream. + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + session = bigquery_storage_v1beta1.types.ReadSession(streams=streams) + session = bigquery_storage_v1beta1.types.ReadSession() + arrow_schema = pyarrow.schema( + [ + pyarrow.field("colA", pyarrow.int64()), + # Not alphabetical to test column order. + pyarrow.field("colC", pyarrow.float64()), + pyarrow.field("colB", pyarrow.string()), + ] + ) + session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes() + bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + page_items = [ + pyarrow.array([1, -1]), + pyarrow.array([2.0, 4.0]), + pyarrow.array(["abc", "def"]), + ] + + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays( + page_items, arrow_schema + ) + mock_pages = (mock_page, mock_page, mock_page) + type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) + + schema = [ + schema.SchemaField("colA", "INTEGER"), + schema.SchemaField("colC", "FLOAT"), + schema.SchemaField("colB", "STRING"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, + ) + + actual_tbl = row_iterator.to_arrow(bqstorage_client=bqstorage_client) + + # Are the columns in the expected order? + self.assertEqual(actual_tbl.num_columns, 3) + self.assertEqual(actual_tbl.schema[0].name, "colA") + self.assertEqual(actual_tbl.schema[1].name, "colC") + self.assertEqual(actual_tbl.schema[2].name, "colB") + + # Have expected number of rows? 
+ total_pages = len(streams) * len(mock_pages) + total_rows = len(page_items) * total_pages + self.assertEqual(actual_tbl.num_rows, total_rows) + + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_arrow_w_bqstorage_no_streams(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + + bqstorage_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + session = bigquery_storage_v1beta1.types.ReadSession() + arrow_schema = pyarrow.schema( + [ + pyarrow.field("colA", pyarrow.string()), + # Not alphabetical to test column order. + pyarrow.field("colC", pyarrow.string()), + pyarrow.field("colB", pyarrow.string()), + ] + ) + session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes() + bqstorage_client.create_read_session.return_value = session + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + [ + schema.SchemaField("colA", "STRING"), + schema.SchemaField("colC", "STRING"), + schema.SchemaField("colB", "STRING"), + ], + table=mut.TableReference.from_string("proj.dset.tbl"), + ) + + actual_table = row_iterator.to_arrow(bqstorage_client=bqstorage_client) + self.assertEqual(actual_table.num_columns, 3) + self.assertEqual(actual_table.num_rows, 0) + self.assertEqual(actual_table.schema[0].name, "colA") + self.assertEqual(actual_table.schema[1].name, "colC") + self.assertEqual(actual_table.schema[2].name, "colB") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf(tqdm is None, "Requires `tqdm`") @mock.patch("tqdm.tqdm_gui") From 46f1fab23a9f53ade7fba2ace844b91101943b8e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 11 Jul 2019 18:38:21 -0500 Subject: [PATCH 16/17] Exclude bad pyarrow release. --- bigquery/setup.py | 4 +++- bigquery_storage/setup.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/bigquery/setup.py b/bigquery/setup.py index f13439692c4c..6dfde2e439b2 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -37,7 +37,9 @@ extras = { "bqstorage": [ "google-cloud-bigquery-storage >= 0.6.0, <2.0.0dev", - "pyarrow >= 0.4.1", + # Bad Linux release for 0.14.0. + # https://issues.apache.org/jira/browse/ARROW-5868 + "pyarrow>=0.13.0, != 0.14.0", ], "pandas": ["pandas>=0.17.1"], # Exclude PyArrow dependency from Windows Python 2.7. diff --git a/bigquery_storage/setup.py b/bigquery_storage/setup.py index 1279497956ab..865a06d3bddc 100644 --- a/bigquery_storage/setup.py +++ b/bigquery_storage/setup.py @@ -31,7 +31,7 @@ extras = { 'pandas': 'pandas>=0.17.1', 'fastavro': 'fastavro>=0.21.2', - 'pyarrow': 'pyarrow>=0.13.0', + 'pyarrow': 'pyarrow>=0.13.0, != 0.14.0', } package_root = os.path.abspath(os.path.dirname(__file__)) From a88690e732c881ff7e8b842ba1c09455eb6b5dde Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 12 Jul 2019 08:32:46 -0500 Subject: [PATCH 17/17] Set streams in mock session. 
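The mocks touched by this fix rely on one non-obvious trick: a read-only ``pages`` attribute cannot be stubbed on a mock instance directly, so the tests attach a PropertyMock to the mock's type. A standalone sketch of that technique, using unittest.mock and placeholder page values independent of the BigQuery classes:

    from unittest import mock

    # Stand-in for the autospec'd reader.ReadRowsIterable used in the tests.
    mock_rows = mock.MagicMock(name="read_rows_iterable")

    # Every Mock instance gets its own class, so attaching the PropertyMock to
    # type(mock_rows) affects only this one mock.
    fake_pages = ("page-1", "page-2", "page-3")
    type(mock_rows).pages = mock.PropertyMock(return_value=fake_pages)

    assert list(mock_rows.pages) == ["page-1", "page-2", "page-3"]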
--- bigquery/tests/unit/test_table.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 21b0f42b5a25..a141d8f38abf 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1721,7 +1721,6 @@ def test_to_arrow_w_bqstorage(self): {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, ] session = bigquery_storage_v1beta1.types.ReadSession(streams=streams) - session = bigquery_storage_v1beta1.types.ReadSession() arrow_schema = pyarrow.schema( [ pyarrow.field("colA", pyarrow.int64()), @@ -1738,6 +1737,8 @@ def test_to_arrow_w_bqstorage(self): mock_rows = mock.create_autospec(reader.ReadRowsIterable) mock_rowstream.rows.return_value = mock_rows + expected_num_rows = 2 + expected_num_columns = 3 page_items = [ pyarrow.array([1, -1]), pyarrow.array([2.0, 4.0]), @@ -1769,14 +1770,14 @@ def test_to_arrow_w_bqstorage(self): actual_tbl = row_iterator.to_arrow(bqstorage_client=bqstorage_client) # Are the columns in the expected order? - self.assertEqual(actual_tbl.num_columns, 3) + self.assertEqual(actual_tbl.num_columns, expected_num_columns) self.assertEqual(actual_tbl.schema[0].name, "colA") self.assertEqual(actual_tbl.schema[1].name, "colC") self.assertEqual(actual_tbl.schema[2].name, "colB") # Have expected number of rows? total_pages = len(streams) * len(mock_pages) - total_rows = len(page_items) * total_pages + total_rows = expected_num_rows * total_pages self.assertEqual(actual_tbl.num_rows, total_rows) @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
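Taken together, the reader and these tests hinge on Arrow's IPC serialization: the unit tests build serialized schema and record-batch bytes the same way the BigQuery Storage API returns them, and _ArrowStreamParser rebuilds them with pyarrow.py_buffer. A small round-trip sketch of that mechanism (column names chosen arbitrarily; this mirrors, but is not, the library code):

    import pyarrow

    arrow_schema = pyarrow.schema(
        [
            pyarrow.field("colA", pyarrow.int64()),
            pyarrow.field("colC", pyarrow.float64()),
            pyarrow.field("colB", pyarrow.string()),
        ]
    )
    batch = pyarrow.RecordBatch.from_arrays(
        [
            pyarrow.array([1, -1]),
            pyarrow.array([2.0, 4.0]),
            pyarrow.array(["abc", "def"]),
        ],
        schema=arrow_schema,
    )

    # Bytes as they would appear in ReadSession.arrow_schema and in each
    # ReadRowsResponse.arrow_record_batch.
    serialized_schema = arrow_schema.serialize().to_pybytes()
    serialized_batch = batch.serialize().to_pybytes()

    # What the parser does on the receiving end.
    parsed_schema = pyarrow.read_schema(pyarrow.py_buffer(serialized_schema))
    parsed_batch = pyarrow.read_record_batch(
        pyarrow.py_buffer(serialized_batch), parsed_schema
    )

    assert parsed_batch.num_rows == 2
    assert parsed_batch.schema.names == ["colA", "colC", "colB"]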