Skip to content

Commit

Permalink
Pass python model timeout to polling instead of retry (#766)
Browse files Browse the repository at this point in the history
* Pass python model timeout to polling instead of retry

* rename retry to result_polling_policy
  • Loading branch information
colin-rogers-dbt authored Jun 21, 2023
1 parent b8d127f commit 5dd68e2
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20230609-132727.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Pass python model timeout to polling operation so model execution times out
as expected.
time: 2023-06-09T13:27:27.279842-07:00
custom:
Author: colin-rogers-dbt
Issue: "577"
10 changes: 7 additions & 3 deletions dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Dict, Union

from dbt.adapters.base import PythonJobHelper
from google.api_core.future.polling import POLLING_PREDICATE

from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
from dbt.adapters.bigquery.connections import DataprocBatchConfig
from google.api_core import retry
Expand Down Expand Up @@ -43,7 +45,9 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
self.timeout = self.parsed_model["config"].get(
"timeout", self.credential.job_execution_timeout_seconds or 60 * 60 * 24
)
self.retry = retry.Retry(maximum=10.0, deadline=self.timeout)
self.result_polling_policy = retry.Retry(
predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout
)
self.client_options = ClientOptions(
api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
)
Expand Down Expand Up @@ -98,7 +102,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
"job": job,
}
)
response = operation.result(retry=self.retry)
response = operation.result(polling=self.result_polling_policy)
# check if job failed
if response.status.state == 6:
raise ValueError(response.status.details)
Expand All @@ -123,7 +127,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
operation = self.job_client.create_batch(request=request) # type: ignore
# this takes quite a while, waiting on GCP response to resolve
# (not a google-api-core issue, more likely a dataproc serverless issue)
response = operation.result(retry=self.retry)
response = operation.result(polling=self.result_polling_policy)
return response
# there might be useful results here that we can parse and return
# Dataproc job output is saved to the Cloud Storage bucket
Expand Down
28 changes: 26 additions & 2 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
import os
import pytest
from dbt.tests.util import run_dbt, write_file
from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file
import dbt.tests.adapter.python_model.test_python_model as dbt_tests

# Reason string handed to ``pytest.mark.skip`` for the Dataproc python model
# test class defined further down in this module.
TEST_SKIP_MESSAGE = (
"Skipping the Tests since Dataproc serverless is not stable. " "TODO: Fix later"
)

# Source text of a dbt python model used by the timeout test: the model sets
# ``timeout=5`` in its config but sleeps for 30 seconds before returning its
# DataFrame, so the run is expected to exceed the configured timeout.
blocks_for_thirty_sec = """
def model(dbt, _):
dbt.config(
materialized='table',
timeout=5
)
import pandas as pd
data = {'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']}
df = pd.DataFrame.from_dict(data)
import time
time.sleep(30)
return df
"""


class TestPythonModelDataprocTimeoutTest:
    """Check that the ``timeout`` configured on a python model is enforced."""

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the single model file used by this test class."""
        model_files = {}
        model_files["30_sec_python_model.py"] = blocks_for_thirty_sec
        return model_files

    def test_model_times_out(self, project):
        """Run the project expecting failure and look for the timeout message."""
        expected_message = (
            "Operation did not complete within the designated timeout of 5 seconds."
        )
        # expect_pass=False: the run is supposed to fail.
        run_results, captured_logs = run_dbt_and_capture(["run"], expect_pass=False)
        assert len(run_results) == 1
        assert expected_message in captured_logs


@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestPythonModelDataproc(dbt_tests.BasePythonModelTests):
    """Inherit the ``BasePythonModelTests`` suite unchanged; currently skipped.

    The skip reason is ``TEST_SKIP_MESSAGE``.
    """

Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ passenv =
DATAPROC_*
GCS_BUCKET
commands =
bigquery: {envpython} -m pytest {posargs} -vv tests/functional --profile service_account
bigquery: {envpython} -m pytest {posargs} -vv tests/functional -k "not TestPython" --profile service_account
deps =
-rdev-requirements.txt
-e.
Expand Down

0 comments on commit 5dd68e2

Please sign in to comment.