From 13bd849797be6f8ec076782d12413f75de56bff2 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 19 Dec 2019 01:45:16 +0000 Subject: [PATCH] feat(bigquery): add timeout parameter to QueryJob.done() method (#9875) * feat(bigquery): add timeout to QueryJob.done() * Add tests for methods that got timeout param In addition, fix the timeout logic in QueryJob.done() - the timeouts are in different units (seconds vs. milliseconds) * Fix lint warning (unused variable) * Adjust timeout exception type in QueryJob.result() * Update dependency pins The new timeout feature requires more recent versions of the API core and google auth dependencies. * Add safety margin on top of server-side timeout If the server-side processing timeout is used (the `timeout_ms` API parameter) as the total timeout, it should be slightly longer than the actual server-side timeout in order to not timeout the connection while there might still be chance that the server-side processing has actually completed. --- bigquery/google/cloud/bigquery/client.py | 7 +- bigquery/google/cloud/bigquery/job.py | 52 +++++++-- bigquery/setup.py | 2 + bigquery/tests/system.py | 24 ++++ bigquery/tests/unit/test_client.py | 3 + bigquery/tests/unit/test_job.py | 133 +++++++++++++++++++++-- 6 files changed, 202 insertions(+), 19 deletions(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index f3b7aab40789..5fd7bceea973 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1081,7 +1081,7 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False): raise def _get_query_results( - self, job_id, retry, project=None, timeout_ms=None, location=None + self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None, ): """Get the query results object for a query job. @@ -1096,6 +1096,9 @@ def _get_query_results( (Optional) number of milliseconds the the API call should wait for the query to complete before the request times out. location (str): Location of the query job. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before retrying the HTTP request. Returns: google.cloud.bigquery.query._QueryResults: @@ -1122,7 +1125,7 @@ def _get_query_results( # job is complete (from QueryJob.done(), called ultimately from # QueryJob.result()). So we don't need to poll here. resource = self._call_api( - retry, method="GET", path=path, query_params=extra_params + retry, method="GET", path=path, query_params=extra_params, timeout=timeout ) return _QueryResults.from_api_repr(resource) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 19e4aaf185e4..d20e5b5fb11f 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -14,10 +14,14 @@ """Define API Jobs.""" +from __future__ import division + +import concurrent.futures import copy import re import threading +import requests import six from six.moves import http_client @@ -50,6 +54,7 @@ _DONE_STATE = "DONE" _STOPPED_REASON = "stopped" _TIMEOUT_BUFFER_SECS = 0.1 +_SERVER_TIMEOUT_MARGIN_SECS = 1.0 _CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE) _ERROR_REASON_TO_EXCEPTION = { @@ -663,7 +668,7 @@ def exists(self, client=None, retry=DEFAULT_RETRY): else: return True - def reload(self, client=None, retry=DEFAULT_RETRY): + def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None): """API call: refresh job properties via a GET request. See @@ -675,6 +680,9 @@ def reload(self, client=None, retry=DEFAULT_RETRY): ``client`` stored on the current dataset. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before retrying the HTTP request. """ client = self._require_client(client) @@ -683,7 +691,11 @@ def reload(self, client=None, retry=DEFAULT_RETRY): extra_params["location"] = self.location api_response = client._call_api( - retry, method="GET", path=self.path, query_params=extra_params + retry, + method="GET", + path=self.path, + query_params=extra_params, + timeout=timeout, ) self._set_properties(api_response) @@ -2994,9 +3006,16 @@ def estimated_bytes_processed(self): result = int(result) return result - def done(self, retry=DEFAULT_RETRY): + def done(self, retry=DEFAULT_RETRY, timeout=None): """Refresh the job and checks if it is complete. + Args: + retry (Optional[google.api_core.retry.Retry]): + How to retry the call that retrieves query results. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before retrying the HTTP request. + Returns: bool: True if the job is complete, False otherwise. """ @@ -3007,11 +3026,25 @@ def done(self, retry=DEFAULT_RETRY): timeout_ms = None if self._done_timeout is not None: # Subtract a buffer for context switching, network latency, etc. - timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS - timeout = max(min(timeout, 10), 0) - self._done_timeout -= timeout + api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS + api_timeout = max(min(api_timeout, 10), 0) + self._done_timeout -= api_timeout self._done_timeout = max(0, self._done_timeout) - timeout_ms = int(timeout * 1000) + timeout_ms = int(api_timeout * 1000) + + # If the server-side processing timeout (timeout_ms) is specified and + # would be picked as the total request timeout, we want to add a small + # margin to it - we don't want to timeout the connection just as the + # server-side processing might have completed, but instead slightly + # after the server-side deadline. + # However, if `timeout` is specified, and is shorter than the adjusted + # server timeout, the former prevails. + if timeout_ms is not None and timeout_ms > 0: + server_timeout_with_margin = timeout_ms / 1000 + _SERVER_TIMEOUT_MARGIN_SECS + if timeout is not None: + timeout = min(server_timeout_with_margin, timeout) + else: + timeout = server_timeout_with_margin # Do not refresh is the state is already done, as the job will not # change once complete. @@ -3022,13 +3055,14 @@ def done(self, retry=DEFAULT_RETRY): project=self.project, timeout_ms=timeout_ms, location=self.location, + timeout=timeout, ) # Only reload the job once we know the query is complete. # This will ensure that fields such as the destination table are # correctly populated. if self._query_results.complete: - self.reload(retry=retry) + self.reload(retry=retry, timeout=timeout) return self.state == _DONE_STATE @@ -3132,6 +3166,8 @@ def result( exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self raise + except requests.exceptions.Timeout as exc: + six.raise_from(concurrent.futures.TimeoutError, exc) # If the query job is complete but there are no query results, this was # special job, such as a DDL query. Return an empty result set to diff --git a/bigquery/setup.py b/bigquery/setup.py index 42d53301ee0a..5e2ba87a4b1c 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -30,6 +30,8 @@ release_status = "Development Status :: 5 - Production/Stable" dependencies = [ 'enum34; python_version < "3.4"', + "google-auth >= 1.9.0, < 2.0dev", + "google-api-core >= 1.15.0, < 2.0dev", "google-cloud-core >= 1.0.3, < 2.0dev", "google-resumable-media >= 0.3.1, != 0.4.0, < 0.6.0dev", "protobuf >= 3.6.0", diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index bba527178f47..b431f628d001 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -26,6 +26,7 @@ import uuid import re +import requests import six import psutil import pytest @@ -1893,6 +1894,29 @@ def test_query_iter(self): row_tuples = [r.values() for r in query_job] self.assertEqual(row_tuples, [(1,)]) + def test_querying_data_w_timeout(self): + job_config = bigquery.QueryJobConfig() + job_config.use_query_cache = False + + query_job = Config.CLIENT.query( + """ + SELECT name, SUM(number) AS total_people + FROM `bigquery-public-data.usa_names.usa_1910_current` + GROUP BY name + """, + location="US", + job_config=job_config, + ) + + # Specify a very tight deadline to demonstrate that the timeout + # actually has effect. + with self.assertRaises(requests.exceptions.Timeout): + query_job.done(timeout=0.1) + + # Now wait for the result using a more realistic deadline. + query_job.result(timeout=30) + self.assertTrue(query_job.done(timeout=30)) + @unittest.skipIf(pandas is None, "Requires `pandas`") def test_query_results_to_dataframe(self): QUERY = """ diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index c4cdb7fdfd2f..c9166bd5d7c0 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -226,12 +226,14 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self): project="other-project", location=self.LOCATION, timeout_ms=500, + timeout=42, ) conn.api_request.assert_called_once_with( method="GET", path="/projects/other-project/queries/nothere", query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION}, + timeout=42, ) def test__get_query_results_miss_w_client_location(self): @@ -248,6 +250,7 @@ def test__get_query_results_miss_w_client_location(self): method="GET", path="/projects/PROJECT/queries/nothere", query_params={"maxResults": 0, "location": self.LOCATION}, + timeout=None, ) def test__get_query_results_hit(self): diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 52e2abf8f304..e732bed4dcc6 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent import copy import json import textwrap @@ -19,6 +20,7 @@ import mock import pytest +import requests from six.moves import http_client try: @@ -725,6 +727,7 @@ def test_reload_defaults(self): method="GET", path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), query_params={"location": self.LOCATION}, + timeout=None, ) self.assertEqual(job._properties, resource) @@ -746,13 +749,14 @@ def test_reload_explicit(self): call_api.return_value = resource retry = DEFAULT_RETRY.with_deadline(1) - job.reload(client=client, retry=retry) + job.reload(client=client, retry=retry, timeout=4.2) call_api.assert_called_once_with( retry, method="GET", path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), query_params={}, + timeout=4.2, ) self.assertEqual(job._properties, resource) @@ -2489,7 +2493,7 @@ def test_reload_w_bound_client(self): job.reload() conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={} + method="GET", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -2506,7 +2510,7 @@ def test_reload_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={} + method="GET", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -2527,6 +2531,7 @@ def test_reload_w_job_reference(self): method="GET", path="/projects/alternative-project/jobs/{}".format(self.JOB_ID), query_params={"location": "US"}, + timeout=None, ) def test_cancel_w_bound_client(self): @@ -2988,7 +2993,7 @@ def test_reload_w_bound_client(self): job.reload() conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={} + method="GET", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -3007,7 +3012,7 @@ def test_reload_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={} + method="GET", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -3351,7 +3356,7 @@ def test_reload_w_bound_client(self): job.reload() conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={} + method="GET", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -3372,7 +3377,7 @@ def test_reload_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={} + method="GET", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -3901,6 +3906,72 @@ def test_done(self): job = self._get_target_class().from_api_repr(resource, client) self.assertTrue(job.done()) + def test_done_w_timeout(self): + client = _make_client(project=self.PROJECT) + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + + with mock.patch.object( + client, "_get_query_results" + ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: + job.done(timeout=42) + + fake_get_results.assert_called_once() + call_args = fake_get_results.call_args + self.assertEqual(call_args.kwargs.get("timeout"), 42) + + call_args = fake_reload.call_args + self.assertEqual(call_args.kwargs.get("timeout"), 42) + + def test_done_w_timeout_and_shorter_internal_api_timeout(self): + from google.cloud.bigquery.job import _TIMEOUT_BUFFER_SECS + from google.cloud.bigquery.job import _SERVER_TIMEOUT_MARGIN_SECS + + client = _make_client(project=self.PROJECT) + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + job._done_timeout = 8.8 + + with mock.patch.object( + client, "_get_query_results" + ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: + job.done(timeout=42) + + # The expected timeout used is the job's own done_timeout minus a + # fixed amount (bigquery.job._TIMEOUT_BUFFER_SECS) increased by the + # safety margin on top of server-side processing timeout - that's + # because that final number is smaller than the given timeout (42 seconds). + expected_timeout = 8.8 - _TIMEOUT_BUFFER_SECS + _SERVER_TIMEOUT_MARGIN_SECS + + fake_get_results.assert_called_once() + call_args = fake_get_results.call_args + self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) + + call_args = fake_reload.call_args + self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) + + def test_done_w_timeout_and_longer_internal_api_timeout(self): + client = _make_client(project=self.PROJECT) + resource = self._make_resource(ended=False) + job = self._get_target_class().from_api_repr(resource, client) + job._done_timeout = 8.8 + + with mock.patch.object( + client, "_get_query_results" + ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: + job.done(timeout=5.5) + + # The expected timeout used is simply the given timeout, as the latter + # is shorter than the job's internal done timeout. + expected_timeout = 5.5 + + fake_get_results.assert_called_once() + call_args = fake_get_results.call_args + self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) + + call_args = fake_reload.call_args + self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) + def test_query_plan(self): from google.cloud._helpers import _RFC3339_MICROS from google.cloud.bigquery.job import QueryPlanEntry @@ -4561,6 +4632,26 @@ def test_result_error(self): expected_line = "{}:{}".format(i, line) assert expected_line in full_text + def test_result_transport_timeout_error(self): + query = textwrap.dedent( + """ + SELECT foo, bar + FROM table_baz + WHERE foo == bar""" + ) + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, query, client) + call_api_patch = mock.patch( + "google.cloud.bigquery.client.Client._call_api", + autospec=True, + side_effect=requests.exceptions.Timeout("Server response took too long."), + ) + + # Make sure that timeout errors get rebranded to concurrent futures timeout. + with call_api_patch, self.assertRaises(concurrent.futures.TimeoutError): + job.result(timeout=1) + def test__begin_error(self): from google.cloud import exceptions @@ -5003,7 +5094,7 @@ def test_reload_w_bound_client(self): self.assertNotEqual(job.destination, table_ref) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={} + method="GET", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -5028,10 +5119,34 @@ def test_reload_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={} + method="GET", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) + def test_reload_w_timeout(self): + from google.cloud.bigquery.dataset import DatasetReference + from google.cloud.bigquery.job import QueryJobConfig + + PATH = "/projects/%s/jobs/%s" % (self.PROJECT, self.JOB_ID) + DS_ID = "DATASET" + DEST_TABLE = "dest_table" + RESOURCE = self._make_resource() + conn = _make_connection(RESOURCE) + client = _make_client(project=self.PROJECT, connection=conn) + dataset_ref = DatasetReference(self.PROJECT, DS_ID) + table_ref = dataset_ref.table(DEST_TABLE) + config = QueryJobConfig() + config.destination = table_ref + job = self._make_one(self.JOB_ID, None, client, job_config=config) + + job.reload(timeout=4.2) + + self.assertNotEqual(job.destination, table_ref) + + conn.api_request.assert_called_once_with( + method="GET", path=PATH, query_params={}, timeout=4.2 + ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow(self): begun_resource = self._make_resource()