Skip to content

Commit

Permalink
fix: avoid unnecessary API call in QueryJob.result() when job is alre…
Browse files Browse the repository at this point in the history
…ady finished (#1900)


fix: retry query job after ambiguous failures
Co-authored-by: Chalmer Lowe <chalmerlowe@google.com>
  • Loading branch information
tswast and chalmerlowe authored Apr 18, 2024
1 parent bf8861c commit 1367b58
Show file tree
Hide file tree
Showing 6 changed files with 547 additions and 230 deletions.
9 changes: 5 additions & 4 deletions google/cloud/bigquery/_job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,16 @@ def _to_query_job(
errors = query_response["errors"]
query_job._properties["status"]["errors"] = errors

# Transform job state so that QueryJob doesn't try to restart the query.
# Avoid an extra call to `getQueryResults` if the query has finished.
job_complete = query_response.get("jobComplete")
if job_complete:
query_job._properties["status"]["state"] = "DONE"
query_job._query_results = google.cloud.bigquery.query._QueryResults(
query_response
)
else:
query_job._properties["status"]["state"] = "PENDING"

# We want job.result() to refresh the job state, so the conversion is
# always "PENDING", even if the job is finished.
query_job._properties["status"]["state"] = "PENDING"

return query_job

Expand Down
172 changes: 111 additions & 61 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import concurrent.futures
import copy
import re
import time
import typing
from typing import Any, Dict, Iterable, List, Optional, Union

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
from google.api_core import retry as retries
import requests

Expand Down Expand Up @@ -1383,7 +1383,7 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
def _reload_query_results(
self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: Optional[float] = None
):
"""Refresh the cached query results.
"""Refresh the cached query results unless already cached and complete.
Args:
retry (Optional[google.api_core.retry.Retry]):
Expand All @@ -1392,6 +1392,8 @@ def _reload_query_results(
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
"""
# Optimization: avoid a call to jobs.getQueryResults if it's already
# been fetched, e.g. from jobs.query first page of results.
if self._query_results and self._query_results.complete:
return

Expand Down Expand Up @@ -1430,40 +1432,6 @@ def _reload_query_results(
timeout=transport_timeout,
)

def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
"""Check if the query has finished running and raise if it's not.
If the query has finished, also reload the job itself.
"""
# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Use the query results reload exception, as it generally contains
# much more useful error information.
self.set_exception(exc)
finally:
return

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if not self._query_results.complete:
raise polling_future._OperationNotComplete()
else:
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)

def result( # type: ignore # (incompatible with supertype)
self,
page_size: Optional[int] = None,
Expand Down Expand Up @@ -1528,6 +1496,10 @@ def result( # type: ignore # (incompatible with supertype)
If Non-``None`` and non-default ``job_retry`` is
provided and the job is not retryable.
"""
# Note: Since waiting for a query job to finish is more complex than
# refreshing the job state in a loop, we avoid calling the superclass
# in this method.

if self.dry_run:
return _EmptyRowIterator(
project=self.project,
Expand All @@ -1548,46 +1520,124 @@ def result( # type: ignore # (incompatible with supertype)
" provided to the query that created this job."
)

first = True
restart_query_job = False

def is_job_done():
nonlocal restart_query_job

def do_get_result():
nonlocal first
if restart_query_job:
restart_query_job = False

if first:
first = False
else:
# The original job has failed. Create a new one.
#
# Note that we won't get here if retry_do_query is
# None, because we won't use a retry.

# The orinal job is failed. Create a new one.
job = retry_do_query()

# If it's already failed, we might as well stop:
if job.done() and job.exception() is not None:
raise job.exception()

# Become the new job:
self.__dict__.clear()
self.__dict__.update(job.__dict__)

# This shouldn't be necessary, because once we have a good
# job, it should stay good,and we shouldn't have to retry.
# But let's be paranoid. :)
# It's possible the job fails again and we'll have to
# retry that too.
self._retry_do_query = retry_do_query
self._job_retry = job_retry

super(QueryJob, self).result(retry=retry, timeout=timeout)

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
if self._query_results is None or not self._query_results.complete:
self._reload_query_results(retry=retry, timeout=timeout)
# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
# Only try to restart the query job if the job failed for
# a retriable reason. For example, don't restart the query
# if the call to reload the job metadata within self.done()
# timed out.
#
# The `restart_query_job` must only be called after a
# successful call to the `jobs.get` REST API and we
# determine that the job has failed.
#
# The `jobs.get` REST API
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
# is called via `self.done()` which calls
# `self.reload()`.
#
# To determine if the job failed, the `self.exception()`
# is set from `self.reload()` via
# `self._set_properties()`, which translates the
# `Job.status.errorResult` field
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
# into an exception that can be processed by the
# `job_retry` predicate.
restart_query_job = True
raise job_failed_exception
else:
# Make sure that the _query_results are cached so we
# can return a complete RowIterator.
#
# Note: As an optimization, _reload_query_results
# doesn't make any API calls if the query results are
# already cached and have jobComplete=True in the
# response from the REST API. This ensures we aren't
# making any extra API calls if the previous loop
# iteration fetched the finished job.
self._reload_query_results(retry=retry, timeout=timeout)
return True

# Call jobs.getQueryResults with max results set to 0 just to
# wait for the query to finish. Unlike most methods,
# jobs.getQueryResults hangs as long as it can to ensure we
# know when the query has finished as soon as possible.
self._reload_query_results(retry=retry, timeout=timeout)

# Even if the query is finished now according to
# jobs.getQueryResults, we'll want to reload the job status if
# it's not already DONE.
return False

if retry_do_query is not None and job_retry is not None:
do_get_result = job_retry(do_get_result)

do_get_result()
is_job_done = job_retry(is_job_done)

# timeout can be a number of seconds, `None`, or a
# `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
# sentinel object indicating a default timeout if we choose to add
# one some day. This value can come from our PollingFuture
# superclass and was introduced in
# https://github.com/googleapis/python-api-core/pull/462.
if isinstance(timeout, (float, int)):
remaining_timeout = timeout
else:
# Note: we may need to handle _DEFAULT_VALUE as a separate
# case someday, but even then the best we can do for queries
# is 72+ hours for hyperparameter tuning jobs:
# https://cloud.google.com/bigquery/quotas#query_jobs
#
# The timeout for a multi-statement query is 24+ hours. See:
# https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
remaining_timeout = None

if remaining_timeout is None:
# Since is_job_done() calls jobs.getQueryResults, which is a
# long-running API, don't delay the next request at all.
while not is_job_done():
pass
else:
# Use a monotonic clock since we don't actually care about
# daylight savings or similar, just the elapsed time.
previous_time = time.monotonic()

while not is_job_done():
current_time = time.monotonic()
elapsed_time = current_time - previous_time
remaining_timeout = remaining_timeout - elapsed_time
previous_time = current_time

if remaining_timeout < 0:
raise concurrent.futures.TimeoutError()

except exceptions.GoogleAPICallError as exc:
exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
Expand Down
52 changes: 47 additions & 5 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,25 @@

_DEFAULT_RETRY_DEADLINE = 10.0 * 60.0 # 10 minutes

# Allow for a few retries after the API request times out. This relevant for
# rateLimitExceeded errors, which can be raised either by the Google load
# balancer or the BigQuery job server.
_DEFAULT_JOB_DEADLINE = 3.0 * _DEFAULT_RETRY_DEADLINE
# Ambiguous errors (e.g. internalError, backendError, rateLimitExceeded) retry
# until the full `_DEFAULT_RETRY_DEADLINE`. This is because the
# `jobs.getQueryResults` REST API translates a job failure into an HTTP error.
#
# TODO(https://github.com/googleapis/python-bigquery/issues/1903): Investigate
# if we can fail early for ambiguous errors in `QueryJob.result()`'s call to
# the `jobs.getQueryResult` API.
#
# We need `_DEFAULT_JOB_DEADLINE` to be some multiple of
# `_DEFAULT_RETRY_DEADLINE` to allow for a few retries after the retry
# timeout is reached.
#
# Note: This multiple should actually be a multiple of
# (2 * _DEFAULT_RETRY_DEADLINE). After an ambiguous exception, the first
# call from `job_retry()` refreshes the job state without actually restarting
# the query. The second `job_retry()` actually restarts the query. For a more
# detailed explanation, see the comments where we set `restart_query_job = True`
# in `QueryJob.result()`'s inner `is_job_done()` function.
_DEFAULT_JOB_DEADLINE = 2.0 * (2.0 * _DEFAULT_RETRY_DEADLINE)


def _should_retry(exc):
Expand All @@ -66,17 +81,44 @@ def _should_retry(exc):
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""

# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We
# briefly had a default timeout, but even setting it at more than twice the
# theoretical server-side default timeout of 2 minutes was not enough for
# complex queries. See:
# https://github.com/googleapis/python-bigquery/issues/970#issuecomment-921934647
DEFAULT_TIMEOUT = None
"""The default API timeout.
This is the time to wait per request. To adjust the total wait time, set a
deadline on the retry object.
"""

job_retry_reasons = "rateLimitExceeded", "backendError", "jobRateLimitExceeded"
job_retry_reasons = (
"rateLimitExceeded",
"backendError",
"internalError",
"jobRateLimitExceeded",
)


def _job_should_retry(exc):
# Sometimes we have ambiguous errors, such as 'backendError' which could
# be due to an API problem or a job problem. For these, make sure we retry
# our is_job_done() function.
#
# Note: This won't restart the job unless we know for sure it's because of
# the job status and set restart_query_job = True in that loop. This means
# that we might end up calling this predicate twice for the same job
# but from different paths: (1) from jobs.getQueryResults RetryError and
# (2) from translating the job error from the body of a jobs.get response.
#
# Note: If we start retrying job types other than queries where we don't
# call the problematic getQueryResults API to check the status, we need
# to provide a different predicate, as there shouldn't be ambiguous
# errors in those cases.
if isinstance(exc, exceptions.RetryError):
exc = exc.cause

if not hasattr(exc, "errors") or len(exc.errors) == 0:
return False

Expand Down
Loading

0 comments on commit 1367b58

Please sign in to comment.