Skip to content

Commit

Permalink
feat: use faster query_and_wait method from google-cloud-bigquery when available
Browse files Browse the repository at this point in the history

fix unit tests

fix python 3.7

fix python 3.7

fix python 3.7

fix python 3.7

fix wait_timeout units

boost test coverage

remove dead code

boost a little more coverage
  • Loading branch information
tswast committed Jan 16, 2024
1 parent e1c384e commit 7739f41
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 89 deletions.
10 changes: 9 additions & 1 deletion pandas_gbq/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

"""Module for checking dependency versions and supported features."""

# https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
# https://github.com/googleapis/python-bigquery/blob/main/CHANGELOG.md
BIGQUERY_MINIMUM_VERSION = "3.3.5"
BIGQUERY_QUERY_AND_WAIT_VERSION = "3.14.0"
PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0"
PANDAS_BOOLEAN_DTYPE_VERSION = "1.0.0"
PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION = "1.1.0"
Expand Down Expand Up @@ -45,6 +46,13 @@ def bigquery_try_import(self):

return google.cloud.bigquery

@property
def bigquery_has_query_and_wait(self):
    """Report whether the installed BigQuery client is new enough.

    Returns:
        bool: True when ``self.bigquery_installed_version`` is at least
        ``BIGQUERY_QUERY_AND_WAIT_VERSION``, False otherwise.
    """
    # Imported lazily, matching the other feature checks in this module.
    from packaging import version as packaging_version

    required = packaging_version.parse(BIGQUERY_QUERY_AND_WAIT_VERSION)
    installed = self.bigquery_installed_version
    return installed >= required

@property
def pandas_installed_version(self):
import pandas
Expand Down
44 changes: 31 additions & 13 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,17 @@ def process_http_error(ex):
# See `BigQuery Troubleshooting Errors
# <https://cloud.google.com/bigquery/troubleshooting-errors>`__

if "cancelled" in ex.message:
message = (
ex.message.casefold()
if hasattr(ex, "message") and ex.message is not None
else ""
)
if "cancelled" in message:
raise QueryTimeout("Reason: {0}".format(ex))
elif "Provided Schema does not match" in ex.message:
elif "schema does not match" in message:
error_message = ex.errors[0]["message"]
raise InvalidSchema(f"Reason: {error_message}")
elif "Already Exists: Table" in ex.message:
elif "already exists: table" in message:
error_message = ex.errors[0]["message"]
raise TableCreationError(f"Reason: {error_message}")
else:
Expand Down Expand Up @@ -410,16 +415,29 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):

self._start_timer()
job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
rows_iter = pandas_gbq.query.query_and_wait(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)

if FEATURES.bigquery_has_query_and_wait:
rows_iter = pandas_gbq.query.query_and_wait_via_client_library(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)
else:
rows_iter = pandas_gbq.query.query_and_wait(
self,
self.client,
query,
location=self.location,
project_id=self.project_id,
job_config=job_config,
max_results=max_results,
timeout_ms=timeout_ms,
)

