Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenyuLInx committed Aug 30, 2022
1 parent 965fdbf commit b4e177e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
6 changes: 3 additions & 3 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dbt.adapters.spark import SparkConnectionManager
from dbt.adapters.spark import SparkRelation
from dbt.adapters.spark import SparkColumn
from dbt.adapters.spark.python_submissions import python_submission_helpers
from dbt.adapters.spark.python_submissions import PYTHON_SUBMISSION_HELPERS
from dbt.adapters.base import BaseRelation
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt.events import AdapterLogger
Expand Down Expand Up @@ -394,11 +394,11 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str, timeout=None
# TODO limit this function to run only when doing the materialization of python nodes
# assuming that for python job running over 1 day user would mannually overwrite this
submission_method = parsed_model["config"].get("submission_method", "commands")
if submission_method not in python_submission_helpers:
if submission_method not in PYTHON_SUBMISSION_HELPERS:
raise NotImplementedError(
"Submission method {} is not supported".format(submission_method)
)
job_helper = python_submission_helpers[submission_method](
job_helper = PYTHON_SUBMISSION_HELPERS[submission_method](
parsed_model, self.connections.profile.credentials
)
job_helper.submit(compiled_code)
Expand Down
10 changes: 6 additions & 4 deletions dbt/adapters/spark/python_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import dbt.exceptions

DEFAULT_POLLING_INTERVAL = 3
SUBMISSION_LANGUAGE = "python"
DEFAULT_TIMEOUT = 60 * 60 * 24


class BasePythonJobHelper:
Expand All @@ -20,7 +22,7 @@ def __init__(self, parsed_model, credentials):
self.polling_interval = DEFAULT_POLLING_INTERVAL

def get_timeout(self):
timeout = self.parsed_model["config"].get("timeout", 60 * 60 * 24)
timeout = self.parsed_model["config"].get("timeout", DEFAULT_TIMEOUT)
if timeout <= 0:
raise ValueError("Timeout must be a positive integer")
return timeout
Expand Down Expand Up @@ -177,7 +179,7 @@ def create(self) -> str:
headers=self.auth_header,
json={
"clusterId": self.cluster,
"language": "python",
"language": SUBMISSION_LANGUAGE,
},
)
if response.status_code != 200:
Expand Down Expand Up @@ -217,7 +219,7 @@ def execute(self, context_id: str, command: str) -> str:
json={
"clusterId": self.cluster,
"contextId": context_id,
"language": "python",
"language": SUBMISSION_LANGUAGE,
"command": command,
},
)
Expand Down Expand Up @@ -276,7 +278,7 @@ def submit(self, compiled_code):
context.destroy(context_id)


python_submission_helpers = {
PYTHON_SUBMISSION_HELPERS = {
"notebook": DBNotebookPythonJobHelper,
"commands": DBCommandsApiPythonJobHelper,
}

0 comments on commit b4e177e

Please sign in to comment.