Preserve order in to_dataframe with BQ Storage from queries containing ORDER BY #7793

Merged 3 commits on Apr 30, 2019
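For orientation: the change detects `ORDER BY` in the query text and, when it is present, requests a single read stream from the BigQuery Storage API so that `to_dataframe` returns rows in the expected order. A minimal usage sketch, assuming hypothetical project, dataset, and column names:

```python
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Hypothetical project, dataset, and column names used only for illustration.
client = bigquery.Client(project="my-project")
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

query_job = client.query(
    "SELECT name, age FROM `my-project.my_dataset.people` ORDER BY age"
)

# Because the query contains ORDER BY, the row iterator now requests a single
# BigQuery Storage read stream, so the DataFrame rows keep the query's order.
df = query_job.to_dataframe(bqstorage_client=bqstorage_client)
```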
31 changes: 29 additions & 2 deletions bigquery/google/cloud/bigquery/job.py
@@ -15,6 +15,7 @@
"""Define API Jobs."""

import copy
import re
import threading

import six
@@ -45,6 +46,7 @@
_DONE_STATE = "DONE"
_STOPPED_REASON = "stopped"
_TIMEOUT_BUFFER_SECS = 0.1
_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)

_ERROR_REASON_TO_EXCEPTION = {
"accessDenied": http_client.FORBIDDEN,
@@ -92,6 +94,29 @@ def _error_result_to_exception(error_result):
)


def _contains_order_by(query):
    """Do we need to preserve the order of the query results?

    This function has known false positives, such as with ordered window
    functions:

    .. code-block:: sql

       SELECT SUM(x) OVER (
           window_name
           PARTITION BY...
           ORDER BY...
           window_frame_clause)
       FROM ...

    This false positive failure case means the behavior will be correct, but
    downloading results with the BigQuery Storage API may be slower than it
    otherwise would be. This is preferable to the false negative case, where
    results are expected to be in order but are not (due to parallel reads).
    """
    return query and _CONTAINS_ORDER_BY.search(query)

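For reference, a small self-contained sketch of the heuristic in action, including the window-function false positive the docstring warns about (query strings are illustrative only):

```python
import re

_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)


def _contains_order_by(query):
    return query and _CONTAINS_ORDER_BY.search(query)


assert _contains_order_by("SELECT name FROM t ORDER BY age")    # true positive
assert not _contains_order_by("SELECT name FROM t LIMIT 10")    # true negative
# Known false positive: ORDER BY inside a window clause also matches,
# which only costs download speed, never correctness.
assert _contains_order_by("SELECT SUM(x) OVER (ORDER BY y) FROM t")
```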

class Compression(object):
"""The compression type to use for exported files. The default value is
:attr:`NONE`.
@@ -2546,7 +2571,7 @@ def from_api_repr(cls, resource, client):
:returns: Job parsed from ``resource``.
"""
job_id, config = cls._get_resource_config(resource)
query = config["query"]["query"]
query = _helpers._get_sub_prop(config, ["query", "query"])
job = cls(job_id, query, client=client)
job._set_properties(resource)
return job
@@ -2849,7 +2874,9 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
dest_table_ref = self.destination
dest_table = Table(dest_table_ref, schema=schema)
dest_table._properties["numRows"] = self._query_results.total_rows
return self._client.list_rows(dest_table, retry=retry)
rows = self._client.list_rows(dest_table, retry=retry)
rows._preserve_order = _contains_order_by(self.query)
return rows

def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
"""Return a pandas DataFrame from a QueryJob
10 changes: 9 additions & 1 deletion bigquery/google/cloud/bigquery/table.py
@@ -1348,6 +1348,7 @@ def __init__(
)
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
self._project = client.project
self._schema = schema
self._selected_fields = selected_fields
@@ -1496,10 +1497,15 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
for field in self._selected_fields:
read_options.selected_fields.append(field.name)

requested_streams = 0
if self._preserve_order:
requested_streams = 1

session = bqstorage_client.create_read_session(
self._table.to_bqstorage(),
"projects/{}".format(self._project),
read_options=read_options,
requested_streams=requested_streams,
)

# We need to parse the schema manually so that we can rearrange the
@@ -1512,6 +1518,8 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
if not session.streams:
return pandas.DataFrame(columns=columns)

total_streams = len(session.streams)

# Use _to_dataframe_finished to notify worker threads when to quit.
# See: https://stackoverflow.com/a/29237343/101923
self._to_dataframe_finished = False
@@ -1560,7 +1568,7 @@ def get_frames(pool):

return frames

with concurrent.futures.ThreadPoolExecutor() as pool:
with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
try:
frames = get_frames(pool)
finally:
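On the table.py side, the whole effect of `_preserve_order` is the `requested_streams` argument passed to the Storage API, plus a thread pool sized to the number of streams returned. A hedged, self-contained sketch of the underlying call (project, dataset, and table names are placeholders, not taken from this PR):

```python
from google.cloud import bigquery_storage_v1beta1

# Hedged sketch: names below are hypothetical, used only for illustration.
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

table_ref = bigquery_storage_v1beta1.types.TableReference(
    project_id="my-project", dataset_id="my_dataset", table_id="people"
)
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("name")

session = bqstorage_client.create_read_session(
    table_ref,
    "projects/my-project",
    read_options=read_options,
    # 1 forces a single stream so rows come back in query order;
    # 0 lets the backend choose the stream count for maximum throughput.
    requested_streams=1,
)

# The row iterator then sizes its thread pool to len(session.streams), so a
# single-stream session is consumed by a single worker.
```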
180 changes: 154 additions & 26 deletions bigquery/tests/unit/test_job.py
@@ -17,6 +17,7 @@
import unittest

import mock
import pytest
from six.moves import http_client

try:
@@ -59,6 +60,47 @@ def _make_connection(*responses):
return mock_conn


def _make_job_resource(
creation_time_ms=1437767599006,
started_time_ms=1437767600007,
ended_time_ms=1437767601008,
started=False,
ended=False,
etag="abc-def-hjk",
endpoint="https://www.googleapis.com",
job_type="load",
job_id="a-random-id",
project_id="some-project",
user_email="bq-user@example.com",
):
resource = {
"configuration": {job_type: {}},
"statistics": {"creationTime": creation_time_ms, job_type: {}},
"etag": etag,
"id": "{}:{}".format(project_id, job_id),
"jobReference": {"projectId": project_id, "jobId": job_id},
"selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
endpoint, project_id, job_id
),
"user_email": user_email,
}

if started or ended:
resource["statistics"]["startTime"] = started_time_ms

if ended:
resource["statistics"]["endTime"] = ended_time_ms

if job_type == "query":
resource["configuration"]["query"]["destinationTable"] = {
"projectId": project_id,
"datasetId": "_temp_dataset",
"tableId": "_temp_table",
}

return resource


class Test__error_result_to_exception(unittest.TestCase):
def _call_fut(self, *args, **kwargs):
from google.cloud.bigquery import job
@@ -974,6 +1016,7 @@ class _Base(object):
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.table import TableReference

ENDPOINT = "https://www.googleapis.com"
PROJECT = "project"
SOURCE1 = "http://example.com/source1.csv"
DS_ID = "dataset_id"
@@ -994,7 +1037,9 @@ def _setUpConstants(self):
self.WHEN = datetime.datetime.utcfromtimestamp(self.WHEN_TS).replace(tzinfo=UTC)
self.ETAG = "ETAG"
self.FULL_JOB_ID = "%s:%s" % (self.PROJECT, self.JOB_ID)
self.RESOURCE_URL = "http://example.com/path/to/resource"
self.RESOURCE_URL = "{}/bigquery/v2/projects/{}/jobs/{}".format(
self.ENDPOINT, self.PROJECT, self.JOB_ID
)
self.USER_EMAIL = "phred@example.com"

def _table_ref(self, table_id):
@@ -1004,30 +1049,19 @@ def _table_ref(self, table_id):

def _make_resource(self, started=False, ended=False):
self._setUpConstants()
resource = {
"configuration": {self.JOB_TYPE: {}},
"statistics": {"creationTime": self.WHEN_TS * 1000, self.JOB_TYPE: {}},
"etag": self.ETAG,
"id": self.FULL_JOB_ID,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"selfLink": self.RESOURCE_URL,
"user_email": self.USER_EMAIL,
}

if started or ended:
resource["statistics"]["startTime"] = self.WHEN_TS * 1000

if ended:
resource["statistics"]["endTime"] = (self.WHEN_TS + 1000) * 1000

if self.JOB_TYPE == "query":
resource["configuration"]["query"]["destinationTable"] = {
"projectId": self.PROJECT,
"datasetId": "_temp_dataset",
"tableId": "_temp_table",
}

return resource
return _make_job_resource(
creation_time_ms=int(self.WHEN_TS * 1000),
started_time_ms=int(self.WHEN_TS * 1000),
ended_time_ms=int(self.WHEN_TS * 1000) + 1000000,
started=started,
ended=ended,
etag=self.ETAG,
endpoint=self.ENDPOINT,
job_type=self.JOB_TYPE,
job_id=self.JOB_ID,
project_id=self.PROJECT,
user_email=self.USER_EMAIL,
)

def _verifyInitialReadonlyProperties(self, job):
# root elements of resource
@@ -4684,7 +4718,11 @@ def test_to_dataframe_bqstorage(self):
job.to_dataframe(bqstorage_client=bqstorage_client)

bqstorage_client.create_read_session.assert_called_once_with(
mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
mock.ANY,
"projects/{}".format(self.PROJECT),
read_options=mock.ANY,
# Use default number of streams for best performance.
requested_streams=0,
)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@@ -5039,3 +5077,93 @@ def test_from_api_repr_normal(self):
self.assertEqual(entry.pending_units, self.PENDING_UNITS)
self.assertEqual(entry.completed_units, self.COMPLETED_UNITS)
self.assertEqual(entry.slot_millis, self.SLOT_MILLIS)


@pytest.mark.parametrize(
"query,expected",
(
(None, False),
("", False),
("select name, age from table", False),
("select name, age from table LIMIT 10;", False),
("select name, age from table order by other_column;", True),
("Select name, age From table Order By other_column", True),
("SELECT name, age FROM table ORDER BY other_column;", True),
("select name, age from table order\nby other_column", True),
("Select name, age From table Order\nBy other_column;", True),
("SELECT name, age FROM table ORDER\nBY other_column", True),
("SelecT name, age froM table OrdeR \n\t BY other_column;", True),
),
)
def test__contains_order_by(query, expected):
from google.cloud.bigquery import job as mut

if expected:
assert mut._contains_order_by(query)
else:
assert not mut._contains_order_by(query)


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(
bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
)
@pytest.mark.parametrize(
"query",
(
"select name, age from table order by other_column;",
"Select name, age From table Order By other_column;",
"SELECT name, age FROM table ORDER BY other_column;",
"select name, age from table order\nby other_column;",
"Select name, age From table Order\nBy other_column;",
"SELECT name, age FROM table ORDER\nBY other_column;",
"SelecT name, age froM table OrdeR \n\t BY other_column;",
),
)
def test_to_dataframe_bqstorage_preserve_order(query):
from google.cloud.bigquery.job import QueryJob as target_class

job_resource = _make_job_resource(
project_id="test-project", job_type="query", ended=True
)
job_resource["configuration"]["query"]["query"] = query
job_resource["status"] = {"state": "DONE"}
get_query_results_resource = {
"jobComplete": True,
"jobReference": {"projectId": "test-project", "jobId": "test-job"},
"schema": {
"fields": [
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]
},
"totalRows": "4",
}
connection = _make_connection(get_query_results_resource, job_resource)
client = _make_client(connection=connection)
job = target_class.from_api_repr(job_resource, client)
bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession()
session.avro_schema.schema = json.dumps(
{
"type": "record",
"name": "__root__",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "age", "type": ["null", "long"]},
],
}
)
bqstorage_client.create_read_session.return_value = session

job.to_dataframe(bqstorage_client=bqstorage_client)

bqstorage_client.create_read_session.assert_called_once_with(
mock.ANY,
"projects/test-project",
read_options=mock.ANY,
# Use a single stream to preserve row order.
requested_streams=1,
)