-
Notifications
You must be signed in to change notification settings - Fork 307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: avoid unnecessary API call in QueryJob.result() when job is already finished #1900
Changes from 7 commits
0c9a8a2
07839b5
5112315
6297efd
e5990eb
fddb557
6a43cfd
12fa9fb
5149c25
4ee4975
08373bc
2a587aa
f7f1e81
e42d8ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -17,11 +17,11 @@ | |||||||
import concurrent.futures | ||||||||
import copy | ||||||||
import re | ||||||||
import time | ||||||||
import typing | ||||||||
from typing import Any, Dict, Iterable, List, Optional, Union | ||||||||
|
||||||||
from google.api_core import exceptions | ||||||||
from google.api_core.future import polling as polling_future | ||||||||
from google.api_core import retry as retries | ||||||||
import requests | ||||||||
|
||||||||
|
@@ -1383,7 +1383,7 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): | |||||||
def _reload_query_results( | ||||||||
self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: Optional[float] = None | ||||||||
): | ||||||||
"""Refresh the cached query results. | ||||||||
"""Refresh the cached query results unless already cached and complete. | ||||||||
|
||||||||
Args: | ||||||||
retry (Optional[google.api_core.retry.Retry]): | ||||||||
|
@@ -1392,6 +1392,8 @@ def _reload_query_results( | |||||||
The number of seconds to wait for the underlying HTTP transport | ||||||||
before using ``retry``. | ||||||||
""" | ||||||||
# Optimization: avoid a call to jobs.getQueryResults if it's already | ||||||||
# been fetched, e.g. from jobs.query first page of results. | ||||||||
if self._query_results and self._query_results.complete: | ||||||||
return | ||||||||
|
||||||||
|
@@ -1430,40 +1432,6 @@ def _reload_query_results( | |||||||
timeout=transport_timeout, | ||||||||
) | ||||||||
|
||||||||
def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None): | ||||||||
"""Check if the query has finished running and raise if it's not. | ||||||||
|
||||||||
If the query has finished, also reload the job itself. | ||||||||
""" | ||||||||
# If an explicit timeout is not given, fall back to the transport timeout | ||||||||
# stored in _blocking_poll() in the process of polling for job completion. | ||||||||
transport_timeout = timeout if timeout is not None else self._transport_timeout | ||||||||
|
||||||||
try: | ||||||||
self._reload_query_results(retry=retry, timeout=transport_timeout) | ||||||||
except exceptions.GoogleAPIError as exc: | ||||||||
# Reloading also updates error details on self, thus no need for an | ||||||||
# explicit self.set_exception() call if reloading succeeds. | ||||||||
try: | ||||||||
self.reload(retry=retry, timeout=transport_timeout) | ||||||||
except exceptions.GoogleAPIError: | ||||||||
# Use the query results reload exception, as it generally contains | ||||||||
# much more useful error information. | ||||||||
self.set_exception(exc) | ||||||||
finally: | ||||||||
return | ||||||||
|
||||||||
# Only reload the job once we know the query is complete. | ||||||||
# This will ensure that fields such as the destination table are | ||||||||
# correctly populated. | ||||||||
if not self._query_results.complete: | ||||||||
raise polling_future._OperationNotComplete() | ||||||||
else: | ||||||||
try: | ||||||||
self.reload(retry=retry, timeout=transport_timeout) | ||||||||
except exceptions.GoogleAPIError as exc: | ||||||||
self.set_exception(exc) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thought: We probably should have been calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. We are. 😅
Which we call from
Which we call from
|
||||||||
|
||||||||
def result( # type: ignore # (incompatible with supertype) | ||||||||
self, | ||||||||
page_size: Optional[int] = None, | ||||||||
|
@@ -1528,6 +1496,10 @@ def result( # type: ignore # (incompatible with supertype) | |||||||
If Non-``None`` and non-default ``job_retry`` is | ||||||||
provided and the job is not retryable. | ||||||||
""" | ||||||||
# Note: Since waiting for a query job to finish is more complex than | ||||||||
# refreshing the job state in a loop, we avoid calling the superclass | ||||||||
# in this method. | ||||||||
|
||||||||
if self.dry_run: | ||||||||
return _EmptyRowIterator( | ||||||||
project=self.project, | ||||||||
|
@@ -1548,46 +1520,92 @@ def result( # type: ignore # (incompatible with supertype) | |||||||
" provided to the query that created this job." | ||||||||
) | ||||||||
|
||||||||
first = True | ||||||||
restart_query_job = False | ||||||||
|
||||||||
def do_get_result(): | ||||||||
nonlocal first | ||||||||
def is_job_done(): | ||||||||
nonlocal restart_query_job | ||||||||
|
||||||||
if first: | ||||||||
first = False | ||||||||
else: | ||||||||
# Note that we won't get here if retry_do_query is | ||||||||
# None, because we won't use a retry. | ||||||||
if restart_query_job: | ||||||||
restart_query_job = False | ||||||||
|
||||||||
# The orinal job is failed. Create a new one. | ||||||||
tswast marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
# | ||||||||
# Note that we won't get here if retry_do_query is | ||||||||
# None, because we won't use a retry. | ||||||||
job = retry_do_query() | ||||||||
|
||||||||
# If it's already failed, we might as well stop: | ||||||||
if job.done() and job.exception() is not None: | ||||||||
raise job.exception() | ||||||||
|
||||||||
# Become the new job: | ||||||||
self.__dict__.clear() | ||||||||
self.__dict__.update(job.__dict__) | ||||||||
|
||||||||
# This shouldn't be necessary, because once we have a good | ||||||||
# job, it should stay good,and we shouldn't have to retry. | ||||||||
# But let's be paranoid. :) | ||||||||
# It's possible the job fails again and we'll have to | ||||||||
# retry that too. | ||||||||
self._retry_do_query = retry_do_query | ||||||||
self._job_retry = job_retry | ||||||||
|
||||||||
super(QueryJob, self).result(retry=retry, timeout=timeout) | ||||||||
|
||||||||
# Since the job could already be "done" (e.g. got a finished job | ||||||||
# via client.get_job), the superclass call to done() might not | ||||||||
# set the self._query_results cache. | ||||||||
if self._query_results is None or not self._query_results.complete: | ||||||||
self._reload_query_results(retry=retry, timeout=timeout) | ||||||||
# Refresh the job status with jobs.get because some of the | ||||||||
# exceptions thrown by jobs.getQueryResults like timeout and | ||||||||
# rateLimitExceeded errors are ambiguous. We want to know if | ||||||||
# the query job failed and not just the call to | ||||||||
# jobs.getQueryResults. | ||||||||
if self.done(retry=retry, timeout=timeout): | ||||||||
# If it's already failed, we might as well stop. | ||||||||
if self.exception() is not None: | ||||||||
# Only try to restart the query job if the job failed for | ||||||||
# a retriable reason. For example, don't restart the query | ||||||||
# if the call to reload the job metadata within self.done() | ||||||||
# timed out. | ||||||||
restart_query_job = True | ||||||||
raise self.exception() | ||||||||
else: | ||||||||
# Make sure that the _query_results are cached so we | ||||||||
# can return a complete RowIterator. | ||||||||
# | ||||||||
# Note: As an optimization, _reload_query_results | ||||||||
# doesn't make any API calls if the query results are | ||||||||
# already cached and have jobComplete=True in the | ||||||||
# response from the REST API. This ensures we aren't | ||||||||
# making any extra API calls if the previous loop | ||||||||
# iteration fetched the finished job. | ||||||||
self._reload_query_results(retry=retry, timeout=timeout) | ||||||||
return True | ||||||||
|
||||||||
# Call jobs.getQueryResults with max results set to 0 just to | ||||||||
# wait for the query to finish. Unlike most methods, | ||||||||
# jobs.getQueryResults hangs as long as it can to ensure we | ||||||||
# know when the query has finished as soon as possible. | ||||||||
self._reload_query_results(retry=retry, timeout=timeout) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uh oh, if But we don't want There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't the worst way to fail, but it'd be nice to do the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||
|
||||||||
# Even if the query is finished now according to | ||||||||
# jobs.getQueryResults, we'll want to reload the job status if | ||||||||
# it's not already DONE. | ||||||||
return False | ||||||||
|
||||||||
if retry_do_query is not None and job_retry is not None: | ||||||||
do_get_result = job_retry(do_get_result) | ||||||||
is_job_done = job_retry(is_job_done) | ||||||||
|
||||||||
do_get_result() | ||||||||
# timeout can be `None` or an object from our superclass | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I've updated the comments with some more info as well as some things to consider in case we want to have a default value for timeout in future. |
||||||||
# indicating a default timeout. | ||||||||
remaining_timeout = timeout if isinstance(timeout, (float, int)) else None | ||||||||
|
||||||||
if remaining_timeout is None: | ||||||||
# Since is_job_done() calls jobs.getQueryResults, which is a | ||||||||
# long-running API, don't delay the next request at all. | ||||||||
while not is_job_done(): | ||||||||
pass | ||||||||
else: | ||||||||
# Use a monotonic clock since we don't actually care about | ||||||||
# daylight savings or similar, just the elapsed time. | ||||||||
previous_time = time.monotonic() | ||||||||
|
||||||||
while not is_job_done(): | ||||||||
current_time = time.monotonic() | ||||||||
elapsed_time = current_time - previous_time | ||||||||
remaining_timeout = remaining_timeout - elapsed_time | ||||||||
previous_time = current_time | ||||||||
|
||||||||
if remaining_timeout < 0: | ||||||||
raise concurrent.futures.TimeoutError() | ||||||||
|
||||||||
except exceptions.GoogleAPICallError as exc: | ||||||||
exc.message = _EXCEPTION_FOOTER_TEMPLATE.format( | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was overridden because we wanted
result()
from the superclass to calljobs.getQueryResults
, not justjobs.get
(i.e.job.reload()
in Python). Now that we aren't using the superclass forresult()
, this method is no longer necessary.