Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery: Poll via getQueryResults method. #3844

Merged
merged 1 commit into from
Aug 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions bigquery/google/cloud/bigquery/dbapi/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,13 @@ def execute(self, operation, parameters=None, job_id=None):
query_parameters=query_parameters)
query_job.use_legacy_sql = False

# Wait for the query to finish.
try:
query_results = query_job.result()
query_job = query_job.result()
except google.cloud.exceptions.GoogleCloudError:
raise exceptions.DatabaseError(query_job.errors)

# Force the iterator to run because the query_results doesn't
# have the total_rows populated. See:
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3506
query_iterator = query_results.fetch_data()
try:
six.next(iter(query_iterator))
except StopIteration:
pass

query_results = query_job.query_results()
self._query_data = iter(
query_results.fetch_data(max_results=self.arraysize))
self._set_rowcount(query_results)
Expand Down
35 changes: 19 additions & 16 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,7 @@ def __init__(self, name, query, client,
self.udf_resources = udf_resources
self.query_parameters = query_parameters
self._configuration = _AsyncQueryConfiguration()
self._query_results = None

allow_large_results = _TypedProperty('allow_large_results', bool)
"""See
Expand Down Expand Up @@ -1284,23 +1285,25 @@ def query_results(self):
:rtype: :class:`~google.cloud.bigquery.query.QueryResults`
:returns: results instance
"""
from google.cloud.bigquery.query import QueryResults
return QueryResults.from_query_job(self)
if not self._query_results:
self._query_results = self._client.get_query_results(self.name)
return self._query_results

def result(self, timeout=None):
"""Start the job and wait for it to complete and get the result.
def done(self):
"""Refresh the job and check if it is complete.

:type timeout: int
:param timeout: How long to wait for job to complete before raising
a :class:`TimeoutError`.
:rtype: bool
:returns: True if the job is complete, False otherwise.
"""
# Do not refresh if the state is already done, as the job will not
# change once complete.
if self.state != _DONE_STATE:
self._query_results = self._client.get_query_results(self.name)

:rtype: :class:`~google.cloud.bigquery.query.QueryResults`
:returns: The query results.
# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload()

:raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
failed or :class:`TimeoutError` if the job did not complete in the
given timeout.
"""
super(QueryJob, self).result(timeout=timeout)
# Return a QueryResults instance instead of returning the job.
return self.query_results()
return self.state == _DONE_STATE
3 changes: 2 additions & 1 deletion bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,8 @@ def test_async_query_future(self):
str(uuid.uuid4()), 'SELECT 1')
query_job.use_legacy_sql = False

iterator = query_job.result(timeout=JOB_TIMEOUT).fetch_data()
query_job = query_job.result(timeout=JOB_TIMEOUT)
iterator = query_job.query_results().fetch_data()
rows = list(iterator)
self.assertEqual(rows, [(1,)])

Expand Down
3 changes: 2 additions & 1 deletion bigquery/tests/unit/test_dbapi_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def _mock_job(
mock_job = mock.create_autospec(job.QueryJob)
mock_job.error_result = None
mock_job.state = 'DONE'
mock_job.result.return_value = self._mock_results(
mock_job.result.return_value = mock_job
mock_job.query_results.return_value = self._mock_results(
rows=rows, schema=schema,
num_dml_affected_rows=num_dml_affected_rows)
return mock_job
Expand Down
80 changes: 71 additions & 9 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def _makeResource(self, started=False, ended=False):
}

if ended:
resource['status'] = {'state': 'DONE'}
resource['statistics']['load']['inputFiles'] = self.INPUT_FILES
resource['statistics']['load']['inputFileBytes'] = self.INPUT_BYTES
resource['statistics']['load']['outputBytes'] = self.OUTPUT_BYTES
Expand Down Expand Up @@ -310,6 +311,37 @@ def test_ctor_w_schema(self):
schema=[full_name, age])
self.assertEqual(job.schema, [full_name, age])

def test_done(self):
# A job rebuilt via from_api_repr() from a resource made with ended=True
# (the `if ended:` branch of _makeResource in this diff sets
# status.state to 'DONE') must report done() as true.
client = _Client(self.PROJECT)
resource = self._makeResource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
self.assertTrue(job.done())

