From 0f6fa6b3d2c99ec5672337eefed3b9eb054953f0 Mon Sep 17 00:00:00 2001 From: Shubha Rajan Date: Fri, 23 Aug 2019 15:10:15 -0700 Subject: [PATCH] Add `--dry_run` option to `%%bigquery` magic. (#9067) * added dry_run option to bigquery magics. when --dry_run flag is present, a QueryJob object is returned for inspection instead of an empty DataFrame * print estimated bytes instead of total bytes * updated docstring for _AsyncJob._begin * Update docstring for QueryJob._begin * added SQL query to error output and messaging for failure to save to variable in magics Co-Authored-By: Peter Lamut Co-Authored-By: Tim Swast --- bigquery/google/cloud/bigquery/job.py | 41 ++++++-- bigquery/google/cloud/bigquery/magics.py | 30 ++++++ bigquery/tests/unit/test_job.py | 33 +++++++ bigquery/tests/unit/test_magics.py | 117 +++++++++++++++++++++++ 4 files changed, 213 insertions(+), 8 deletions(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 38b3b39c2c3e..6e1eb81648f5 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -561,15 +561,16 @@ def _begin(self, client=None, retry=DEFAULT_RETRY): See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert - :type client: :class:`~google.cloud.bigquery.client.Client` or - ``NoneType`` - :param client: the client to use. If not passed, falls back to the - ``client`` stored on the current dataset. - - :type retry: :class:`google.api_core.retry.Retry` - :param retry: (Optional) How to retry the RPC. + Args: + client (Optional[google.cloud.bigquery.client.Client]): + The client to use. If not passed, falls back to the ``client`` + associated with the job object or``NoneType`` + retry (Optional[google.api_core.retry.Retry]): + How to retry the RPC. - :raises: :exc:`ValueError` if the job has already begin. + Raises: + ValueError: + If the job has already begun. """ if self.state is not None: raise ValueError("Job already begun.") @@ -2880,6 +2881,30 @@ def _format_for_exception(query, job_id): return template.format(job_id=job_id, header=header, ruler=ruler, body=body) + def _begin(self, client=None, retry=DEFAULT_RETRY): + """API call: begin the job via a POST request + + See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert + + Args: + client (Optional[google.cloud.bigquery.client.Client]): + The client to use. If not passed, falls back to the ``client`` + associated with the job object or``NoneType``. + retry (Optional[google.api_core.retry.Retry]): + How to retry the RPC. + + Raises: + ValueError: + If the job has already begun. + """ + + try: + super(QueryJob, self)._begin(client=client, retry=retry) + except exceptions.GoogleCloudError as exc: + exc.message += self._format_for_exception(self.query, self.job_id) + raise + def result(self, timeout=None, page_size=None, retry=DEFAULT_RETRY): """Start the job and wait for it to complete and get the result. diff --git a/bigquery/google/cloud/bigquery/magics.py b/bigquery/google/cloud/bigquery/magics.py index b4ec8951b0a6..b2dae2511ec8 100644 --- a/bigquery/google/cloud/bigquery/magics.py +++ b/bigquery/google/cloud/bigquery/magics.py @@ -291,6 +291,10 @@ def _run_query(client, query, job_config=None): """ start_time = time.time() query_job = client.query(query, job_config=job_config) + + if job_config and job_config.dry_run: + return query_job + print("Executing query with job ID: {}".format(query_job.job_id)) while True: @@ -324,6 +328,15 @@ def _run_query(client, query, job_config=None): "the context default_query_job_config.maximum_bytes_billed." ), ) +@magic_arguments.argument( + "--dry_run", + action="store_true", + default=False, + help=( + "Sets query to be a dry run to estimate costs. " + "Defaults to executing the query instead of dry run if this argument is not used." + ), +) @magic_arguments.argument( "--use_legacy_sql", action="store_true", @@ -410,6 +423,7 @@ def _cell_magic(line, query): job_config = bigquery.job.QueryJobConfig() job_config.query_parameters = params job_config.use_legacy_sql = args.use_legacy_sql + job_config.dry_run = args.dry_run if args.maximum_bytes_billed == "None": job_config.maximum_bytes_billed = 0 @@ -427,9 +441,25 @@ def _cell_magic(line, query): display.clear_output() if error: + if args.destination_var: + print( + "Could not save output to variable '{}'.".format(args.destination_var), + file=sys.stderr, + ) print("\nERROR:\n", error, file=sys.stderr) return + if args.dry_run and args.destination_var: + IPython.get_ipython().push({args.destination_var: query_job}) + return + elif args.dry_run: + print( + "Query validated. This query will process {} bytes.".format( + query_job.total_bytes_processed + ) + ) + return query_job + result = query_job.to_dataframe(bqstorage_client=bqstorage_client) if args.destination_var: IPython.get_ipython().push({args.destination_var: result}) diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 8bd62d7e4f51..19409d8d43c3 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -4307,6 +4307,39 @@ def test_result_error(self): expected_line = "{}:{}".format(i, line) assert expected_line in full_text + def test__begin_error(self): + from google.cloud import exceptions + + query = textwrap.dedent( + """ + SELECT foo, bar + FROM table_baz + WHERE foo == bar""" + ) + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, query, client) + call_api_patch = mock.patch( + "google.cloud.bigquery.client.Client._call_api", + autospec=True, + side_effect=exceptions.BadRequest("Syntax error in SQL query"), + ) + + with call_api_patch, self.assertRaises(exceptions.GoogleCloudError) as exc_info: + job.result() + + self.assertIsInstance(exc_info.exception, exceptions.GoogleCloudError) + self.assertEqual(exc_info.exception.code, http_client.BAD_REQUEST) + + full_text = str(exc_info.exception) + + assert job.job_id in full_text + assert "Query Job SQL Follows" in full_text + + for i, line in enumerate(query.splitlines(), start=1): + expected_line = "{}:{}".format(i, line) + assert expected_line in full_text + def test_begin_w_bound_client(self): from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.job import QueryJobConfig diff --git a/bigquery/tests/unit/test_magics.py b/bigquery/tests/unit/test_magics.py index ebe194329ec1..fbea9bdd9050 100644 --- a/bigquery/tests/unit/test_magics.py +++ b/bigquery/tests/unit/test_magics.py @@ -274,6 +274,25 @@ def test__run_query(): assert re.match("Query complete after .*s", updates[-1]) +def test__run_query_dry_run_without_errors_is_silent(): + magics.context._credentials = None + + sql = "SELECT 17" + + client_patch = mock.patch( + "google.cloud.bigquery.magics.bigquery.Client", autospec=True + ) + + job_config = job.QueryJobConfig() + job_config.dry_run = True + with client_patch as client_mock, io.capture_output() as captured: + client_mock().query(sql).job_id = None + magics._run_query(client_mock(), sql, job_config=job_config) + + assert len(captured.stderr) == 0 + assert len(captured.stdout) == 0 + + def test__make_bqstorage_client_false(): credentials_mock = mock.create_autospec( google.auth.credentials.Credentials, instance=True @@ -626,6 +645,104 @@ def test_bigquery_magic_without_bqstorage(monkeypatch): assert isinstance(return_value, pandas.DataFrame) +@pytest.mark.usefixtures("ipython_interactive") +def test_bigquery_magic_dryrun_option_sets_job_config(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics._run_query", autospec=True + ) + + sql = "SELECT 17 AS num" + + with run_query_patch as run_query_mock: + ip.run_cell_magic("bigquery", "--dry_run", sql) + + job_config_used = run_query_mock.call_args_list[0][0][-1] + assert job_config_used.dry_run is True + + +@pytest.mark.usefixtures("ipython_interactive") +def test_bigquery_magic_dryrun_option_returns_query_job(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + query_job_mock = mock.create_autospec( + google.cloud.bigquery.job.QueryJob, instance=True + ) + run_query_patch = mock.patch( + "google.cloud.bigquery.magics._run_query", autospec=True + ) + + sql = "SELECT 17 AS num" + + with run_query_patch as run_query_mock, io.capture_output() as captured_io: + run_query_mock.return_value = query_job_mock + return_value = ip.run_cell_magic("bigquery", "--dry_run", sql) + + assert "Query validated. This query will process" in captured_io.stdout + assert isinstance(return_value, job.QueryJob) + + +@pytest.mark.usefixtures("ipython_interactive") +def test_bigquery_magic_dryrun_option_variable_error_message(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + + run_query_patch = mock.patch( + "google.cloud.bigquery.magics._run_query", + autospec=True, + side_effect=exceptions.BadRequest("Syntax error in SQL query"), + ) + + sql = "SELECT SELECT 17 AS num" + + assert "q_job" not in ip.user_ns + + with run_query_patch, io.capture_output() as captured: + ip.run_cell_magic("bigquery", "q_job --dry_run", sql) + + full_text = captured.stderr + assert "Could not save output to variable 'q_job'." in full_text + + +@pytest.mark.usefixtures("ipython_interactive") +def test_bigquery_magic_dryrun_option_saves_query_job_to_variable(): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + magics.context.credentials = mock.create_autospec( + google.auth.credentials.Credentials, instance=True + ) + query_job_mock = mock.create_autospec( + google.cloud.bigquery.job.QueryJob, instance=True + ) + run_query_patch = mock.patch( + "google.cloud.bigquery.magics._run_query", autospec=True + ) + + sql = "SELECT 17 AS num" + + assert "q_job" not in ip.user_ns + + with run_query_patch as run_query_mock: + run_query_mock.return_value = query_job_mock + return_value = ip.run_cell_magic("bigquery", "q_job --dry_run", sql) + + assert return_value is None + assert "q_job" in ip.user_ns + q_job = ip.user_ns["q_job"] + assert isinstance(q_job, job.QueryJob) + + @pytest.mark.usefixtures("ipython_interactive") def test_bigquery_magic_w_maximum_bytes_billed_invalid(): ip = IPython.get_ipython()