Skip to content

Commit

Permalink
Timeout BQ queries (#902)
Browse files Browse the repository at this point in the history
* use dynamic schema in test_grant_access_to.py

* use dynamic schema in test_grant_access_to.py

* experiment with query job cancel on timeout

* modify unit tests

* remove test grants change

* starting functional test

* update functional test and experiment with polling logic

* experiment with async wait_for

* modifying connections.py for asyncio logic

* swap back to new_event_loop

* close loop, now seeing asyncio timeoutError

* improve order and update functional test

* update unit test

* add changie

* add max_result back to result call in async path

* rescope the dbt_profile_target to being a class fixture

* raise DbtRuntimeError instead database

* remove exception type check in job timeout

---------

Co-authored-by: Matthew McKnight <matthew.mcknight@dbtlabs.com>
Co-authored-by: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 11, 2023
1 parent 0a9ab72 commit 2eb407d
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230829-162111.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Time out queries if user supplies `job_execution_timeout`
time: 2023-08-29T16:21:11.69291-07:00
custom:
Author: colin-rogers-dbt McKnight-42
Issue: "231"
23 changes: 21 additions & 2 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
import functools
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass, field

from mashumaro.helper import pass_through

from functools import lru_cache
Expand Down Expand Up @@ -710,7 +713,6 @@ def _query_and_results(
# Cannot reuse job_config if destination is set and ddl is used
job_config = google.cloud.bigquery.QueryJobConfig(**job_params)
query_job = client.query(query=sql, job_config=job_config, timeout=job_creation_timeout)

if (
query_job.location is not None
and query_job.job_id is not None
Expand All @@ -720,8 +722,25 @@ def _query_and_results(
self._bq_job_link(query_job.location, query_job.project, query_job.job_id)
)

iterator = query_job.result(max_results=limit, timeout=job_execution_timeout)
# only use async logic if user specifies a timeout
if job_execution_timeout:
loop = asyncio.new_event_loop()
future_iterator = asyncio.wait_for(
loop.run_in_executor(None, functools.partial(query_job.result, max_results=limit)),
timeout=job_execution_timeout,
)

try:
iterator = loop.run_until_complete(future_iterator)
except asyncio.TimeoutError:
query_job.cancel()
raise DbtRuntimeError(
f"Query exceeded configured timeout of {job_execution_timeout}s"
)
finally:
loop.close()
else:
iterator = query_job.result(max_results=limit)
return query_job, iterator

def _retry_and_handle(self, msg, conn, fn):
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def pytest_addoption(parser):
parser.addoption("--profile", action="store", default="oauth", type=str)


@pytest.fixture(scope="session")
@pytest.fixture(scope="class")
def dbt_profile_target(request):
profile_type = request.config.getoption("--profile")
if profile_type == "oauth":
Expand Down
62 changes: 62 additions & 0 deletions tests/functional/test_job_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import pytest

from dbt.tests.util import run_dbt

_REASONABLE_TIMEOUT = 300
_SHORT_TIMEOUT = 1

_LONG_RUNNING_MODEL_SQL = """
{{ config(materialized='table') }}
with array_1 as (
select generated_ids from UNNEST(GENERATE_ARRAY(1, 200000)) AS generated_ids
),
array_2 as (
select generated_ids from UNNEST(GENERATE_ARRAY(2, 200000)) AS generated_ids
)
SELECT array_1.generated_ids
FROM array_1
LEFT JOIN array_1 as jnd on 1=1
LEFT JOIN array_2 as jnd2 on 1=1
LEFT JOIN array_1 as jnd3 on jnd3.generated_ids >= jnd2.generated_ids
"""

_SHORT_RUNNING_QUERY = """
SELECT 1 as id
"""


class TestSuccessfulJobRun:
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": _SHORT_RUNNING_QUERY,
}

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
outputs = {"default": dbt_profile_target}
outputs["default"]["job_execution_timeout_seconds"] = _REASONABLE_TIMEOUT
return {"test": {"outputs": outputs, "target": "default"}}

def test_bigquery_job_run_succeeds_within_timeout(self, project):
result = run_dbt()
assert len(result) == 1


class TestJobTimeout:
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": _LONG_RUNNING_MODEL_SQL,
}

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
outputs = {"default": dbt_profile_target}
outputs["default"]["job_execution_timeout_seconds"] = _SHORT_TIMEOUT
return {"test": {"outputs": outputs, "target": "default"}}

def test_job_timeout(self, project):
result = run_dbt(["run"], expect_pass=False) # project setup will fail
assert f"Query exceeded configured timeout of {_SHORT_TIMEOUT}s" in result[0].message
25 changes: 24 additions & 1 deletion tests/unit/test_bigquery_adapter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

import agate
import decimal
import json
Expand Down Expand Up @@ -634,18 +636,39 @@ def test_drop_dataset(self):

@patch("dbt.adapters.bigquery.impl.google.cloud.bigquery")
def test_query_and_results(self, mock_bq):
self.mock_client.query = Mock(return_value=Mock(state="DONE"))
self.connections._query_and_results(
self.mock_client,
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=100,
job_execution_timeout=3,
)

mock_bq.QueryJobConfig.assert_called_once()
self.mock_client.query.assert_called_once_with(
query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15
)

@patch("dbt.adapters.bigquery.impl.google.cloud.bigquery")
def test_query_and_results_timeout(self, mock_bq):
self.mock_client.query = Mock(
return_value=Mock(result=lambda *args, **kwargs: time.sleep(4))
)
with pytest.raises(dbt.exceptions.DbtRuntimeError) as exc:
self.connections._query_and_results(
self.mock_client,
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=1,
)

mock_bq.QueryJobConfig.assert_called_once()
self.mock_client.query.assert_called_once_with(
query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15
)
assert "Query exceeded configured timeout of 1s" in str(exc.value)

def test_copy_bq_table_appends(self):
self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND)
Expand Down

0 comments on commit 2eb407d

Please sign in to comment.