diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index d14119b1f1b1..18f22270feac 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -15,6 +15,7 @@
 """Define API Jobs."""
 
 import copy
+import re
 import threading
 
 import six
@@ -45,6 +46,7 @@
 _DONE_STATE = "DONE"
 _STOPPED_REASON = "stopped"
 _TIMEOUT_BUFFER_SECS = 0.1
+_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)
 
 _ERROR_REASON_TO_EXCEPTION = {
     "accessDenied": http_client.FORBIDDEN,
@@ -92,6 +94,29 @@ def _error_result_to_exception(error_result):
     )
 
 
+def _contains_order_by(query):
+    """Do we need to preserve the order of the query results?
+
+    This function has known false positives, such as with ordered window
+    functions:
+
+    .. code-block:: sql
+
+        SELECT SUM(x) OVER (
+            window_name
+            PARTITION BY...
+            ORDER BY...
+            window_frame_clause)
+        FROM ...
+
+    This false positive failure case means the behavior will be correct, but
+    downloading results with the BigQuery Storage API may be slower than it
+    otherwise would. This is preferable to the false negative case, where
+    results are expected to be in order but are not (due to parallel reads).
+    """
+    return query and _CONTAINS_ORDER_BY.search(query)
+
+
 class Compression(object):
     """The compression type to use for exported files. The default value is
     :attr:`NONE`.
@@ -2546,7 +2571,7 @@ def from_api_repr(cls, resource, client):
         :returns: Job parsed from ``resource``.
         """
         job_id, config = cls._get_resource_config(resource)
-        query = config["query"]["query"]
+        query = _helpers._get_sub_prop(config, ["query", "query"])
         job = cls(job_id, query, client=client)
         job._set_properties(resource)
         return job
@@ -2849,7 +2874,9 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
             dest_table_ref = self.destination
             dest_table = Table(dest_table_ref, schema=schema)
             dest_table._properties["numRows"] = self._query_results.total_rows
