refactor submission method and add command API as default #442

Merged: 4 commits, Aug 30, 2022
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20220829-164426.yaml
@@ -0,0 +1,7 @@
+kind: Under the Hood
+body: Submit python model with Command API by default. Adjusted run name
+time: 2022-08-29T16:44:26.509138-07:00
+custom:
+  Author: ChenyuLInx
+  Issue: "424"
+  PR: "442"
108 changes: 9 additions & 99 deletions dbt/adapters/spark/impl.py
@@ -1,7 +1,4 @@
 import re
-import requests
-import time
-import base64
 from concurrent.futures import Future
 from dataclasses import dataclass
 from typing import Any, Dict, Iterable, List, Optional, Union
@@ -20,6 +17,7 @@
 from dbt.adapters.spark import SparkConnectionManager
 from dbt.adapters.spark import SparkRelation
 from dbt.adapters.spark import SparkColumn
+from dbt.adapters.spark.python_submissions import PYTHON_SUBMISSION_HELPERS
 from dbt.adapters.base import BaseRelation
 from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER
 from dbt.events import AdapterLogger
@@ -394,105 +392,17 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str, timeout=None
         # of `None` which evaluates to True!

         # TODO limit this function to run only when doing the materialization of python nodes

-        # assuming that for python job running over 1 day user would mannually overwrite this
-        schema = getattr(parsed_model, "schema", self.config.credentials.schema)
-        identifier = parsed_model["alias"]
-        if not timeout:
-            timeout = 60 * 60 * 24
-        if timeout <= 0:
-            raise ValueError("Timeout must larger than 0")
-
-        auth_header = {"Authorization": f"Bearer {self.connections.profile.credentials.token}"}
-
-        # create new dir
-        if not self.connections.profile.credentials.user:
-            raise ValueError("Need to supply user in profile to submit python job")
-        # it is safe to call mkdirs even if dir already exists and have content inside
-        work_dir = f"/Users/{self.connections.profile.credentials.user}/{schema}"
-        response = requests.post(
-            f"https://{self.connections.profile.credentials.host}/api/2.0/workspace/mkdirs",
-            headers=auth_header,
-            json={
-                "path": work_dir,
-            },
-        )
-        if response.status_code != 200:
-            raise dbt.exceptions.RuntimeException(
-                f"Error creating work_dir for python notebooks\n {response.content!r}"
+        submission_method = parsed_model["config"].get("submission_method", "commands")
+        if submission_method not in PYTHON_SUBMISSION_HELPERS:
+            raise NotImplementedError(
+                "Submission method {} is not supported".format(submission_method)
             )
-
-        # add notebook
-        b64_encoded_content = base64.b64encode(compiled_code.encode()).decode()
-        response = requests.post(
-            f"https://{self.connections.profile.credentials.host}/api/2.0/workspace/import",
-            headers=auth_header,
-            json={
-                "path": f"{work_dir}/{identifier}",
-                "content": b64_encoded_content,
-                "language": "PYTHON",
-                "overwrite": True,
-                "format": "SOURCE",
-            },
+        job_helper = PYTHON_SUBMISSION_HELPERS[submission_method](
+            parsed_model, self.connections.profile.credentials
         )
-        if response.status_code != 200:
-            raise dbt.exceptions.RuntimeException(
-                f"Error creating python notebook.\n {response.content!r}"
-            )
-
-        # submit job
-        submit_response = requests.post(
-            f"https://{self.connections.profile.credentials.host}/api/2.1/jobs/runs/submit",
-            headers=auth_header,
-            json={
-                "run_name": "debug task",
-                "existing_cluster_id": self.connections.profile.credentials.cluster,
-                "notebook_task": {
-                    "notebook_path": f"{work_dir}/{identifier}",
-                },
-            },
-        )
-        if submit_response.status_code != 200:
-            raise dbt.exceptions.RuntimeException(
-                f"Error creating python run.\n {response.content!r}"
-            )
-
-        # poll until job finish
-        state = None
-        start = time.time()
-        run_id = submit_response.json()["run_id"]
-        terminal_states = ["TERMINATED", "SKIPPED", "INTERNAL_ERROR"]
-        while state not in terminal_states and time.time() - start < timeout:
-            time.sleep(1)
-            resp = requests.get(
-                f"https://{self.connections.profile.credentials.host}"
-                f"/api/2.1/jobs/runs/get?run_id={run_id}",
-                headers=auth_header,
-            )
-            json_resp = resp.json()
-            state = json_resp["state"]["life_cycle_state"]
-            # logger.debug(f"Polling.... in state: {state}")
-        if state != "TERMINATED":
-            raise dbt.exceptions.RuntimeException(
-                "python model run ended in state"
-                f"{state} with state_message\n{json_resp['state']['state_message']}"
-            )
-
-        # get end state to return to user
-        run_output = requests.get(
-            f"https://{self.connections.profile.credentials.host}"
-            f"/api/2.1/jobs/runs/get-output?run_id={run_id}",
-            headers=auth_header,
-        )
-        json_run_output = run_output.json()
-        result_state = json_run_output["metadata"]["state"]["result_state"]
-        if result_state != "SUCCESS":
-            raise dbt.exceptions.RuntimeException(
-                "Python model failed with traceback as:\n"
-                "(Note that the line number here does not "
-                "match the line number in your code due to dbt templating)\n"
-                f"{json_run_output['error_trace']}"
-            )
+        job_helper.submit(compiled_code)
+        # we don't really get any useful information back from the job submission other than success
         return self.connections.get_response(None)

     def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
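
Reviewer note: `dbt/adapters/spark/python_submissions.py` is not shown in this diff, so the shape of `PYTHON_SUBMISSION_HELPERS` below is only a sketch inferred from how `impl.py` uses it: a dict keyed by submission method whose values are constructed with the parsed model and the profile credentials and expose `submit(compiled_code)`. The class names are invented; only the registry name, the `"commands"` key, the constructor arguments, and `submit` come from the code above.

```python
# Illustrative sketch only -- not the actual contents of python_submissions.py.
from typing import Any, Dict, Type


class BasePythonJobHelper:  # hypothetical base class
    def __init__(self, parsed_model: Dict[str, Any], credentials: Any) -> None:
        # impl.py constructs helpers with the parsed model dict and the profile credentials.
        self.parsed_model = parsed_model
        self.credentials = credentials

    def submit(self, compiled_code: str) -> None:
        # Each helper owns the submit-and-wait lifecycle that used to live inline
        # in SparkAdapter.submit_python_job.
        raise NotImplementedError


class CommandApiJobHelper(BasePythonJobHelper):  # hypothetical name
    def submit(self, compiled_code: str) -> None:
        # e.g. send compiled_code to a Databricks execution context and poll for the result
        ...


# "commands" must be registered: impl.py treats it as the default submission_method.
PYTHON_SUBMISSION_HELPERS: Dict[str, Type[BasePythonJobHelper]] = {
    "commands": CommandApiJobHelper,
}
```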