def test_result(self):
# Build a job from a resource created with ended=True, then check
# that result() hands back the very same job object (identity, not
# equality).
client = _Client(self.PROJECT)
resource = self._makeResource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)

result = job.result()

self.assertIs(result, job)

def test_result_invokes_begins(self):
# The fake connection is primed with two responses: the resource as
# first built, and a deep copy whose status.state is 'DONE'.
begun_resource = self._makeResource()
done_resource = copy.deepcopy(begun_resource)
done_resource['status'] = {'state': 'DONE'}
connection = _Connection(begun_resource, done_resource)
client = _Client(self.PROJECT, connection=connection)
table = _Table()
job = self._make_one(self.JOB_NAME, table, [self.SOURCE1], client)

# The job is created with _make_one() and never explicitly begun
# before result() is called.
job.result()

# Exactly two API requests are expected: a POST followed by a GET
# (named begin_request and reload_request below).
self.assertEqual(len(connection._requested), 2)
begin_request, reload_request = connection._requested
self.assertEqual(begin_request['method'], 'POST')
self.assertEqual(reload_request['method'], 'GET')

def test_schema_setter_non_list(self):
client = _Client(self.PROJECT)
table = _Table()
Expand Down Expand Up @@ -1421,6 +1453,10 @@ def _makeResource(self, started=False, ended=False):
started, ended)
config = resource['configuration']['query']
config['query'] = self.QUERY

if ended:
resource['status'] = {'state': 'DONE'}

return resource

def _verifyBooleanResourceProperties(self, job, config):
Expand Down Expand Up @@ -1640,40 +1676,60 @@ def test_cancelled(self):

self.assertTrue(job.cancelled())

def test_done(self):
# A job rebuilt via from_api_repr() from a resource made with
# ended=True must report done() as true. The client here is built
# without a connection argument.
client = _Client(self.PROJECT)
resource = self._makeResource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
self.assertTrue(job.done())

def test_query_results(self):
from google.cloud.bigquery.query import QueryResults

client = _Client(self.PROJECT)
query_resource = {'jobComplete': True}
connection = _Connection(query_resource)
client = _Client(self.PROJECT, connection=connection)
job = self._make_one(self.JOB_NAME, self.QUERY, client)
results = job.query_results()
self.assertIsInstance(results, QueryResults)
self.assertIs(results._job, job)

def test_result(self):
def test_query_results_w_cached_value(self):
from google.cloud.bigquery.query import QueryResults

client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client)
job._properties['status'] = {'state': 'DONE'}
query_results = QueryResults(None, client)
job._query_results = query_results

results = job.query_results()

self.assertIs(results, query_results)

def test_result(self):
client = _Client(self.PROJECT)
resource = self._makeResource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)

result = job.result()

self.assertIsInstance(result, QueryResults)
self.assertIs(result._job, job)
self.assertIs(result, job)

def test_result_invokes_begins(self):
begun_resource = self._makeResource()
incomplete_resource = {'jobComplete': False}
query_resource = {'jobComplete': True}
done_resource = copy.deepcopy(begun_resource)
done_resource['status'] = {'state': 'DONE'}
connection = _Connection(begun_resource, done_resource)
connection = _Connection(
begun_resource, incomplete_resource, query_resource, done_resource)
client = _Client(self.PROJECT, connection=connection)
job = self._make_one(self.JOB_NAME, self.QUERY, client)

job.result()

self.assertEqual(len(connection._requested), 2)
begin_request, reload_request = connection._requested
self.assertEqual(len(connection._requested), 4)
begin_request, _, query_request, reload_request = connection._requested
self.assertEqual(begin_request['method'], 'POST')
self.assertEqual(query_request['method'], 'GET')
self.assertEqual(reload_request['method'], 'GET')

def test_result_error(self):
Expand Down Expand Up @@ -2088,6 +2144,12 @@ def dataset(self, name):

return Dataset(name, client=self)

def get_query_results(self, job_id):
# Test-double method: issues a bare GET through the fake connection
# and wraps whatever resource it returns in a QueryResults bound to
# this client. Note that job_id is accepted but not used here.
from google.cloud.bigquery.query import QueryResults

resource = self._connection.api_request(method='GET')
return QueryResults.from_api_repr(resource, self)


class _Table(object):

Expand Down