Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass python model timeout to polling instead of retry #766

Merged
merged 4 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20230609-132727.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Pass python model timeout to polling operation so model execution times out
as expected.
time: 2023-06-09T13:27:27.279842-07:00
custom:
Author: colin-rogers-dbt
Issue: "577"
10 changes: 7 additions & 3 deletions dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Dict, Union

from dbt.adapters.base import PythonJobHelper
from google.api_core.future.polling import POLLING_PREDICATE

from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
from dbt.adapters.bigquery.connections import DataprocBatchConfig
from google.api_core import retry
Expand Down Expand Up @@ -43,7 +45,9 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
self.timeout = self.parsed_model["config"].get(
"timeout", self.credential.job_execution_timeout_seconds or 60 * 60 * 24
)
self.retry = retry.Retry(maximum=10.0, deadline=self.timeout)
self.result_polling_policy = retry.Retry(
predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout
)
self.client_options = ClientOptions(
api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
)
Expand Down Expand Up @@ -98,7 +102,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
"job": job,
}
)
response = operation.result(retry=self.retry)
response = operation.result(polling=self.result_polling_policy)
# check if job failed
if response.status.state == 6:
raise ValueError(response.status.details)
Expand All @@ -123,7 +127,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
operation = self.job_client.create_batch(request=request) # type: ignore
# this takes quite a while, waiting on GCP response to resolve
# (not a google-api-core issue, more likely a dataproc serverless issue)
response = operation.result(retry=self.retry)
response = operation.result(polling=self.result_polling_policy)
return response
# there might be useful results here that we can parse and return
# Dataproc job output is saved to the Cloud Storage bucket
Expand Down
28 changes: 26 additions & 2 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
import os
import pytest
from dbt.tests.util import run_dbt, write_file
from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file
import dbt.tests.adapter.python_model.test_python_model as dbt_tests

# Reason string handed to pytest.mark.skip for the skipped Dataproc test class.
TEST_SKIP_MESSAGE = (
    "Skipping the Tests since Dataproc serverless is not stable. "
    "TODO: Fix later"
)

# Source of a dbt Python model used by the timeout test: it configures a
# 5-second timeout and then sleeps for 30 seconds before returning a small
# DataFrame. The body of `model` must stay indented inside the literal,
# otherwise the model source is not valid Python.
blocks_for_thirty_sec = """
def model(dbt, _):
    dbt.config(
        materialized='table',
        timeout=5
    )
    import pandas as pd
    data = {'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']}
    df = pd.DataFrame.from_dict(data)
    import time
    time.sleep(30)
    return df
"""


class TestPythonModelDataprocTimeoutTest:
    """Functional test: a Python model that outlasts its configured timeout fails the run."""

    @pytest.fixture(scope="class")
    def models(self):
        """Return the project's model files: only the sleeping Python model."""
        model_files = {}
        model_files["30_sec_python_model.py"] = blocks_for_thirty_sec
        return model_files

    def test_model_times_out(self, project):
        """Run dbt expecting a failure and check the captured output for the timeout message."""
        expected_message = (
            "Operation did not complete within the designated timeout of 5 seconds."
        )
        run_results, captured_output = run_dbt_and_capture(["run"], expect_pass=False)
        # Exactly one model is defined, so exactly one result is expected.
        assert len(run_results) == 1
        assert expected_message in captured_output


@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestPythonModelDataproc(dbt_tests.BasePythonModelTests):
    """Inherited base Python-model tests; currently skipped with TEST_SKIP_MESSAGE."""

Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ passenv =
DATAPROC_*
GCS_BUCKET
commands =
bigquery: {envpython} -m pytest {posargs} -vv tests/functional --profile service_account
bigquery: {envpython} -m pytest {posargs} -vv tests/functional -k "not TestPython" --profile service_account
deps =
-rdev-requirements.txt
-e.
Expand Down