diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py
index 2dab03a06..5460f7ca7 100644
--- a/google/cloud/bigquery/_pandas_helpers.py
+++ b/google/cloud/bigquery/_pandas_helpers.py
@@ -26,6 +26,7 @@
 import logging
 import queue
 import threading
+import time
 import warnings
 from typing import Any, Union, Optional, Callable, Generator, List
 
@@ -869,6 +870,7 @@ def _download_table_bqstorage(
     max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
     max_stream_count: Optional[int] = None,
     download_state: Optional[_DownloadState] = None,
+    timeout: Optional[float] = None,
 ) -> Generator[Any, None, None]:
     """Downloads a BigQuery table using the BigQuery Storage API.
 
@@ -899,6 +901,9 @@ def _download_table_bqstorage(
         download_state (Optional[_DownloadState]):
             A threadsafe state object which can be used to observe the
             behavior of the worker threads created by this method.
+        timeout (Optional[float]):
+            The number of seconds to wait for the download to complete.
+            If None, wait indefinitely.
 
     Yields:
         pandas.DataFrame: Pandas DataFrames, one for each chunk of data
@@ -906,6 +911,8 @@ def _download_table_bqstorage(
     Raises:
         ValueError:
             If attempting to read from a specific partition or snapshot.
+        concurrent.futures.TimeoutError:
+            If the download does not complete within the specified timeout.
 
     Note:
         This method requires the `google-cloud-bigquery-storage` library
@@ -973,60 +980,73 @@ def _download_table_bqstorage(
 
     worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)
 
-    with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
-        try:
-            # Manually submit jobs and wait for download to complete rather
-            # than using pool.map because pool.map continues running in the
-            # background even if there is an exception on the main thread.
-            # See: https://github.com/googleapis/google-cloud-python/pull/7698
-            not_done = [
-                pool.submit(
-                    _download_table_bqstorage_stream,
-                    download_state,
-                    bqstorage_client,
-                    session,
-                    stream,
-                    worker_queue,
-                    page_to_item,
-                )
-                for stream in session.streams
-            ]
-
-            while not_done:
-                # Don't block on the worker threads. For performance reasons,
-                # we want to block on the queue's get method, instead. This
-                # prevents the queue from filling up, because the main thread
-                # has smaller gaps in time between calls to the queue's get
-                # method. For a detailed explanation, see:
-                # https://friendliness.dev/2019/06/18/python-nowait/
-                done, not_done = _nowait(not_done)
-                for future in done:
-                    # Call result() on any finished threads to raise any
-                    # exceptions encountered.
-                    future.result()
-
+    # Manually manage the pool to control shutdown behavior on timeout.
+    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
+    wait_on_shutdown = True
+    start_time = time.time()
+
-                try:
-                    frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
-                    yield frame
-                except queue.Empty:  # pragma: NO COVER
-                    continue
+    try:
+        # Manually submit jobs and wait for download to complete rather
+        # than using pool.map because pool.map continues running in the
+        # background even if there is an exception on the main thread.
+        # See: https://github.com/googleapis/google-cloud-python/pull/7698
+        not_done = [
+            pool.submit(
+                _download_table_bqstorage_stream,
+                download_state,
+                bqstorage_client,
+                session,
+                stream,
+                worker_queue,
+                page_to_item,
+            )
+            for stream in session.streams
+        ]
+
+        while not_done:
+            # Check for timeout
+            if timeout is not None:
+                elapsed = time.time() - start_time
+                if elapsed > timeout:
+                    wait_on_shutdown = False
+                    raise concurrent.futures.TimeoutError(
+                        f"Download timed out after {timeout} seconds."
+                    )
+
+            # Don't block on the worker threads. For performance reasons,
+            # we want to block on the queue's get method, instead. This
+            # prevents the queue from filling up, because the main thread
+            # has smaller gaps in time between calls to the queue's get
+            # method. For a detailed explanation, see:
+            # https://friendliness.dev/2019/06/18/python-nowait/
+            done, not_done = _nowait(not_done)
+            for future in done:
+                # Call result() on any finished threads to raise any
+                # exceptions encountered.
+                future.result()
+
+            try:
+                frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
+                yield frame
+            except queue.Empty:  # pragma: NO COVER
+                continue
-
-            # Return any remaining values after the workers finished.
-            while True:  # pragma: NO COVER
-                try:
-                    frame = worker_queue.get_nowait()
-                    yield frame
-                except queue.Empty:  # pragma: NO COVER
-                    break
-        finally:
-            # No need for a lock because reading/replacing a variable is
-            # defined to be an atomic operation in the Python language
-            # definition (enforced by the global interpreter lock).
-            download_state.done = True
+
+        # Return any remaining values after the workers finished.
+        while True:  # pragma: NO COVER
+            try:
+                frame = worker_queue.get_nowait()
+                yield frame
+            except queue.Empty:  # pragma: NO COVER
+                break
+    finally:
+        # No need for a lock because reading/replacing a variable is
+        # defined to be an atomic operation in the Python language
+        # definition (enforced by the global interpreter lock).
+        download_state.done = True
-
-            # Shutdown all background threads, now that they should know to
-            # exit early.
-            pool.shutdown(wait=True)
+
+        # Shutdown all background threads, now that they should know to
+        # exit early.
+        pool.shutdown(wait=wait_on_shutdown)
 
 
 def download_arrow_bqstorage(
@@ -1037,6 +1057,7 @@ def download_arrow_bqstorage(
     selected_fields=None,
     max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
     max_stream_count=None,
+    timeout=None,
 ):
     return _download_table_bqstorage(
@@ -1047,6 +1068,7 @@ def download_arrow_bqstorage(
         page_to_item=_bqstorage_page_to_arrow,
         max_queue_size=max_queue_size,
         max_stream_count=max_stream_count,
+        timeout=timeout,
     )
 
 
@@ -1060,6 +1082,7 @@ def download_dataframe_bqstorage(
     selected_fields=None,
     max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
    max_stream_count=None,
+    timeout=None,
 ):
     page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
     return _download_table_bqstorage(
@@ -1071,6 +1094,7 @@ def download_dataframe_bqstorage(
         page_to_item=page_to_item,
         max_queue_size=max_queue_size,
         max_stream_count=max_stream_count,
+        timeout=timeout,
     )
diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py
index 38b8a7148..e82deb1ef 100644
--- a/google/cloud/bigquery/job/query.py
+++ b/google/cloud/bigquery/job/query.py
@@ -1857,6 +1857,7 @@ def to_arrow(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         create_bqstorage_client: bool = True,
         max_results: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> "pyarrow.Table":
         """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
         table or query.
@@ -1904,6 +1905,10 @@ def to_arrow(
 
                 .. versionadded:: 2.21.0
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pyarrow.Table
                 A :class:`pyarrow.Table` populated with row data and column
@@ -1921,6 +1926,7 @@ def to_arrow(
             progress_bar_type=progress_bar_type,
             bqstorage_client=bqstorage_client,
             create_bqstorage_client=create_bqstorage_client,
+            timeout=timeout,
         )
 
     # If changing the signature of this method, make sure to apply the same
@@ -1949,6 +1955,7 @@ def to_dataframe(
         range_timestamp_dtype: Union[
             Any, None
         ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Return a pandas DataFrame from a QueryJob
 
@@ -2141,6 +2148,10 @@ def to_dataframe(
 
                 .. versionadded:: 3.21.0
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pandas.DataFrame:
                 A :class:`~pandas.DataFrame` populated with row data
@@ -2174,6 +2185,7 @@ def to_dataframe(
             range_date_dtype=range_date_dtype,
             range_datetime_dtype=range_datetime_dtype,
             range_timestamp_dtype=range_timestamp_dtype,
+            timeout=timeout,
         )
 
     # If changing the signature of this method, make sure to apply the same
@@ -2191,6 +2203,7 @@ def to_geodataframe(
         int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
         float_dtype: Union[Any, None] = None,
         string_dtype: Union[Any, None] = None,
+        timeout: Optional[float] = None,
     ) -> "geopandas.GeoDataFrame":
         """Return a GeoPandas GeoDataFrame from a QueryJob
 
@@ -2269,6 +2282,9 @@ def to_geodataframe(
                 then the data type will be ``numpy.dtype("object")``. BigQuery
                 String type can be found at:
                 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
 
         Returns:
             geopandas.GeoDataFrame:
@@ -2296,6 +2312,7 @@ def to_geodataframe(
             int_dtype=int_dtype,
             float_dtype=float_dtype,
             string_dtype=string_dtype,
+            timeout=timeout,
         )
 
     def __iter__(self):
diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py
index 5efcb1958..195461006 100644
--- a/google/cloud/bigquery/table.py
+++ b/google/cloud/bigquery/table.py
@@ -2087,6 +2087,7 @@ def to_arrow_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
         max_stream_count: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a
         stream.
@@ -2127,6 +2128,10 @@ def to_arrow_iterable(
                 setting this parameter value to a value > 0 can help reduce
                 system resource consumption.
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pyarrow.RecordBatch:
                 A generator of :class:`~pyarrow.RecordBatch`.
@@ -2144,6 +2149,7 @@ def to_arrow_iterable(
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
             max_stream_count=max_stream_count,
+            timeout=timeout,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
@@ -2161,6 +2167,7 @@ def to_arrow(
         progress_bar_type: Optional[str] = None,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         create_bqstorage_client: bool = True,
+        timeout: Optional[float] = None,
     ) -> "pyarrow.Table":
         """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
         table or query.
@@ -2202,6 +2209,9 @@ def to_arrow(
                 This argument does nothing if ``bqstorage_client`` is supplied.
 
                 .. versionadded:: 1.24.0
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
         Returns:
             pyarrow.Table
@@ -2236,7 +2246,7 @@ def to_arrow(
 
         record_batches = []
         for record_batch in self.to_arrow_iterable(
-            bqstorage_client=bqstorage_client
+            bqstorage_client=bqstorage_client, timeout=timeout
         ):
             record_batches.append(record_batch)
 
@@ -2271,6 +2281,7 @@ def to_dataframe_iterable(
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
         max_stream_count: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Create an iterable of pandas DataFrames, to process the table as a
         stream.
@@ -2317,6 +2328,10 @@ def to_dataframe_iterable(
                 setting this parameter value to a value > 0 can help reduce
                 system resource consumption.
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pandas.DataFrame:
                 A generator of :class:`~pandas.DataFrame`.
@@ -2344,6 +2359,7 @@ def to_dataframe_iterable(
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
             max_stream_count=max_stream_count,
+            timeout=timeout,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_dataframe_row_iterator,
@@ -2381,6 +2397,7 @@ def to_dataframe(
         range_timestamp_dtype: Union[
             Any, None
         ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Create a pandas DataFrame by loading all pages of a query.
 
@@ -2577,6 +2594,10 @@ def to_dataframe(
 
                 .. versionadded:: 3.21.0
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pandas.DataFrame:
                 A :class:`~pandas.DataFrame` populated with row data and column
@@ -2690,6 +2711,7 @@ def to_dataframe(
             progress_bar_type=progress_bar_type,
             bqstorage_client=bqstorage_client,
             create_bqstorage_client=create_bqstorage_client,
+            timeout=timeout,
         )
 
         # Default date dtype is `db_dtypes.DateDtype()` that could cause out of bounds error,
@@ -2768,6 +2790,7 @@ def to_geodataframe(
         int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
         float_dtype: Union[Any, None] = None,
         string_dtype: Union[Any, None] = None,
+        timeout: Optional[float] = None,
     ) -> "geopandas.GeoDataFrame":
         """Create a GeoPandas GeoDataFrame by loading all pages of a query.
 
@@ -2902,6 +2925,7 @@ def to_geodataframe(
             int_dtype=int_dtype,
             float_dtype=float_dtype,
             string_dtype=string_dtype,
+            timeout=timeout,
         )
 
         return geopandas.GeoDataFrame(
@@ -2917,9 +2941,6 @@ class _EmptyRowIterator(RowIterator):
     statements.
     """
 
-    pages = ()
-    total_rows = 0
-
     def __init__(
         self, client=None, api_request=None, path=None, schema=(), *args, **kwargs
     ):
@@ -2931,12 +2952,14 @@ def __init__(
             *args,
             **kwargs,
         )
+        self._total_rows = 0
 
     def to_arrow(
         self,
         progress_bar_type=None,
         bqstorage_client=None,
         create_bqstorage_client=True,
+        timeout: Optional[float] = None,
     ) -> "pyarrow.Table":
         """[Beta] Create an empty class:`pyarrow.Table`.
 
@@ -2944,6 +2967,7 @@ def to_arrow(
             progress_bar_type (str): Ignored. Added for compatibility with RowIterator.
             bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
             create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
+            timeout (Optional[float]): Ignored. Added for compatibility with RowIterator.
 
         Returns:
             pyarrow.Table: An empty :class:`pyarrow.Table`.
@@ -2970,6 +2994,7 @@ def to_dataframe(
         range_date_dtype=None,
         range_datetime_dtype=None,
         range_timestamp_dtype=None,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Create an empty dataframe.
 
@@ -2990,6 +3015,7 @@ def to_dataframe(
             range_date_dtype (Any): Ignored. Added for compatibility with RowIterator.
             range_datetime_dtype (Any): Ignored. Added for compatibility with RowIterator.
             range_timestamp_dtype (Any): Ignored. Added for compatibility with RowIterator.
+            timeout (Optional[float]): Ignored. Added for compatibility with RowIterator.
 
         Returns:
             pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
@@ -3008,6 +3034,7 @@ def to_geodataframe(
         int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
         float_dtype: Union[Any, None] = None,
         string_dtype: Union[Any, None] = None,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Create an empty dataframe.
 
@@ -3021,6 +3048,7 @@ def to_geodataframe(
             int_dtype (Any): Ignored. Added for compatibility with RowIterator.
             float_dtype (Any): Ignored. Added for compatibility with RowIterator.
             string_dtype (Any): Ignored. Added for compatibility with RowIterator.
+            timeout (Optional[float]): Ignored. Added for compatibility with RowIterator.
 
         Returns:
             pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
@@ -3038,6 +3066,7 @@ def to_dataframe_iterable(
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: Optional[int] = None,
         max_stream_count: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> Iterator["pandas.DataFrame"]:
         """Create an iterable of pandas DataFrames, to process the table as a
         stream.
@@ -3056,6 +3085,9 @@ def to_dataframe_iterable(
             max_stream_count:
                 Ignored. Added for compatibility with RowIterator.
 
+            timeout (Optional[float]):
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pandas.DataFrame`.
 
@@ -3071,6 +3103,7 @@ def to_arrow_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: Optional[int] = None,
         max_stream_count: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """Create an iterable of pandas DataFrames, to process the table as a
         stream.
@@ -3086,6 +3119,9 @@ def to_arrow_iterable(
             max_stream_count:
                 Ignored. Added for compatibility with RowIterator.
 
+            timeout (Optional[float]):
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
         """
diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py
index a6c59b158..4390309f1 100644
--- a/tests/unit/job/test_query_pandas.py
+++ b/tests/unit/job/test_query_pandas.py
@@ -1023,5 +1023,38 @@ def test_query_job_to_geodataframe_delegation(wait_for_query):
         int_dtype=DefaultPandasDTypes.INT_DTYPE,
         float_dtype=None,
         string_dtype=None,
+        timeout=None,
     )
     assert df is row_iterator.to_geodataframe.return_value
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@mock.patch("google.cloud.bigquery.job.query.wait_for_query")
+def test_query_job_to_dataframe_delegation(wait_for_query):
+    job = _make_job()
+    bqstorage_client = object()
+    timeout = 123.45
+
+    job.to_dataframe(bqstorage_client=bqstorage_client, timeout=timeout)
+
+    wait_for_query.assert_called_once_with(job, None, max_results=None)
+    row_iterator = wait_for_query.return_value
+    row_iterator.to_dataframe.assert_called_once()
+    call_args = row_iterator.to_dataframe.call_args
+    assert call_args.kwargs["timeout"] == timeout
+
+
+@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
+@mock.patch("google.cloud.bigquery.job.query.wait_for_query")
+def test_query_job_to_arrow_delegation(wait_for_query):
+    job = _make_job()
+    bqstorage_client = object()
+    timeout = 123.45
+
+    job.to_arrow(bqstorage_client=bqstorage_client, timeout=timeout)
+
+    wait_for_query.assert_called_once_with(job, None, max_results=None)
+    row_iterator = wait_for_query.return_value
+    row_iterator.to_arrow.assert_called_once()
+    call_args = row_iterator.to_arrow.call_args
+    assert call_args.kwargs["timeout"] == timeout
diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py
index bc94f5f54..a1cbb726b 100644
--- a/tests/unit/test__pandas_helpers.py
+++ b/tests/unit/test__pandas_helpers.py
@@ -13,12 +13,14 @@
 # limitations under the License.
 
 import collections
+import concurrent.futures
 import datetime
 import decimal
 import functools
 import gc
 import operator
 import queue
+import time
 from typing import Union
 from unittest import mock
 import warnings
@@ -2177,3 +2179,76 @@ def test_determine_requested_streams_invalid_max_stream_count():
     """Tests that a ValueError is raised if max_stream_count is negative."""
     with pytest.raises(ValueError):
         determine_requested_streams(preserve_order=False, max_stream_count=-1)
+
+
+@pytest.mark.skipif(
+    bigquery_storage is None, reason="Requires google-cloud-bigquery-storage"
+)
+def test__download_table_bqstorage_w_timeout_error(module_under_test):
+    from google.cloud.bigquery import dataset
+    from google.cloud.bigquery import table
+    from unittest import mock
+
+    mock_bqstorage_client = mock.create_autospec(
+        bigquery_storage.BigQueryReadClient, instance=True
+    )
+    fake_session = mock.Mock(streams=[mock.Mock()])
+    mock_bqstorage_client.create_read_session.return_value = fake_session
+
+    table_ref = table.TableReference(
+        dataset.DatasetReference("project-x", "dataset-y"),
+        "table-z",
+    )
+
+    def slow_download_stream(
+        download_state, bqstorage_client, session, stream, worker_queue, page_to_item
+    ):
+        # Block until the main thread sets done=True (which it will on timeout)
+        while not download_state.done:
+            time.sleep(0.01)
+
+    with mock.patch.object(
+        module_under_test, "_download_table_bqstorage_stream", new=slow_download_stream
+    ):
+        # Use a very small timeout
+        result_gen = module_under_test._download_table_bqstorage(
+            "some-project", table_ref, mock_bqstorage_client, timeout=0.01
+        )
+        with pytest.raises(concurrent.futures.TimeoutError, match="timed out"):
+            list(result_gen)
+
+
+@pytest.mark.skipif(
+    bigquery_storage is None, reason="Requires google-cloud-bigquery-storage"
+)
+def test__download_table_bqstorage_w_timeout_success(module_under_test):
+    from google.cloud.bigquery import dataset
+    from google.cloud.bigquery import table
+    from unittest import mock
+
+    mock_bqstorage_client = mock.create_autospec(
+        bigquery_storage.BigQueryReadClient, instance=True
+    )
+    fake_session = mock.Mock(streams=["stream/s0"])
+    mock_bqstorage_client.create_read_session.return_value = fake_session
+
+    table_ref = table.TableReference(
+        dataset.DatasetReference("project-x", "dataset-y"),
+        "table-z",
+    )
+
+    def fast_download_stream(
+        download_state, bqstorage_client, session, stream, worker_queue, page_to_item
+    ):
+        worker_queue.put("result_page")
+
+    with mock.patch.object(
+        module_under_test, "_download_table_bqstorage_stream", new=fast_download_stream
+    ):
+        # Use a generous timeout
+        result_gen = module_under_test._download_table_bqstorage(
+            "some-project", table_ref, mock_bqstorage_client, timeout=10.0
+        )
+        results = list(result_gen)
+
+    assert results == ["result_page"]
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py
index af31d116b..97a1b4916 100644
--- a/tests/unit/test_table.py
+++ b/tests/unit/test_table.py
@@ -2495,6 +2495,20 @@ def test_to_geodataframe(self):
         else:
             assert not hasattr(df, "crs")
 
+    def test_methods_w_timeout(self):
+        pytest.importorskip("pyarrow")
+        pytest.importorskip("geopandas")
+        # Ensure that the timeout parameter is accepted by all methods without raising a TypeError,
+        # even though the _EmptyRowIterator implementations do not use the timeout value.
+        timeout = 42.0
+
+        # Call each method to ensure no TypeError is raised
+        self._make_one().to_arrow(timeout=timeout)
+        self._make_one().to_arrow_iterable(timeout=timeout)
+        self._make_one().to_dataframe(timeout=timeout)
+        self._make_one().to_dataframe_iterable(timeout=timeout)
+        self._make_one().to_geodataframe(timeout=timeout)
+
 
 class TestRowIterator(unittest.TestCase):
     PYARROW_MINIMUM_VERSION = str(_versions_helpers._MIN_PYARROW_VERSION)
@@ -5665,6 +5679,7 @@ def test_rowiterator_to_geodataframe_delegation(self, to_dataframe):
             int_dtype=DefaultPandasDTypes.INT_DTYPE,
             float_dtype=None,
             string_dtype=None,
+            timeout=None,
         )
         self.assertIsInstance(df, geopandas.GeoDataFrame)
 
diff --git a/tests/unit/test_table_pandas.py b/tests/unit/test_table_pandas.py
index a4fa3fa39..64d8b1451 100644
--- a/tests/unit/test_table_pandas.py
+++ b/tests/unit/test_table_pandas.py
@@ -301,6 +301,7 @@ def test_rowiterator_to_geodataframe_with_default_dtypes(
         int_dtype=bigquery.enums.DefaultPandasDTypes.INT_DTYPE,
         float_dtype=None,
         string_dtype=None,
+        timeout=None,
     )
     mock_geopandas.GeoDataFrame.assert_called_once_with(
         mock_df, crs="EPSG:4326", geometry="geo_col"
@@ -358,6 +359,7 @@ def test_rowiterator_to_geodataframe_with_custom_dtypes(
         int_dtype=custom_int_dtype,
         float_dtype=custom_float_dtype,
         string_dtype=custom_string_dtype,
+        timeout=None,
     )
     mock_geopandas.GeoDataFrame.assert_called_once_with(
        mock_df, crs="EPSG:4326", geometry="geo_col"
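
Usage sketch (not part of the patch above): one way application code might exercise the new ``timeout`` argument once this change lands. The query string and the 30-second value are illustrative assumptions; per the diff, the timeout bounds only the BigQuery Storage download performed by ``to_dataframe()`` and surfaces as ``concurrent.futures.TimeoutError``.

    import concurrent.futures

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT 1")  # placeholder query

    try:
        # Bounds the result download (the BigQuery Storage read streams),
        # not the query job's own execution time.
        df = job.to_dataframe(timeout=30.0)
    except concurrent.futures.TimeoutError:
        # Raised when _download_table_bqstorage sees the elapsed time exceed
        # `timeout` before all streams have been drained.
        df = None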