Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass python model timeout to polling instead of retry #766

Merged
merged 4 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20230609-132727.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Pass python model timeout to polling operation so model execution times out
as expected.
time: 2023-06-09T13:27:27.279842-07:00
custom:
Author: colin-rogers-dbt
Issue: "577"
8 changes: 5 additions & 3 deletions dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Dict, Union

from dbt.adapters.base import PythonJobHelper
from google.api_core.future.polling import POLLING_PREDICATE

from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
from dbt.adapters.bigquery.connections import DataprocBatchConfig
from google.api_core import retry
Expand Down Expand Up @@ -43,7 +45,7 @@ def __init__(self, parsed_model: Dict, credential: BigQueryCredentials) -> None:
self.timeout = self.parsed_model["config"].get(
"timeout", self.credential.job_execution_timeout_seconds or 60 * 60 * 24
)
self.retry = retry.Retry(maximum=10.0, deadline=self.timeout)
self.retry = retry.Retry(predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to https://googleapis.dev/python/google-api-core/latest/_modules/google/api_core/future/polling.html
predicate=POLLING_PREDICATE is already the default, so you can skip it.

Suggested change
self.retry = retry.Retry(predicate=POLLING_PREDICATE, maximum=10.0, timeout=self.timeout)
self.retry = retry.Retry(maximum=10.0, timeout=self.timeout)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My read on this is that if we supply a custom Retry to polling (i.e., one that replaces the DEFAULT_POLLING retry), it won't necessarily include the default polling predicate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess you're right: DEFAULT_POLLING is no longer used in that case, and the Retry constructor doesn't use the polling predicate by default.

self.client_options = ClientOptions(
api_endpoint="{}-dataproc.googleapis.com:443".format(self.credential.dataproc_region)
)
Expand Down Expand Up @@ -98,7 +100,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
"job": job,
}
)
response = operation.result(retry=self.retry)
response = operation.result(polling=self.retry)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider renaming self.retry to something different, like self.poll_policy or self.get_result_policy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call; I think result_polling_policy is the most explicit.

# check if job failed
if response.status.state == 6:
raise ValueError(response.status.details)
Expand All @@ -123,7 +125,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
operation = self.job_client.create_batch(request=request) # type: ignore
# this takes quite a while, waiting on GCP response to resolve
# (not a google-api-core issue, more likely a dataproc serverless issue)
response = operation.result(retry=self.retry)
response = operation.result(polling=self.retry)
return response
# there might be useful results here that we can parse and return
# Dataproc job output is saved to the Cloud Storage bucket
Expand Down
28 changes: 26 additions & 2 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
import os
import pytest
from dbt.tests.util import run_dbt, write_file
from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file
import dbt.tests.adapter.python_model.test_python_model as dbt_tests

# Human-readable reason passed to ``pytest.mark.skip`` for the Dataproc
# python-model test classes.
TEST_SKIP_MESSAGE = "Skipping the Tests since Dataproc serverless is not stable. TODO: Fix later"

# Source text of a dbt Python model used as a timeout fixture: it sets
# ``timeout=5`` in its config and then sleeps for 30 seconds before returning
# a small pandas DataFrame, so it runs longer than its configured timeout.
#
# The text is used as the contents of a ``.py`` model file, so it must be
# valid Python by itself; the function body therefore has to be indented.
blocks_for_thirty_sec = """
def model(dbt, _):
    dbt.config(
        materialized='table',
        timeout=5
    )
    import pandas as pd
    data = {'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']}
    df = pd.DataFrame.from_dict(data)
    import time
    time.sleep(30)
    return df
"""


class TestPythonModelDataprocTimeoutTest:
    """Check that a python model's ``timeout`` config causes the run to fail.

    The project contains a single model whose source is
    ``blocks_for_thirty_sec``.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Return the model files for the test project, keyed by file name."""
        model_files = {}
        model_files["30_sec_python_model.py"] = blocks_for_thirty_sec
        return model_files

    def test_model_times_out(self, project):
        """Run dbt expecting failure and look for the timeout message."""
        expected_message = (
            "Operation did not complete within the designated timeout of 5 seconds."
        )
        run_results, captured_output = run_dbt_and_capture(["run"], expect_pass=False)
        # Exactly one model is defined, so exactly one result is expected.
        assert len(run_results) == 1
        assert expected_message in captured_output


@pytest.mark.skip(reason=TEST_SKIP_MESSAGE)
class TestPythonModelDataproc(dbt_tests.BasePythonModelTests):
    """Inherit the base python-model tests unchanged; currently skipped."""

Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ passenv =
DATAPROC_*
GCS_BUCKET
commands =
bigquery: {envpython} -m pytest {posargs} -vv tests/functional --profile service_account
bigquery: {envpython} -m pytest {posargs} -vv tests/functional -k "not TestPython" --profile service_account
deps =
-rdev-requirements.txt
-e.
Expand Down