feat: add Client.query_and_wait which directly returns a `RowIterator` of results

Set the `QUERY_PREVIEW_ENABLED=TRUE` environment variable to use this with the
new JOB_CREATION_OPTIONAL mode (currently in preview).
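A minimal usage sketch (illustrative only: it assumes default credentials, and
the final signature may differ from this work-in-progress commit):

    import os

    # Opt in to the JOB_CREATION_OPTIONAL preview before creating the client.
    os.environ["QUERY_PREVIEW_ENABLED"] = "TRUE"

    from google.cloud import bigquery

    client = bigquery.Client()

    # query_and_wait() returns a RowIterator directly; no QueryJob required.
    rows = client.query_and_wait("SELECT 1 AS x")
    for row in rows:
        print(row["x"])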
tswast committed Nov 15, 2023
1 parent 58b3152 commit 93d47e7
Showing 3 changed files with 137 additions and 12 deletions.
97 changes: 86 additions & 11 deletions google/cloud/bigquery/_job_helpers.py
@@ -12,16 +12,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for interacting with the job REST APIs from the client."""
"""Helpers for interacting with the job REST APIs from the client.
For queries, there are three cases to consider:
1. jobs.insert: This always returns a job resource.
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
This sometimes can return the results inline, but always includes a job ID.
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
This sometimes doesn't create a job at all, instead returning the results.
Client.query() calls either (1) or (2), depending on what the user provides
for the api_method parameter. query() always returns a QueryJob object, which
can retry the query when the query job fails for a retriable reason.
Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
local results from the response or may wrap a query job containing multiple
pages of results. Even though query_and_wait() waits for the job to complete,
we still need a separate job_retry object because there are different
predicates where it is safe to generate a new query ID.
"""

import copy
import functools
import os
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
from google.cloud.bigquery import table

# Avoid circular imports
if TYPE_CHECKING: # pragma: NO COVER
@@ -122,7 +143,12 @@ def do_query():
return future


def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
def _to_query_request(
query: str,
job_config: Optional[job.QueryJobConfig],
location: Optional[str],
timeout: Optional[float],
) -> Dict[str, Any]:
"""Transform from Job resource to QueryRequest resource.
Most of the keys in job.configuration.query are in common with
@@ -149,6 +175,12 @@ def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any
request_body.setdefault("formatOptions", {})
request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore

if timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
request_body["location"] = location
request_body["query"] = query

return request_body
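A quick worked example of the timeoutMs buffer above (illustrative only;
_TIMEOUT_BUFFER_MILLIS is defined elsewhere in _job_helpers.py, and 100 is an
assumed value here):

    _TIMEOUT_BUFFER_MILLIS = 100  # assumed for illustration

    timeout = 2.5  # seconds, as passed by the caller
    timeout_ms = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
    assert timeout_ms == 2400  # the server is given slightly less time

    # Tiny timeouts clamp to zero rather than going negative.
    assert max(0, int(1000 * 0.05) - _TIMEOUT_BUFFER_MILLIS) == 0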


@@ -211,6 +243,10 @@ def _to_query_job(
return query_job


def _to_query_path(project: str) -> str:
return f"/projects/{project}/queries"


def query_jobs_query(
client: "Client",
query: str,
@@ -221,18 +257,12 @@ def query_jobs_query(
timeout: Optional[float],
job_retry: retries.Retry,
) -> job.QueryJob:
"""Initiate a query using jobs.query.
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
"""
path = f"/projects/{project}/queries"
request_body = _to_query_request(job_config)

if timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
request_body["location"] = location
request_body["query"] = query
path = _to_query_path(project)
request_body = _to_query_request(query, job_config, location, timeout)

def do_query():
request_body["requestId"] = make_job_id()
@@ -257,3 +287,48 @@ def do_query():
future._job_retry = job_retry

return future
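For reference, a sketch of the QueryRequest body that _to_query_request()
assembles for a call like query_jobs_query(client, "SELECT 1", None, "US",
project, retry, 2.5, job_retry); field names follow the jobs.query REST
reference, and the exact contents depend on the job_config:

    {
        "useLegacySql": False,
        "formatOptions": {"useInt64Timestamp": True},
        "timeoutMs": 2400,  # assumes _TIMEOUT_BUFFER_MILLIS == 100
        "location": "US",
        "query": "SELECT 1",
        "requestId": "...",  # regenerated by do_query() on every attempt
    }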


def query_and_wait(
client: "Client",
query: str,
job_config: Optional[job.QueryJobConfig],
location: Optional[str],
project: str,
retry: retries.Retry,
timeout: Optional[float],
job_retry: retries.Retry,
) -> table.RowIterator:
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
"""
path = _to_query_path(project)
request_body = _to_query_request(query, job_config, location, timeout)

    if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

@job_retry
def do_query():
request_body["requestId"] = make_job_id()
span_attributes = {"path": path}
return client._call_api(
retry,
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=timeout,
)

    response = do_query()

    # Unlike query_jobs_query(), there is no job future to attach retry
    # state to: the @job_retry decorator on do_query() re-issues the whole
    # request when that is safe. The response may or may not reference a job.
    job_reference = response.get("jobReference", {})
    return table.RowIterator(
        client=client,
        api_request=functools.partial(client._call_api, retry, timeout=timeout),
        path=None,
        schema=response.get("schema", {}).get("fields", ()),
        total_rows=int(response["totalRows"]) if "totalRows" in response else None,
        first_page_response=response,
        location=job_reference.get("location", location),
        job_id=job_reference.get("jobId"),
        query_id=response.get("queryId"),
        project=job_reference.get("projectId", project),
    )
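To make the retry split from the module docstring concrete, a hedged sketch of
the two call paths (client as above; api_method passed as a plain string for
brevity):

    # Case (2): query() returns a QueryJob; if the job fails for a retriable
    # reason, the attached job_retry can run the query again via the job.
    job = client.query("SELECT 1", api_method="QUERY")
    rows = job.result()

    # Case (3): query_and_wait() may never create a job, so the @job_retry
    # decorator on do_query() re-issues the whole request (with a fresh
    # requestId) instead of retrying through a job object.
    rows = client.query_and_wait("SELECT 1")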
14 changes: 14 additions & 0 deletions google/cloud/bigquery/client.py
@@ -3405,6 +3405,20 @@ def query(
else:
raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}")

    def query_and_wait(
        self,
        query,
        *,
        job_config=None,
        location=None,
        project=None,
        retry=DEFAULT_RETRY,
        timeout=DEFAULT_TIMEOUT,
        job_retry=DEFAULT_JOB_RETRY,
    ) -> RowIterator:
        """[Preview] Run the query and return the results."""
        if project is None:
            project = self.project

        return _job_helpers.query_and_wait(
            self,
            query,
            job_config=job_config,
            location=location,
            project=project,
            retry=retry,
            timeout=timeout,
            job_retry=job_retry,
        )

def insert_rows(
self,
table: Union[Table, TableReference, str],
38 changes: 37 additions & 1 deletion google/cloud/bigquery/table.py
@@ -1558,6 +1558,10 @@ def __init__(
selected_fields=None,
total_rows=None,
first_page_response=None,
location: Optional[str] = None,
job_id: Optional[str] = None,
query_id: Optional[str] = None,
project: Optional[str] = None,
):
super(RowIterator, self).__init__(
client,
@@ -1575,12 +1579,44 @@
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
self._project = client.project if client is not None else None
self._schema = schema
self._selected_fields = selected_fields
self._table = table
self._total_rows = total_rows
self._first_page_response = first_page_response
self._location = location
self._job_id = job_id
self._query_id = query_id

if project:
self._project = project
elif client is not None:
self._project = client.project
else:
self._project = None

    @property
    def project(self) -> Optional[str]:
        """GCP Project ID where these rows are read from."""
        return self._project

    @property
    def location(self) -> Optional[str]:
        """Location where the query executed (if applicable).

        See: https://cloud.google.com/bigquery/docs/locations
        """
        return self._location

    @property
    def job_id(self) -> Optional[str]:
        """ID of the query job (if applicable).

        To get the job metadata, call
        ``job = client.get_job(rows.job_id, location=rows.location)``.
        """
        return self._job_id

    @property
    def query_id(self) -> Optional[str]:
        """ID of the stateless query (if applicable)."""
        return self._query_id

def _is_completely_cached(self):
"""Check if all results are completely cached.
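A short sketch of how the new RowIterator metadata might be used, following
the docstrings above (the else branch assumes JOB_CREATION_OPTIONAL skipped
creating a job):

    rows = client.query_and_wait("SELECT name FROM `my_dataset.my_table`")

    print(rows.project, rows.location)
    if rows.job_id is not None:
        # A job was created; fetch its metadata as the job_id docstring shows.
        job = client.get_job(rows.job_id, location=rows.location)
    else:
        # No job exists; only the stateless query ID is available.
        print("query_id:", rows.query_id)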