dtypes = kwargs.get("dtypes")
return self._download_results(
Expand Down
75 changes: 56 additions & 19 deletions pandas_gbq/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from __future__ import annotations

import concurrent.futures
import functools
import logging
from typing import Optional

import google.auth.exceptions
from google.cloud import bigquery

import pandas_gbq.exceptions
Expand Down Expand Up @@ -78,6 +80,26 @@ def _wait_for_query_job(
connector.process_http_error(ex)


def try_query(connector, query_fn):
    """Run ``query_fn`` and translate known failures into pandas-gbq errors.

    Args:
        connector: Object exposing ``private_key``, ``http_error`` (the
            exception type(s) treated as an HTTP failure) and
            ``process_http_error``.
        query_fn: Zero-argument callable that issues the query.

    Returns:
        Whatever ``query_fn`` returns. If ``connector.process_http_error``
        returns instead of raising, ``None`` is returned.

    Raises:
        pandas_gbq.exceptions.QueryTimeout: ``query_fn`` raised
            ``concurrent.futures.TimeoutError``.
        pandas_gbq.exceptions.AccessDenied: ``query_fn`` raised
            ``google.auth.exceptions.RefreshError`` or ``ValueError``.
    """
    logger.debug("Requesting query... ")
    try:
        return query_fn()
    except concurrent.futures.TimeoutError as ex:
        # Chain explicitly so the original timeout is reported as the cause.
        raise pandas_gbq.exceptions.QueryTimeout(f"Reason: {ex}") from ex
    except (google.auth.exceptions.RefreshError, ValueError) as ex:
        # The wording depends on whether service-account credentials
        # (a private key) were supplied to the connector.
        if connector.private_key:
            message = f"The service account credentials are not valid: {ex}"
        else:
            message = (
                "The credentials have been revoked or expired, "
                f"please re-run the application to re-authorize: {ex}"
            )
        raise pandas_gbq.exceptions.AccessDenied(message) from ex
    except connector.http_error as ex:
        # NOTE(review): process_http_error presumably always raises; confirm,
        # since otherwise this function silently returns None.
        connector.process_http_error(ex)


def query_and_wait(
connector,
client: bigquery.Client,
Expand Down Expand Up @@ -122,29 +144,17 @@ def query_and_wait(
Result iterator from which we can download the results in the
desired format (pandas.DataFrame).
"""
from google.auth.exceptions import RefreshError

try:
logger.debug("Requesting query... ")
query_reply = client.query(
query_reply = try_query(
connector,
functools.partial(
client.query,
query,
job_config=job_config,
location=location,
project=project_id,
)
logger.debug("Query running...")
except (RefreshError, ValueError) as ex:
if connector.private_key:
raise pandas_gbq.exceptions.AccessDenied(
f"The service account credentials are not valid: {ex}"
)
else:
raise pandas_gbq.exceptions.AccessDenied(
"The credentials have been revoked or expired, "
f"please re-run the application to re-authorize: {ex}"
)
except connector.http_error as ex:
connector.process_http_error(ex)
),
)
logger.debug("Query running...")

job_id = query_reply.job_id
logger.debug("Job ID: %s" % job_id)
Expand Down Expand Up @@ -173,3 +183,30 @@ def query_and_wait(
return query_reply.result(max_results=max_results)
except connector.http_error as ex:
connector.process_http_error(ex)


def query_and_wait_via_client_library(
    connector,
    client: bigquery.Client,
    query: str,
    *,
    job_config: bigquery.QueryJobConfig,
    location: Optional[str],
    project_id: Optional[str],
    max_results: Optional[int],
    timeout_ms: Optional[int],
):
    """Run ``query`` through ``client.query_and_wait`` and return its result.

    The call is routed through ``try_query`` so that failures are translated
    the same way as for the other query path in this module.

    Args:
        connector: Passed to ``try_query`` for error translation.
        client: BigQuery client whose ``query_and_wait`` method is invoked.
        query: SQL text to execute.
        job_config: Forwarded as ``job_config``.
        location: Forwarded as ``location``.
        project_id: Forwarded as ``project``.
        max_results: Forwarded as ``max_results``.
        timeout_ms: Timeout in milliseconds; forwarded as ``wait_timeout``
            in seconds. A falsy value (``None`` or ``0``) is forwarded as
            ``None``.

    Returns:
        The value returned by ``try_query``.
    """
    run_query = client.query_and_wait

    # The timeout arrives in milliseconds and is forwarded in seconds.
    if timeout_ms:
        wait_timeout = timeout_ms / 1000.0
    else:
        wait_timeout = None

    request = functools.partial(
        run_query,
        query,
        job_config=job_config,
        location=location,
        project=project_id,
        max_results=max_results,
        wait_timeout=wait_timeout,
    )
    result = try_query(connector, request)
    logger.debug("Query done.\n")
    return result
27 changes: 0 additions & 27 deletions tests/system/test_gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,33 +426,6 @@ def test_query_inside_configuration(self, project_id):
)
tm.assert_frame_equal(df, DataFrame({"valid_string": ["PI"]}))

def test_configuration_without_query(self, project_id):
    """read_gbq raises ValueError for a configuration with no 'query' entry."""

    def table_reference(table_id):
        # Both tables share the project and dataset; only the id differs.
        return {
            "projectId": project_id,
            "datasetId": "publicdata:samples",
            "tableId": table_id,
        }

    # Only 'query' job configurations are supported; 'copy', 'load' and
    # 'extract' configurations are not.
    copy_only_config = {
        "copy": {
            "sourceTable": table_reference("wikipedia"),
            "destinationTable": table_reference("wikipedia_copied"),
        }
    }

    with pytest.raises(ValueError):
        gbq.read_gbq(
            "SELECT 1",
            project_id=project_id,
            credentials=self.credentials,
            configuration=copy_only_config,
            dialect="legacy",
        )

def test_configuration_raises_value_error_with_multiple_config(self, project_id):
sql_statement = "SELECT 1"
config = {
Expand Down
42 changes: 41 additions & 1 deletion tests/unit/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import google.cloud.bigquery
import google.cloud.bigquery.table
import packaging.version
import pytest


Expand Down Expand Up @@ -55,8 +56,15 @@ def test_read_gbq_should_save_credentials(mock_get_credentials):
mock_get_credentials.assert_not_called()


def test_read_gbq_should_use_dialect(mock_bigquery_client):
def test_read_gbq_should_use_dialect_with_query(monkeypatch, mock_bigquery_client):
import pandas_gbq
import pandas_gbq.features

monkeypatch.setattr(
pandas_gbq.features.FEATURES,
"_bigquery_installed_version",
packaging.version.parse(pandas_gbq.features.BIGQUERY_MINIMUM_VERSION),
)

assert pandas_gbq.context.dialect is None
pandas_gbq.context.dialect = "legacy"
Expand All @@ -71,3 +79,35 @@ def test_read_gbq_should_use_dialect(mock_bigquery_client):
_, kwargs = mock_bigquery_client.query.call_args
assert not kwargs["job_config"].use_legacy_sql
pandas_gbq.context.dialect = None # Reset the global state.


def test_read_gbq_should_use_dialect_with_query_and_wait(
    monkeypatch, mock_bigquery_client
):
    """context.dialect must control use_legacy_sql on the query_and_wait path."""
    if not hasattr(mock_bigquery_client, "query_and_wait"):
        pytest.skip(
            f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait"
        )

    import pandas_gbq
    import pandas_gbq.features

    # Pretend the installed client is exactly the first version that
    # pandas-gbq treats as having query_and_wait.
    pretend_version = packaging.version.parse(
        pandas_gbq.features.BIGQUERY_QUERY_AND_WAIT_VERSION
    )
    monkeypatch.setattr(
        pandas_gbq.features.FEATURES,
        "_bigquery_installed_version",
        pretend_version,
    )

    assert pandas_gbq.context.dialect is None

    for dialect, expect_legacy in (("legacy", True), ("standard", False)):
        pandas_gbq.context.dialect = dialect
        pandas_gbq.read_gbq("SELECT 1")

        _, call_kwargs = mock_bigquery_client.query_and_wait.call_args
        assert bool(call_kwargs["job_config"].use_legacy_sql) is expect_legacy

    pandas_gbq.context.dialect = None  # Reset the global state.
17 changes: 17 additions & 0 deletions tests/unit/test_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ def fresh_bigquery_version(monkeypatch):
monkeypatch.setattr(FEATURES, "_pandas_installed_version", None)


@pytest.mark.parametrize(
    ["bigquery_version", "expected"],
    [
        # Versions below 3.14.0 are expected to report False.
        ("1.99.100", False),
        ("2.99.999", False),
        ("3.13.11", False),
        # 3.14.0 and anything newer are expected to report True.
        ("3.14.0", True),
        ("4.999.999", True),
    ],
)
def test_bigquery_has_query_and_wait(monkeypatch, bigquery_version, expected):
    """FEATURES.bigquery_has_query_and_wait follows the reported client version."""
    from google.cloud import bigquery as bigquery_module

    # NOTE(review): presumably relies on a fixture clearing the cached
    # installed version on FEATURES before this runs; confirm.
    monkeypatch.setattr(bigquery_module, "__version__", bigquery_version)

    has_feature = FEATURES.bigquery_has_query_and_wait
    assert has_feature == expected


@pytest.mark.parametrize(
["pandas_version", "expected"],
[
Expand Down
Loading

0 comments on commit 7739f41

Please sign in to comment.