Skip to content

Commit

Permalink
Add --dry_run option to %%bigquery magic. (#9067)
Browse files Browse the repository at this point in the history
* added dry_run option to bigquery magics. when --dry_run flag is present, a QueryJob object is returned for inspection instead of an empty DataFrame
* print estimated bytes instead of total bytes
* updated docstring for _AsyncJob._begin
* Update docstring for QueryJob._begin
* added SQL query to error output and messaging for failure to save to variable in magics

Co-Authored-By: Peter Lamut <plamut@users.noreply.github.com>
Co-Authored-By: Tim Swast <swast@google.com>
  • Loading branch information
3 people committed Aug 23, 2019
1 parent 2631e44 commit 0f6fa6b
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 8 deletions.
41 changes: 33 additions & 8 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,15 +561,16 @@ def _begin(self, client=None, retry=DEFAULT_RETRY):
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
:type client: :class:`~google.cloud.bigquery.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.
:type retry: :class:`google.api_core.retry.Retry`
:param retry: (Optional) How to retry the RPC.
Args:
client (Optional[google.cloud.bigquery.client.Client]):
The client to use. If not passed, falls back to the ``client``
associated with the job object or``NoneType``
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
:raises: :exc:`ValueError` if the job has already begin.
Raises:
ValueError:
If the job has already begun.
"""
if self.state is not None:
raise ValueError("Job already begun.")
Expand Down Expand Up @@ -2880,6 +2881,30 @@ def _format_for_exception(query, job_id):

return template.format(job_id=job_id, header=header, ruler=ruler, body=body)

def _begin(self, client=None, retry=DEFAULT_RETRY):
"""API call: begin the job via a POST request
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
Args:
client (Optional[google.cloud.bigquery.client.Client]):
The client to use. If not passed, falls back to the ``client``
associated with the job object or``NoneType``.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
Raises:
ValueError:
If the job has already begun.
"""

try:
super(QueryJob, self)._begin(client=client, retry=retry)
except exceptions.GoogleCloudError as exc:
exc.message += self._format_for_exception(self.query, self.job_id)
raise

def result(self, timeout=None, page_size=None, retry=DEFAULT_RETRY):
"""Start the job and wait for it to complete and get the result.
Expand Down
30 changes: 30 additions & 0 deletions bigquery/google/cloud/bigquery/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ def _run_query(client, query, job_config=None):
"""
start_time = time.time()
query_job = client.query(query, job_config=job_config)

if job_config and job_config.dry_run:
return query_job

print("Executing query with job ID: {}".format(query_job.job_id))

while True:
Expand Down Expand Up @@ -324,6 +328,15 @@ def _run_query(client, query, job_config=None):
"the context default_query_job_config.maximum_bytes_billed."
),
)
@magic_arguments.argument(
"--dry_run",
action="store_true",
default=False,
help=(
"Sets query to be a dry run to estimate costs. "
"Defaults to executing the query instead of dry run if this argument is not used."
),
)
@magic_arguments.argument(
"--use_legacy_sql",
action="store_true",
Expand Down Expand Up @@ -410,6 +423,7 @@ def _cell_magic(line, query):
job_config = bigquery.job.QueryJobConfig()
job_config.query_parameters = params
job_config.use_legacy_sql = args.use_legacy_sql
job_config.dry_run = args.dry_run

if args.maximum_bytes_billed == "None":
job_config.maximum_bytes_billed = 0
Expand All @@ -427,9 +441,25 @@ def _cell_magic(line, query):
display.clear_output()

if error:
if args.destination_var:
print(
"Could not save output to variable '{}'.".format(args.destination_var),
file=sys.stderr,
)
print("\nERROR:\n", error, file=sys.stderr)
return

if args.dry_run and args.destination_var:
IPython.get_ipython().push({args.destination_var: query_job})
return
elif args.dry_run:
print(
"Query validated. This query will process {} bytes.".format(
query_job.total_bytes_processed
)
)
return query_job

result = query_job.to_dataframe(bqstorage_client=bqstorage_client)
if args.destination_var:
IPython.get_ipython().push({args.destination_var: result})
Expand Down
33 changes: 33 additions & 0 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4307,6 +4307,39 @@ def test_result_error(self):
expected_line = "{}:{}".format(i, line)
assert expected_line in full_text

def test__begin_error(self):
from google.cloud import exceptions

query = textwrap.dedent(
"""
SELECT foo, bar
FROM table_baz
WHERE foo == bar"""
)

client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, query, client)
call_api_patch = mock.patch(
"google.cloud.bigquery.client.Client._call_api",
autospec=True,
side_effect=exceptions.BadRequest("Syntax error in SQL query"),
)

with call_api_patch, self.assertRaises(exceptions.GoogleCloudError) as exc_info:
job.result()

self.assertIsInstance(exc_info.exception, exceptions.GoogleCloudError)
self.assertEqual(exc_info.exception.code, http_client.BAD_REQUEST)

full_text = str(exc_info.exception)

assert job.job_id in full_text
assert "Query Job SQL Follows" in full_text

for i, line in enumerate(query.splitlines(), start=1):
expected_line = "{}:{}".format(i, line)
assert expected_line in full_text

def test_begin_w_bound_client(self):
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.job import QueryJobConfig
Expand Down
117 changes: 117 additions & 0 deletions bigquery/tests/unit/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,25 @@ def test__run_query():
assert re.match("Query complete after .*s", updates[-1])


def test__run_query_dry_run_without_errors_is_silent():
magics.context._credentials = None

sql = "SELECT 17"

client_patch = mock.patch(
"google.cloud.bigquery.magics.bigquery.Client", autospec=True
)

job_config = job.QueryJobConfig()
job_config.dry_run = True
with client_patch as client_mock, io.capture_output() as captured:
client_mock().query(sql).job_id = None
magics._run_query(client_mock(), sql, job_config=job_config)

assert len(captured.stderr) == 0
assert len(captured.stdout) == 0


def test__make_bqstorage_client_false():
credentials_mock = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
Expand Down Expand Up @@ -626,6 +645,104 @@ def test_bigquery_magic_without_bqstorage(monkeypatch):
assert isinstance(return_value, pandas.DataFrame)


@pytest.mark.usefixtures("ipython_interactive")
def test_bigquery_magic_dryrun_option_sets_job_config():
ip = IPython.get_ipython()
ip.extension_manager.load_extension("google.cloud.bigquery")
magics.context.credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)

run_query_patch = mock.patch(
"google.cloud.bigquery.magics._run_query", autospec=True
)

sql = "SELECT 17 AS num"

with run_query_patch as run_query_mock:
ip.run_cell_magic("bigquery", "--dry_run", sql)

job_config_used = run_query_mock.call_args_list[0][0][-1]
assert job_config_used.dry_run is True


@pytest.mark.usefixtures("ipython_interactive")
def test_bigquery_magic_dryrun_option_returns_query_job():
ip = IPython.get_ipython()
ip.extension_manager.load_extension("google.cloud.bigquery")
magics.context.credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)
query_job_mock = mock.create_autospec(
google.cloud.bigquery.job.QueryJob, instance=True
)
run_query_patch = mock.patch(
"google.cloud.bigquery.magics._run_query", autospec=True
)

sql = "SELECT 17 AS num"

with run_query_patch as run_query_mock, io.capture_output() as captured_io:
run_query_mock.return_value = query_job_mock
return_value = ip.run_cell_magic("bigquery", "--dry_run", sql)

assert "Query validated. This query will process" in captured_io.stdout
assert isinstance(return_value, job.QueryJob)


@pytest.mark.usefixtures("ipython_interactive")
def test_bigquery_magic_dryrun_option_variable_error_message():
ip = IPython.get_ipython()
ip.extension_manager.load_extension("google.cloud.bigquery")
magics.context.credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)

run_query_patch = mock.patch(
"google.cloud.bigquery.magics._run_query",
autospec=True,
side_effect=exceptions.BadRequest("Syntax error in SQL query"),
)

sql = "SELECT SELECT 17 AS num"

assert "q_job" not in ip.user_ns

with run_query_patch, io.capture_output() as captured:
ip.run_cell_magic("bigquery", "q_job --dry_run", sql)

full_text = captured.stderr
assert "Could not save output to variable 'q_job'." in full_text


@pytest.mark.usefixtures("ipython_interactive")
def test_bigquery_magic_dryrun_option_saves_query_job_to_variable():
ip = IPython.get_ipython()
ip.extension_manager.load_extension("google.cloud.bigquery")
magics.context.credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)
query_job_mock = mock.create_autospec(
google.cloud.bigquery.job.QueryJob, instance=True
)
run_query_patch = mock.patch(
"google.cloud.bigquery.magics._run_query", autospec=True
)

sql = "SELECT 17 AS num"

assert "q_job" not in ip.user_ns

with run_query_patch as run_query_mock:
run_query_mock.return_value = query_job_mock
return_value = ip.run_cell_magic("bigquery", "q_job --dry_run", sql)

assert return_value is None
assert "q_job" in ip.user_ns
q_job = ip.user_ns["q_job"]
assert isinstance(q_job, job.QueryJob)


@pytest.mark.usefixtures("ipython_interactive")
def test_bigquery_magic_w_maximum_bytes_billed_invalid():
ip = IPython.get_ipython()
Expand Down

0 comments on commit 0f6fa6b

Please sign in to comment.