
perf: DB-API uses more efficient query_and_wait when no job ID is provided #1747

Merged · 58 commits · Dec 19, 2023
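Summary: when `Cursor.execute()` is called without an explicit `job_id`, the DB-API layer now routes the query through `Client.query_and_wait()` (the jobs.query code path), which can return the first page of results in a single round trip instead of inserting a job and polling it. A minimal usage sketch of the affected path, not part of the diff, assuming default credentials and a configured project:

```python
from google.cloud import bigquery
from google.cloud.bigquery import dbapi

client = bigquery.Client()
connection = dbapi.connect(client)
cursor = connection.cursor()

# No job_id is passed, so with this PR the cursor goes through
# Client.query_and_wait() rather than jobs.insert + polling.
cursor.execute("SELECT 1 AS x")
print(cursor.fetchall())

# Passing an explicit job_id (discouraged) keeps the jobs.insert path.
# "my-explicit-job-id" is a made-up example value.
cursor.execute("SELECT 2 AS y", job_id="my-explicit-job-id")
print(cursor.fetchall())

connection.close()
```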
Changes from all commits (58 commits):
b8c583a
perf: use the first page of results when `query(api_method="QUERY")`
tswast Nov 15, 2023
6a8059d
add tests
tswast Nov 15, 2023
1f0e38e
respect max_results with cached page
tswast Nov 16, 2023
4401725
respect page_size, also avoid bqstorage if almost fully downloaded
tswast Nov 16, 2023
d078941
skip true test if bqstorage not installed
tswast Nov 16, 2023
660aa76
Merge remote-tracking branch 'origin/main' into issue589-RowIterator.…
tswast Nov 16, 2023
476bcd7
coverage
tswast Nov 16, 2023
05d6a3e
Merge remote-tracking branch 'origin/main' into issue589-RowIterator.…
tswast Nov 20, 2023
c16e4be
feat: add `Client.query_and_wait` which directly returns a `RowIterat…
tswast Nov 15, 2023
222f91b
implement basic query_and_wait and add code sample to test
tswast Nov 20, 2023
73e5817
avoid duplicated QueryJob construction
tswast Nov 20, 2023
9508121
update unit tests
tswast Nov 21, 2023
543481d
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 21, 2023
85f1cab
fix merge conflict in rowiterator
tswast Nov 21, 2023
c0e6c86
support max_results, add tests
tswast Nov 21, 2023
d4a322d
retry tests
tswast Nov 21, 2023
e0b2d2e
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 21, 2023
9daccbd
unit test coverage
tswast Nov 22, 2023
bba36d2
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 22, 2023
adf0b49
dont retry twice
tswast Nov 22, 2023
6dfbf92
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 22, 2023
765a644
fix mypy_samples session
tswast Nov 27, 2023
e461ebe
consolidate docstrings for query_and_wait
tswast Nov 27, 2023
895b6d0
remove mention of job ID
tswast Nov 28, 2023
d5345cd
fallback to jobs.insert for unsupported features
tswast Nov 29, 2023
f75d8ab
distinguish API timeout from wait timeout
tswast Nov 29, 2023
baff9d6
add test for jobs.insert fallback
tswast Nov 29, 2023
221898d
populate default job config
tswast Nov 29, 2023
18f825a
refactor default config
tswast Nov 29, 2023
5afbc41
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 29, 2023
08167d8
add coverage for job_config
tswast Nov 29, 2023
3e10ea4
cancel job if hasn't finished
tswast Nov 29, 2023
dc5e5be
mypy
tswast Nov 30, 2023
f1556bc
allow unreleased features in samples
tswast Dec 1, 2023
db71a1b
Merge branch 'main' into issue589-query_and_wait
tswast Dec 3, 2023
bd7e767
fix for 3.12
tswast Dec 4, 2023
4ffec17
Merge branch 'main' into issue589-query_and_wait
tswast Dec 6, 2023
95b3b0e
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Dec 7, 2023
ed317ec
perf: DB-API uses more efficient `query_and_wait` when no job ID is p…
tswast Dec 8, 2023
f08dac3
fix: keep `RowIterator.total_rows` populated after iteration
tswast Dec 8, 2023
5b324ee
Merge remote-tracking branch 'origin/issue589-query_and_wait' into b1…
tswast Dec 8, 2023
3b20ed7
fix unit tests and query_job property
tswast Dec 8, 2023
a376bd6
Update google/cloud/bigquery/table.py
tswast Dec 8, 2023
4019bbf
Merge remote-tracking branch 'origin/issue589-total_rows' into b1745-…
tswast Dec 8, 2023
7c3d813
Merge remote-tracking branch 'origin/issue589-query_and_wait' into is…
tswast Dec 8, 2023
304799a
Merge remote-tracking branch 'origin/issue589-total_rows' into issue5…
tswast Dec 8, 2023
425f6b0
fix comments
tswast Dec 8, 2023
ae06cd2
Merge branch 'main' into issue589-query_and_wait
tswast Dec 8, 2023
2baa5d3
Merge remote-tracking branch 'origin/issue589-query_and_wait' into b1…
tswast Dec 8, 2023
e819421
Merge remote-tracking branch 'origin/main' into b1745-dbapi-query_and…
tswast Dec 8, 2023
4771602
fix unit tests
tswast Dec 11, 2023
b435e41
unit test coverage
tswast Dec 11, 2023
64bf4a4
more coverage
tswast Dec 11, 2023
225049e
coverage for real
tswast Dec 11, 2023
5154866
Merge branch 'main' into b1745-dbapi-query_and_wait
tswast Dec 12, 2023
3b60b5b
Merge branch 'main' into b1745-dbapi-query_and_wait
tswast Dec 14, 2023
83679ab
Merge branch 'main' into b1745-dbapi-query_and_wait
Linchin Dec 15, 2023
0bd35e5
Merge branch 'main' into b1745-dbapi-query_and_wait
tswast Dec 19, 2023
1 change: 1 addition & 0 deletions google/cloud/bigquery/_job_helpers.py
@@ -491,6 +491,7 @@ def do_query():
job_id=query_results.job_id,
query_id=query_results.query_id,
project=query_results.project,
num_dml_affected_rows=query_results.num_dml_affected_rows,
)

if job_retry is not None:
6 changes: 6 additions & 0 deletions google/cloud/bigquery/client.py
@@ -3963,6 +3963,7 @@ def _list_rows_from_query_results(
timeout: TimeoutType = DEFAULT_TIMEOUT,
query_id: Optional[str] = None,
first_page_response: Optional[Dict[str, Any]] = None,
num_dml_affected_rows: Optional[int] = None,
) -> RowIterator:
"""List the rows of a completed query.
See
@@ -4007,6 +4008,10 @@ def _list_rows_from_query_results(
and not guaranteed to be populated.
first_page_response (Optional[dict]):
API response for the first page of results (if available).
num_dml_affected_rows (Optional[int]):
If this RowIterator is the result of a DML query, the number of
rows that were affected.

Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
@@ -4047,6 +4052,7 @@ def _list_rows_from_query_results(
job_id=job_id,
query_id=query_id,
first_page_response=first_page_response,
num_dml_affected_rows=num_dml_affected_rows,
)
return row_iterator

122 changes: 69 additions & 53 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -14,11 +14,12 @@

"""Cursor for the Google BigQuery DB-API."""

from __future__ import annotations

import collections
from collections import abc as collections_abc
import copy
import logging
import re
from typing import Optional

try:
from google.cloud.bigquery_storage import ArrowSerializationOptions
@@ -34,8 +35,6 @@
import google.cloud.exceptions # type: ignore


_LOGGER = logging.getLogger(__name__)

# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
@@ -76,18 +75,31 @@ def __init__(self, connection):
# most appropriate size.
self.arraysize = None
self._query_data = None
self._query_job = None
self._query_rows = None
self._closed = False

@property
def query_job(self):
"""google.cloud.bigquery.job.query.QueryJob: The query job created by
the last ``execute*()`` call.
def query_job(self) -> Optional[job.QueryJob]:
"""google.cloud.bigquery.job.query.QueryJob | None: The query job
created by the last ``execute*()`` call, if a query job was created.

.. note::
If the last ``execute*()`` call was ``executemany()``, this is the
last job created by ``executemany()``."""
return self._query_job
rows = self._query_rows

if rows is None:
return None

job_id = rows.job_id
project = rows.project
location = rows.location
client = self.connection._client

if job_id is None:
return None

return client.get_job(job_id, location=location, project=project)

def close(self):
"""Mark the cursor as closed, preventing its further use."""
@@ -117,8 +129,8 @@ def _set_description(self, schema):
for field in schema
)

def _set_rowcount(self, query_results):
"""Set the rowcount from query results.
def _set_rowcount(self, rows):
"""Set the rowcount from a RowIterator.

Normally, this sets rowcount to the number of rows returned by the
query, but if it was a DML statement, it sets rowcount to the number
@@ -129,10 +141,10 @@ def _set_rowcount(self, query_results):
Results of a query.
"""
total_rows = 0
num_dml_affected_rows = query_results.num_dml_affected_rows
num_dml_affected_rows = rows.num_dml_affected_rows

if query_results.total_rows is not None and query_results.total_rows > 0:
total_rows = query_results.total_rows
if rows.total_rows is not None and rows.total_rows > 0:
total_rows = rows.total_rows
if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
total_rows = num_dml_affected_rows
self.rowcount = total_rows
@@ -165,9 +177,10 @@ def execute(self, operation, parameters=None, job_id=None, job_config=None):
parameters (Union[Mapping[str, Any], Sequence[Any]]):
(Optional) dictionary or sequence of parameter values.

job_id (str):
(Optional) The job_id to use. If not set, a job ID
is generated at random.
job_id (str | None):
(Optional and discouraged) The job ID to use when creating
the query job. For best performance and reliability, manually
setting a job ID is discouraged.

job_config (google.cloud.bigquery.job.QueryJobConfig):
(Optional) Extra configuration options for the query job.
@@ -181,7 +194,7 @@ def _execute(
self, formatted_operation, parameters, job_id, job_config, parameter_types
):
self._query_data = None
self._query_job = None
self._query_results = None
client = self.connection._client

# The DB-API uses the pyformat formatting, since the way BigQuery does
@@ -190,33 +203,35 @@
# libraries.
query_parameters = _helpers.to_query_parameters(parameters, parameter_types)

if client._default_query_job_config:
if job_config:
config = job_config._fill_from_default(client._default_query_job_config)
else:
config = copy.deepcopy(client._default_query_job_config)
else:
config = job_config or job.QueryJobConfig(use_legacy_sql=False)

config = job_config or job.QueryJobConfig()
config.query_parameters = query_parameters
self._query_job = client.query(
formatted_operation, job_config=config, job_id=job_id
)

if self._query_job.dry_run:
self._set_description(schema=None)
self.rowcount = 0
return

# Wait for the query to finish.
# Start the query and wait for the query to finish.
try:
self._query_job.result()
if job_id is not None:
rows = client.query(
formatted_operation,
job_config=job_config,
job_id=job_id,
).result(
page_size=self.arraysize,
)
else:
rows = client.query_and_wait(
formatted_operation,
job_config=config,
page_size=self.arraysize,
)
except google.cloud.exceptions.GoogleCloudError as exc:
raise exceptions.DatabaseError(exc)

query_results = self._query_job._query_results
self._set_rowcount(query_results)
self._set_description(query_results.schema)
self._query_rows = rows
self._set_description(rows.schema)

if config.dry_run:
self.rowcount = 0
else:
self._set_rowcount(rows)

def executemany(self, operation, seq_of_parameters):
"""Prepare and execute a database operation multiple times.
@@ -250,25 +265,26 @@ def _try_fetch(self, size=None):

Mutates self to indicate that iteration has started.
"""
if self._query_job is None:
if self._query_data is not None:
# Already started fetching the data.
return

rows = self._query_rows
if rows is None:
raise exceptions.InterfaceError(
"No query results: execute() must be called before fetch."
)

if self._query_job.dry_run:
self._query_data = iter([])
bqstorage_client = self.connection._bqstorage_client
if rows._should_use_bqstorage(
bqstorage_client,
create_bqstorage_client=False,
):
rows_iterable = self._bqstorage_fetch(bqstorage_client)
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
return

if self._query_data is None:
bqstorage_client = self.connection._bqstorage_client

if bqstorage_client is not None:
rows_iterable = self._bqstorage_fetch(bqstorage_client)
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
return

rows_iter = self._query_job.result(page_size=self.arraysize)
self._query_data = iter(rows_iter)
self._query_data = iter(rows)

def _bqstorage_fetch(self, bqstorage_client):
"""Start fetching data with the BigQuery Storage API.
@@ -290,7 +306,7 @@ def _bqstorage_fetch(self, bqstorage_client):
# bigquery_storage can indeed be imported here without errors.
from google.cloud import bigquery_storage

table_reference = self._query_job.destination
table_reference = self._query_rows._table

requested_session = bigquery_storage.types.ReadSession(
table=table_reference.to_bqstorage(),
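For context, here is a rough sketch of the two client-level paths that `Cursor._execute()` now chooses between, written against the public client API used in the diff above (the queried table and the job ID are placeholder values):

```python
from google.cloud import bigquery

client = bigquery.Client()
sql = "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"

# Default path (no job_id): a single jobs.query call that waits for the
# query and returns a RowIterator, optionally with the first page inline.
rows = client.query_and_wait(sql, page_size=100)
for row in rows:
    print(row["name"])

# Fallback path (explicit job_id): jobs.insert followed by polling via
# QueryJob.result(). "my-explicit-job-id" is a placeholder.
job = client.query(sql, job_id="my-explicit-job-id")
rows_with_job = job.result(page_size=100)
```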
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/query.py
@@ -1614,6 +1614,7 @@ def do_get_result():
project=self.project,
job_id=self.job_id,
query_id=self.query_id,
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
)

# We know that there's at least 1 row, so only treat the response from
@@ -1639,6 +1640,7 @@ def do_get_result():
timeout=timeout,
query_id=self.query_id,
first_page_response=first_page_response,
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
32 changes: 24 additions & 8 deletions google/cloud/bigquery/table.py
@@ -1566,6 +1566,7 @@ def __init__(
job_id: Optional[str] = None,
query_id: Optional[str] = None,
project: Optional[str] = None,
num_dml_affected_rows: Optional[int] = None,
):
super(RowIterator, self).__init__(
client,
@@ -1592,6 +1593,7 @@
self._job_id = job_id
self._query_id = query_id
self._project = project
self._num_dml_affected_rows = num_dml_affected_rows

@property
def _billing_project(self) -> Optional[str]:
@@ -1616,6 +1618,16 @@ def location(self) -> Optional[str]:
"""
return self._location

@property
def num_dml_affected_rows(self) -> Optional[int]:
"""If this RowIterator is the result of a DML query, the number of
rows that were affected.

See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#body.QueryResponse.FIELDS.num_dml_affected_rows
"""
return self._num_dml_affected_rows

@property
def project(self) -> Optional[str]:
"""GCP Project ID where these rows are read from."""
@@ -1635,7 +1647,10 @@ def _is_almost_completely_cached(self):
This is useful to know, because we can avoid alternative download
mechanisms.
"""
if self._first_page_response is None:
if (
not hasattr(self, "_first_page_response")

Review comment (Contributor): Because we set self._first_page_response = first_page_response at line 1591, won't this attribute always exist? Maybe we can check whether the value is None or not.

Reply (Contributor, author): This was also needed for some tests where we have a mock row iterator but want to test against a real implementation of this method.

or self._first_page_response is None
):
return False

total_cached_rows = len(self._first_page_response.get(self._items_key, []))
@@ -1655,7 +1670,7 @@ def _is_almost_completely_cached(self):

return False

def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
def _should_use_bqstorage(self, bqstorage_client, create_bqstorage_client):
"""Returns True if the BigQuery Storage API can be used.

Returns:
@@ -1669,8 +1684,9 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
if self._table is None:
return False

# The developer is manually paging through results if this is set.
if self.next_page_token is not None:
# The developer has already started paging through results if
# next_page_token is set.
if hasattr(self, "next_page_token") and self.next_page_token is not None:

Review comment (Contributor): Just for my education: it looks like the attribute next_page_token is inherited from the "grandparent" class Iterator in the core library, which creates this attribute at init. Is it necessary to check whether this attribute exists?

Reply (Contributor, author): This was purely for some failing unit tests where this superclass was mocked out.

return False

if self._is_almost_completely_cached():
@@ -1726,7 +1742,7 @@ def schema(self):

@property
def total_rows(self):
"""int: The total number of rows in the table."""
"""int: The total number of rows in the table or query results."""
return self._total_rows

def _maybe_warn_max_results(
@@ -1752,7 +1768,7 @@ def _maybe_warn_max_results(
def _to_page_iterable(
self, bqstorage_download, tabledata_list_download, bqstorage_client=None
):
if not self._validate_bqstorage(bqstorage_client, False):
if not self._should_use_bqstorage(bqstorage_client, False):
bqstorage_client = None

result_pages = (
@@ -1882,7 +1898,7 @@ def to_arrow(

self._maybe_warn_max_results(bqstorage_client)

if not self._validate_bqstorage(bqstorage_client, create_bqstorage_client):
if not self._should_use_bqstorage(bqstorage_client, create_bqstorage_client):
create_bqstorage_client = False
bqstorage_client = None

@@ -2223,7 +2239,7 @@ def to_dataframe(

self._maybe_warn_max_results(bqstorage_client)

if not self._validate_bqstorage(bqstorage_client, create_bqstorage_client):
if not self._should_use_bqstorage(bqstorage_client, create_bqstorage_client):
create_bqstorage_client = False
bqstorage_client = None

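Taken together, the `num_dml_affected_rows` plumbing above (in `_job_helpers.py`, `client.py`, `job/query.py`, and `table.py`) lets a `RowIterator` report DML counts directly, which is what the DB-API cursor now uses for `rowcount`. A rough usage sketch follows; the dataset, table, and columns are placeholders:

```python
from google.cloud import bigquery

client = bigquery.Client()

# `my_dataset.my_table` is a placeholder; any table you can run DML against works.
rows = client.query_and_wait(
    "UPDATE `my_dataset.my_table` SET active = FALSE WHERE signup_date < '2020-01-01'"
)

# New in this PR: the DML row count is carried on the RowIterator itself,
# so callers no longer need to reach back into the QueryJob for it.
print(rows.num_dml_affected_rows)
```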