From 622b42099605cccb4d1c68bb4512fa14ed2037d1 Mon Sep 17 00:00:00 2001
From: Colin
Date: Fri, 9 Jun 2023 13:29:27 -0700
Subject: [PATCH 1/2] Pass python model timeout to polling instead of retry

---
 .../unreleased/Fixes-20230609-132727.yaml     |  7 +++++
 dbt/adapters/bigquery/python_submissions.py   |  8 ++++--
 tests/functional/adapter/test_python_model.py | 28 +++++++++++++++++--
 tox.ini                                       |  2 +-
 4 files changed, 39 insertions(+), 6 deletions(-)
 create mode 100644 .changes/unreleased/Fixes-20230609-132727.yaml

diff --git a/.changes/unreleased/Fixes-20230609-132727.yaml b/.changes/unreleased/Fixes-20230609-132727.yaml
new file mode 100644
index 000000000..c37d2e72a
--- /dev/null
+++ b/.changes/unreleased/Fixes-20230609-132727.yaml
@@ -0,0 +1,7 @@
+kind: Fixes
+body: Pass python model timeout to polling operation so model execution times out
+  as expected.
+time: 2023-06-09T13:27:27.279842-07:00
+custom:
+  Author: colin-rogers-dbt
+  Issue: "577"
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index e5fbf037e..caa16cab6 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -1,6 +1,8 @@
 from typing import Dict, Union

 from dbt.adapters.base import PythonJobHelper
+from google.api_core.future.polling import POLLING_PREDICATE
+
 from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
 from dbt.adapters.bigquery.connections import DataprocBatchConfig
 from google.api_core import retry
@@ -43,7 +45,7 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
         self.timeout = self.parsed_model["config"].get(
             "timeout", self.credential.job_execution_timeout_seconds or 60 * 60 * 24
         )
-        self.retry = retry.Retry(maximum=10.0, deadline=self.timeout)
+        self.retry = retry.Retry(predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout)
         self.client_options = ClientOptions(
             api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
         )
@@ -98,7 +100,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
                 "job": job,
             }
         )
-        response = operation.result(retry=self.retry)
+        response = operation.result(polling=self.retry)
         # check if job failed
         if response.status.state == 6:
             raise ValueError(response.status.details)
@@ -123,7 +125,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
         operation = self.job_client.create_batch(request=request)  # type: ignore
         # this takes quite a while, waiting on GCP response to resolve
         # (not a google-api-core issue, more likely a dataproc serverless issue)
-        response = operation.result(retry=self.retry)
+        response = operation.result(polling=self.retry)
         return response
         # there might be useful results here that we can parse and return
         # Dataproc job output is saved to the Cloud Storage bucket
diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py
index 955bc6be0..7f17429f1 100644
--- a/tests/functional/adapter/test_python_model.py
+++ b/tests/functional/adapter/test_python_model.py
@@ -1,14 +1,38 @@
 import os
 import pytest
-from dbt.tests.util import run_dbt, write_file
+from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file
 import dbt.tests.adapter.python_model.test_python_model as dbt_tests

 TEST_SKIP_MESSAGE = (
     "Skipping the Tests since Dataproc serverless is not stable. " "TODO: Fix later"
 )

+blocks_for_thirty_sec = """
+def model(dbt, _):
+    dbt.config(
+        materialized='table',
+        timeout=5
+    )
+    import pandas as pd
+    data = {'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']}
+    df = pd.DataFrame.from_dict(data)
+    import time
+    time.sleep(30)
+    return df
+"""
+
+
+class TestPythonModelDataprocTimeoutTest:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"30_sec_python_model.py": blocks_for_thirty_sec}
+
+    def test_model_times_out(self, project):
+        result, output = run_dbt_and_capture(["run"], expect_pass=False)
+        assert len(result) == 1
+        assert "Operation did not complete within the designated timeout of 5 seconds." in output
+

-@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
 class TestPythonModelDataproc(dbt_tests.BasePythonModelTests):
     pass

diff --git a/tox.ini b/tox.ini
index efa18083d..e0342e04d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -23,7 +23,7 @@ passenv =
     DATAPROC_*
     GCS_BUCKET
 commands =
-  bigquery: {envpython} -m pytest {posargs} -vv tests/functional --profile service_account
+  bigquery: {envpython} -m pytest {posargs} -vv tests/functional -k "not TestPython" --profile service_account
 deps =
   -rdev-requirements.txt
   -e.

From ad1f979be4260cbda84fe95bfb1c3b8c7e78d4da Mon Sep 17 00:00:00 2001
From: Colin
Date: Tue, 20 Jun 2023 13:29:00 -0700
Subject: [PATCH 2/2] rename retry to result_polling_policy

---
 dbt/adapters/bigquery/python_submissions.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index caa16cab6..0c7ce1917 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -45,7 +45,9 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
         self.timeout = self.parsed_model["config"].get(
             "timeout", self.credential.job_execution_timeout_seconds or 60 * 60 * 24
         )
-        self.retry = retry.Retry(predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout)
+        self.result_polling_policy = retry.Retry(
+            predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout
+        )
         self.client_options = ClientOptions(
             api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
         )
@@ -100,7 +102,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
                 "job": job,
             }
         )
-        response = operation.result(polling=self.retry)
+        response = operation.result(polling=self.result_polling_policy)
         # check if job failed
         if response.status.state == 6:
             raise ValueError(response.status.details)
@@ -125,7 +127,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
         operation = self.job_client.create_batch(request=request)  # type: ignore
         # this takes quite a while, waiting on GCP response to resolve
         # (not a google-api-core issue, more likely a dataproc serverless issue)
-        response = operation.result(polling=self.retry)
+        response = operation.result(polling=self.result_polling_policy)
         return response
         # there might be useful results here that we can parse and return
         # Dataproc job output is saved to the Cloud Storage bucket