-            return self._client.list_rows(dest_table, retry=retry)
+            rows = self._client.list_rows(dest_table, retry=retry)
+            rows._preserve_order = _contains_order_by(self.query)
+            return rows
 
     def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
         """Return a pandas DataFrame from a QueryJob
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index d50fec487a31..46213d5fe8bf 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -1348,6 +1348,7 @@ def __init__(
         )
         self._field_to_index = _helpers._field_to_index_mapping(schema)
         self._page_size = page_size
+        self._preserve_order = False
         self._project = client.project
         self._schema = schema
         self._selected_fields = selected_fields
@@ -1496,10 +1497,15 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
             for field in self._selected_fields:
                 read_options.selected_fields.append(field.name)
 
+        requested_streams = 0
+        if self._preserve_order:
+            requested_streams = 1
+
         session = bqstorage_client.create_read_session(
             self._table.to_bqstorage(),
             "projects/{}".format(self._project),
             read_options=read_options,
+            requested_streams=requested_streams,
         )
 
         # We need to parse the schema manually so that we can rearrange the
@@ -1512,6 +1518,8 @@
         if not session.streams:
             return pandas.DataFrame(columns=columns)
 
+        total_streams = len(session.streams)
+
         # Use _to_dataframe_finished to notify worker threads when to quit.
         # See: https://stackoverflow.com/a/29237343/101923
         self._to_dataframe_finished = False
@@ -1560,7 +1568,7 @@ def get_frames(pool):
 
             return frames
 
-        with concurrent.futures.ThreadPoolExecutor() as pool:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
             try:
                 frames = get_frames(pool)
             finally:
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index a30c026a82c0..bb6f03f3efb3 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -17,6 +17,7 @@
 import unittest
 
 import mock
+import pytest
 from six.moves import http_client
 
 try:
@@ -59,6 +60,47 @@ def _make_connection(*responses):
     return mock_conn
 
 
+def _make_job_resource(
+    creation_time_ms=1437767599006,
+    started_time_ms=1437767600007,
+    ended_time_ms=1437767601008,
+    started=False,
+    ended=False,
+    etag="abc-def-hjk",
+    endpoint="https://www.googleapis.com",
+    job_type="load",
+    job_id="a-random-id",
+    project_id="some-project",
+    user_email="bq-user@example.com",
+):
+    resource = {
+        "configuration": {job_type: {}},
+        "statistics": {"creationTime": creation_time_ms, job_type: {}},
+        "etag": etag,
+        "id": "{}:{}".format(project_id, job_id),
+        "jobReference": {"projectId": project_id, "jobId": job_id},
+        "selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
+            endpoint, project_id, job_id
+        ),
+        "user_email": user_email,
+    }
+
+    if started or ended:
+        resource["statistics"]["startTime"] = started_time_ms
+
+    if ended:
+        resource["statistics"]["endTime"] = ended_time_ms
+
+    if job_type == "query":
+        resource["configuration"]["query"]["destinationTable"] = {
+            "projectId": project_id,
+            "datasetId": "_temp_dataset",
+            "tableId": "_temp_table",
+        }
+
+    return resource
+
+
 class Test__error_result_to_exception(unittest.TestCase):
     def _call_fut(self, *args, **kwargs):
         from google.cloud.bigquery import job
@@ -974,6 +1016,7 @@ class _Base(object):
     from google.cloud.bigquery.dataset import DatasetReference
     from google.cloud.bigquery.table import TableReference
 
+    ENDPOINT = "https://www.googleapis.com"
     PROJECT = "project"
     SOURCE1 = "http://example.com/source1.csv"
     DS_ID = "dataset_id"
@@ -994,7 +1037,9 @@ def _setUpConstants(self):
         self.WHEN = datetime.datetime.utcfromtimestamp(self.WHEN_TS).replace(tzinfo=UTC)
         self.ETAG = "ETAG"
         self.FULL_JOB_ID = "%s:%s" % (self.PROJECT, self.JOB_ID)
-        self.RESOURCE_URL = "http://example.com/path/to/resource"
+        self.RESOURCE_URL = "{}/bigquery/v2/projects/{}/jobs/{}".format(
+            self.ENDPOINT, self.PROJECT, self.JOB_ID
+        )
         self.USER_EMAIL = "phred@example.com"
 
     def _table_ref(self, table_id):
@@ -1004,30 +1049,19 @@ def _make_resource(self, started=False, ended=False):
         self._setUpConstants()
-        resource = {
-            "configuration": {self.JOB_TYPE: {}},
-            "statistics": {"creationTime": self.WHEN_TS * 1000, self.JOB_TYPE: {}},
-            "etag": self.ETAG,
-            "id": self.FULL_JOB_ID,
-            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
-            "selfLink": self.RESOURCE_URL,
-            "user_email": self.USER_EMAIL,
-        }
-
-        if started or ended:
-            resource["statistics"]["startTime"] = self.WHEN_TS * 1000
-
-        if ended:
-            resource["statistics"]["endTime"] = (self.WHEN_TS + 1000) * 1000
-
-        if self.JOB_TYPE == "query":
-            resource["configuration"]["query"]["destinationTable"] = {
-                "projectId": self.PROJECT,
-                "datasetId": "_temp_dataset",
-                "tableId": "_temp_table",
-            }
-
-        return resource
+        return _make_job_resource(
+            creation_time_ms=int(self.WHEN_TS * 1000),
+            started_time_ms=int(self.WHEN_TS * 1000),
+            ended_time_ms=int(self.WHEN_TS * 1000) + 1000000,
+            started=started,
+            ended=ended,
+            etag=self.ETAG,
+            endpoint=self.ENDPOINT,
+            job_type=self.JOB_TYPE,
+            job_id=self.JOB_ID,
+            project_id=self.PROJECT,
+            user_email=self.USER_EMAIL,
+        )
 
     def _verifyInitialReadonlyProperties(self, job):
         # root elements of resource
@@ -4684,7 +4718,11 @@ def test_to_dataframe_bqstorage(self):
         job.to_dataframe(bqstorage_client=bqstorage_client)
 
         bqstorage_client.create_read_session.assert_called_once_with(
-            mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
+            mock.ANY,
+            "projects/{}".format(self.PROJECT),
+            read_options=mock.ANY,
+            # Use default number of streams for best performance.
+            requested_streams=0,
         )
 
     @unittest.skipIf(pandas is None, "Requires `pandas`")
@@ -5039,3 +5077,93 @@ def test_from_api_repr_normal(self):
         self.assertEqual(entry.pending_units, self.PENDING_UNITS)
         self.assertEqual(entry.completed_units, self.COMPLETED_UNITS)
         self.assertEqual(entry.slot_millis, self.SLOT_MILLIS)
+
+
+@pytest.mark.parametrize(
+    "query,expected",
+    (
+        (None, False),
+        ("", False),
+        ("select name, age from table", False),
+        ("select name, age from table LIMIT 10;", False),
+        ("select name, age from table order by other_column;", True),
+        ("Select name, age From table Order By other_column", True),
+        ("SELECT name, age FROM table ORDER BY other_column;", True),
+        ("select name, age from table order\nby other_column", True),
+        ("Select name, age From table Order\nBy other_column;", True),
+        ("SELECT name, age FROM table ORDER\nBY other_column", True),
+        ("SelecT name, age froM table OrdeR \n\t BY other_column;", True),
+    ),
+)
+def test__contains_order_by(query, expected):
+    from google.cloud.bigquery import job as mut
+
+    if expected:
+        assert mut._contains_order_by(query)
+    else:
+        assert not mut._contains_order_by(query)
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(
+    bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+@pytest.mark.parametrize(
+    "query",
+    (
+        "select name, age from table order by other_column;",
+        "Select name, age From table Order By other_column;",
+        "SELECT name, age FROM table ORDER BY other_column;",
+        "select name, age from table order\nby other_column;",
+        "Select name, age From table Order\nBy other_column;",
+        "SELECT name, age FROM table ORDER\nBY other_column;",
+        "SelecT name, age froM table OrdeR \n\t BY other_column;",
+    ),
+)
+def test_to_dataframe_bqstorage_preserve_order(query):
+    from google.cloud.bigquery.job import QueryJob as target_class
+
+    job_resource = _make_job_resource(
+        project_id="test-project", job_type="query", ended=True
+    )
+    job_resource["configuration"]["query"]["query"] = query
+    job_resource["status"] = {"state": "DONE"}
+    get_query_results_resource = {
+        "jobComplete": True,
+        "jobReference": {"projectId": "test-project", "jobId": "test-job"},
+        "schema": {
+            "fields": [
+                {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
+            ]
+        },
+        "totalRows": "4",
+    }
+    connection = _make_connection(get_query_results_resource, job_resource)
+    client = _make_client(connection=connection)
+    job = target_class.from_api_repr(job_resource, client)
+    bqstorage_client = mock.create_autospec(
+        bigquery_storage_v1beta1.BigQueryStorageClient
+    )
+    session = bigquery_storage_v1beta1.types.ReadSession()
+    session.avro_schema.schema = json.dumps(
+        {
+            "type": "record",
+            "name": "__root__",
+            "fields": [
+                {"name": "name", "type": ["null", "string"]},
+                {"name": "age", "type": ["null", "long"]},
+            ],
+        }
+    )
+    bqstorage_client.create_read_session.return_value = session
+
+    job.to_dataframe(bqstorage_client=bqstorage_client)
+
+    bqstorage_client.create_read_session.assert_called_once_with(
+        mock.ANY,
+        "projects/test-project",
+        read_options=mock.ANY,
+        # Use a single stream to preserve row order.
+        requested_streams=1,
+    )