diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index a71acf8ecc8ac..18c67175bca04 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -1519,6 +1519,17 @@ def to_arrow(
         if pyarrow is None:
             raise ValueError(_NO_PYARROW_ERROR)

+        if (
+            bqstorage_client or create_bqstorage_client
+        ) and self.max_results is not None:
+            warnings.warn(
+                "Cannot use bqstorage_client if max_results is set, "
+                "reverting to fetching data with the tabledata.list endpoint.",
+                stacklevel=2,
+            )
+            create_bqstorage_client = False
+            bqstorage_client = None
+
         owns_bqstorage_client = False
         if not bqstorage_client and create_bqstorage_client:
             owns_bqstorage_client = True
@@ -1674,33 +1685,39 @@ def to_dataframe(
             create_bqstorage_client = False
             bqstorage_client = None

-        owns_bqstorage_client = False
-        if not bqstorage_client and create_bqstorage_client:
-            owns_bqstorage_client = True
-            bqstorage_client = self.client._create_bqstorage_client()
-
-        try:
-            progress_bar = self._get_progress_bar(progress_bar_type)
+        if pyarrow is not None:
+            # If pyarrow is available, calling to_arrow, then converting to a
+            # pandas dataframe is about 2x faster. This is because pandas.concat is
+            # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+            # usually no-copy.
+            record_batch = self.to_arrow(
+                progress_bar_type=progress_bar_type,
+                bqstorage_client=bqstorage_client,
+                create_bqstorage_client=create_bqstorage_client,
+            )
+            df = record_batch.to_pandas()
+            for column in dtypes:
+                df[column] = pandas.Series(df[column], dtype=dtypes[column])
+            return df

-            frames = []
-            for frame in self._to_dataframe_iterable(
-                bqstorage_client=bqstorage_client, dtypes=dtypes
-            ):
-                frames.append(frame)
+        # The bqstorage_client is only used if pyarrow is available, so the
+        # rest of this method only needs to account for tabledata.list.
+        progress_bar = self._get_progress_bar(progress_bar_type)

-                if progress_bar is not None:
-                    # In some cases, the number of total rows is not populated
-                    # until the first page of rows is fetched. Update the
-                    # progress bar's total to keep an accurate count.
-                    progress_bar.total = progress_bar.total or self.total_rows
-                    progress_bar.update(len(frame))
+        frames = []
+        for frame in self._to_dataframe_iterable(dtypes=dtypes):
+            frames.append(frame)

             if progress_bar is not None:
-                # Indicate that the download has finished.
-                progress_bar.close()
-        finally:
-            if owns_bqstorage_client:
-                bqstorage_client.transport.channel.close()
+                # In some cases, the number of total rows is not populated
+                # until the first page of rows is fetched. Update the
+                # progress bar's total to keep an accurate count.
+                progress_bar.total = progress_bar.total or self.total_rows
+                progress_bar.update(len(frame))
+
+        if progress_bar is not None:
+            # Indicate that the download has finished.
+            progress_bar.close()

         # Avoid concatting an empty list.
         if not frames:
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index 1043df45f9a3c..7004505ef38ad 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -1809,6 +1809,43 @@ def test_to_arrow_w_empty_table(self):
         self.assertEqual(child_field.type.value_type[0].name, "name")
         self.assertEqual(child_field.type.value_type[1].name, "age")

+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_to_arrow_max_results_w_create_bqstorage_warning(self):
+        from google.cloud.bigquery.schema import SchemaField
+
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+        ]
+        rows = [
+            {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+            {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+        ]
+        path = "/foo"
+        api_request = mock.Mock(return_value={"rows": rows})
+        mock_client = _mock_client()
+
+        row_iterator = self._make_one(
+            client=mock_client,
+            api_request=api_request,
+            path=path,
+            schema=schema,
+            max_results=42,
+        )
+
+        with warnings.catch_warnings(record=True) as warned:
+            row_iterator.to_arrow(create_bqstorage_client=True)
+
+        matches = [
+            warning
+            for warning in warned
+            if warning.category is UserWarning
+            and "cannot use bqstorage_client" in str(warning).lower()
+            and "tabledata.list" in str(warning)
+        ]
+        self.assertEqual(len(matches), 1, msg="User warning was not emitted.")
+        mock_client._create_bqstorage_client.assert_not_called()
+
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
@@ -1856,7 +1893,7 @@ def test_to_arrow_w_bqstorage(self):

         mock_page = mock.create_autospec(reader.ReadRowsPage)
         mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays(
-            page_items, arrow_schema
+            page_items, schema=arrow_schema
         )
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
@@ -2216,9 +2253,9 @@ def test_to_dataframe_w_various_types_nullable(self):
         ]
         row_data = [
             [None, None, None, None, None, None],
-            ["1.4338368E9", "420", "1.1", "Cash", "true", "1999-12-01"],
-            ["1.3878117E9", "2580", "17.7", "Cash", "false", "1953-06-14"],
-            ["1.3855653E9", "2280", "4.4", "Credit", "true", "1981-11-04"],
+            ["1.4338368E9", "420", "1.1", u"Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", u"Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", u"Credit", "true", "1981-11-04"],
         ]
         rows = [{"f": [{"v": field} for field in row]} for row in row_data]
         path = "/foo"
@@ -2238,7 +2275,7 @@ def test_to_dataframe_w_various_types_nullable(self):
             else:
                 self.assertIsInstance(row.start_timestamp, pandas.Timestamp)
             self.assertIsInstance(row.seconds, float)
-            self.assertIsInstance(row.payment_type, str)
+            self.assertIsInstance(row.payment_type, six.string_types)
             self.assertIsInstance(row.complete, bool)
             self.assertIsInstance(row.date, datetime.date)

@@ -2256,9 +2293,9 @@ def test_to_dataframe_column_dtypes(self):
             SchemaField("date", "DATE"),
         ]
         row_data = [
-            ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
-            ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
-            ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
+            ["1.4338368E9", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", "28.5", u"Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"],
         ]
         rows = [{"f": [{"v": field} for field in row]} for row in row_data]
         path = "/foo"
@@ -2424,9 +2461,9 @@ def test_to_dataframe_w_bqstorage_no_streams(self):
             api_request=None,
             path=None,
             schema=[
-                schema.SchemaField("colA", "IGNORED"),
-                schema.SchemaField("colC", "IGNORED"),
-                schema.SchemaField("colB", "IGNORED"),
+                schema.SchemaField("colA", "INTEGER"),
+                schema.SchemaField("colC", "FLOAT"),
+                schema.SchemaField("colB", "STRING"),
             ],
             table=mut.TableReference.from_string("proj.dset.tbl"),
         )
@@ -2498,10 +2535,11 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
         mock_pages = mock.PropertyMock(return_value=())
         type(mock_rows).pages = mock_pages

+        # Schema is required when there are no record batches in the stream.
         schema = [
-            schema.SchemaField("colA", "IGNORED"),
-            schema.SchemaField("colC", "IGNORED"),
-            schema.SchemaField("colB", "IGNORED"),
+            schema.SchemaField("colA", "INTEGER"),
+            schema.SchemaField("colC", "FLOAT"),
+            schema.SchemaField("colB", "STRING"),
         ]

         row_iterator = mut.RowIterator(
@@ -2560,14 +2598,15 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_rowstream.rows.return_value = mock_rows
         page_items = [
-            {"colA": 1, "colB": "abc", "colC": 2.0},
-            {"colA": -1, "colB": "def", "colC": 4.0},
+            pyarrow.array([1, -1]),
+            pyarrow.array([2.0, 4.0]),
+            pyarrow.array(["abc", "def"]),
         ]
-
-        mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.return_value = pandas.DataFrame(
-            page_items, columns=["colA", "colB", "colC"]
+        page_record_batch = pyarrow.RecordBatch.from_arrays(
+            page_items, schema=arrow_schema
         )
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        mock_page.to_arrow.return_value = page_record_batch
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

@@ -2594,7 +2633,7 @@ def test_to_dataframe_w_bqstorage_nonempty(self):

         # Have expected number of rows?
         total_pages = len(streams) * len(mock_pages)
-        total_rows = len(page_items) * total_pages
+        total_rows = len(page_items[0]) * total_pages
         self.assertEqual(len(got.index), total_rows)

         # Don't close the client if it was passed in.
@@ -2633,11 +2672,14 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):

         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_rowstream.rows.return_value = mock_rows
-        page_data_frame = pandas.DataFrame(
-            [{"colA": 1}, {"colA": -1}], columns=["colA"]
+        page_items = [
+            pyarrow.array([1, -1]),
+        ]
+        page_record_batch = pyarrow.RecordBatch.from_arrays(
+            page_items, schema=arrow_schema
         )
         mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.return_value = page_data_frame
+        mock_page.to_arrow.return_value = page_record_batch
         mock_pages = (mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

@@ -2649,7 +2691,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):

         self.assertEqual(list(got), ["colA"])
         total_pages = len(streams) * len(mock_pages)
-        total_rows = len(page_data_frame) * total_pages
+        total_rows = len(page_items[0]) * total_pages
         self.assertEqual(len(got.index), total_rows)
         self.assertTrue(got.index.is_unique)

@@ -2695,14 +2737,15 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         page_items = [-1, 0, 1]
         type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items))

-        def blocking_to_dataframe(*args, **kwargs):
-            # Sleep for longer than the waiting interval. This ensures the
-            # progress_queue gets written to more than once because it gives
-            # the worker->progress updater time to sum intermediate updates.
+        def blocking_to_arrow(*args, **kwargs):
+            # Sleep for longer than the waiting interval so that we know we're
+            # only reading one page per loop at most.
             time.sleep(2 * mut._PROGRESS_INTERVAL)
-            return pandas.DataFrame({"testcol": page_items})
+            return pyarrow.RecordBatch.from_arrays(
+                [pyarrow.array(page_items)], schema=arrow_schema
+            )

-        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_page.to_arrow.side_effect = blocking_to_arrow
         mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page)
         type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

@@ -2728,7 +2771,7 @@ def blocking_to_dataframe(*args, **kwargs):
         progress_updates = [
             args[0] for args, kwargs in tqdm_mock().update.call_args_list
         ]
-        # Should have sent >1 update due to delay in blocking_to_dataframe.
+        # Should have sent >1 update due to delay in blocking_to_arrow.
         self.assertGreater(len(progress_updates), 1)
         self.assertEqual(sum(progress_updates), expected_total_rows)
         tqdm_mock().close.assert_called_once()
@@ -2768,18 +2811,20 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
             arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
+        page_items = [
+            pyarrow.array([1, -1]),
+            pyarrow.array([2.0, 4.0]),
+            pyarrow.array(["abc", "def"]),
+        ]

-        def blocking_to_dataframe(*args, **kwargs):
+        def blocking_to_arrow(*args, **kwargs):
             # Sleep for longer than the waiting interval so that we know we're
             # only reading one page per loop at most.
             time.sleep(2 * mut._PROGRESS_INTERVAL)
-            return pandas.DataFrame(
-                {"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
-                columns=["colA", "colB", "colC"],
-            )
+            return pyarrow.RecordBatch.from_arrays(page_items, schema=arrow_schema)

         mock_page = mock.create_autospec(reader.ReadRowsPage)
-        mock_page.to_dataframe.side_effect = blocking_to_dataframe
+        mock_page.to_arrow.side_effect = blocking_to_arrow
         mock_rows = mock.create_autospec(reader.ReadRowsIterable)
         mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
         type(mock_rows).pages = mock_pages
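
Usage note (editorial, not part of the patch): the sketch below assumes a build of
google-cloud-bigquery that includes this change, and a hypothetical table
"my-project.my_dataset.my_table". It illustrates the fallback behavior that the new
warning and tests cover: when max_results is set, the BigQuery Storage API is skipped,
data is fetched with the tabledata.list endpoint, and a UserWarning is emitted.

    import warnings

    from google.cloud import bigquery

    client = bigquery.Client()
    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
    # max_results caps the rows returned, which rules out the BQ Storage API.
    rows = client.list_rows(table_ref, max_results=42)

    with warnings.catch_warnings(record=True) as warned:
        warnings.simplefilter("always")
        # Requesting a BQ Storage client together with max_results now falls
        # back to tabledata.list and warns instead of failing.
        df = rows.to_dataframe(create_bqstorage_client=True)

    assert any("tabledata.list" in str(w.message) for w in warned)
    print(df.head())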