various improvement (#493)
Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
ChenyuLInx and jtcohen6 authored Sep 30, 2022
1 parent 7f233b1 commit 0cb9582
Showing 3 changed files with 35 additions and 7 deletions.
dbt/adapters/spark/python_submissions.py (13 additions, 3 deletions)
@@ -31,7 +31,7 @@ def __init__(self, parsed_model: Dict, credentials: SparkCredentials) -> None:

     @property
     def cluster_id(self) -> str:
-        return self.parsed_model.get("cluster_id", self.credentials.cluster_id)
+        return self.parsed_model["config"].get("cluster_id", self.credentials.cluster_id)

     def get_timeout(self) -> int:
         timeout = self.parsed_model["config"].get("timeout", DEFAULT_TIMEOUT)
@@ -82,7 +82,17 @@ def _submit_job(self, path: str, cluster_spec: dict) -> str:
                 "notebook_path": path,
             },
         }
-        job_spec.update(cluster_spec)
+        job_spec.update(cluster_spec)  # updates 'new_cluster' config
+        # PYPI packages
+        packages = self.parsed_model["config"].get("packages", [])
+        # additional format of packages
+        additional_libs = self.parsed_model["config"].get("additional_libs", [])
+        libraries = []
+        for package in packages:
+            libraries.append({"pypi": {"package": package}})
+        for lib in additional_libs:
+            libraries.append(lib)
+        job_spec.update({"libraries": libraries})  # type: ignore
         submit_response = requests.post(
             f"https://{self.credentials.host}/api/2.1/jobs/runs/submit",
             headers=self.auth_header,
@@ -96,7 +106,7 @@ def _submit_job(self, path: str, cluster_spec: dict) -> str:

     def _submit_through_notebook(self, compiled_code: str, cluster_spec: dict) -> None:
         # it is safe to call mkdirs even if dir already exists and have content inside
-        work_dir = f"/dbt_python_model/{self.schema}/"
+        work_dir = f"/Shared/dbt_python_model/{self.schema}/"
         self._create_work_dir(work_dir)
         # add notebook
         whole_file_path = f"{work_dir}{self.identifier}"
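Note: the new `packages` and `additional_libs` options end up in the `libraries` field of the payload sent to the Databricks `/api/2.1/jobs/runs/submit` endpoint. Below is a minimal sketch of that mapping; the config values, including the `maven` entry used to illustrate `additional_libs`, are illustrative and not taken from this commit.

# Minimal sketch: how a model config becomes the `libraries` field of the
# runs/submit payload, following the loop added above. The `maven` entry shows the
# "additional format of packages" that `additional_libs` passes through unchanged.
config = {
    "packages": ["spacy", "torch"],
    "additional_libs": [
        {"maven": {"coordinates": "com.databricks:spark-xml_2.12:0.15.0"}},
    ],
}

libraries = [{"pypi": {"package": package}} for package in config.get("packages", [])]
libraries.extend(config.get("additional_libs", []))

job_spec = {"libraries": libraries}
# job_spec["libraries"] is now:
# [{"pypi": {"package": "spacy"}},
#  {"pypi": {"package": "torch"}},
#  {"maven": {"coordinates": "com.databricks:spark-xml_2.12:0.15.0"}}]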
dbt/include/spark/macros/materializations/table.sql (6 additions, 4 deletions)
@@ -42,6 +42,8 @@
 dbt = dbtObj(spark.table)
 df = model(dbt, spark)

+# make sure pyspark exists in the namespace, for 7.3.x-scala2.12 it does not exist
+import pyspark
 # make sure pandas exists before using it
 try:
     import pandas
@@ -52,9 +54,9 @@ except ImportError:
 # make sure pyspark.pandas exists before using it
 try:
     import pyspark.pandas
-    pyspark_available = True
+    pyspark_pandas_api_available = True
 except ImportError:
-    pyspark_available = False
+    pyspark_pandas_api_available = False

 # make sure databricks.koalas exists before using it
 try:
@@ -66,15 +68,15 @@ except ImportError:
 # preferentially convert pandas DataFrames to pandas-on-Spark or Koalas DataFrames first
 # since they know how to convert pandas DataFrames better than `spark.createDataFrame(df)`
 # and converting from pandas-on-Spark to Spark DataFrame has no overhead
-if pyspark_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
+if pyspark_pandas_api_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
     df = pyspark.pandas.frame.DataFrame(df)
 elif koalas_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
     df = databricks.koalas.frame.DataFrame(df)

 # convert to pyspark.sql.dataframe.DataFrame
 if isinstance(df, pyspark.sql.dataframe.DataFrame):
     pass  # since it is already a Spark DataFrame
-elif pyspark_available and isinstance(df, pyspark.pandas.frame.DataFrame):
+elif pyspark_pandas_api_available and isinstance(df, pyspark.pandas.frame.DataFrame):
     df = df.to_spark()
 elif koalas_available and isinstance(df, databricks.koalas.frame.DataFrame):
     df = df.to_spark()
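Note: the rename separates "pyspark is importable" from "the pandas-on-Spark API is importable"; on older runtimes such as 7.3.x-scala2.12, `pyspark` exists but `pyspark.pandas` does not, and the final isinstance check against pyspark.sql.dataframe.DataFrame still needs the top-level import. Below is a condensed, standalone sketch of the detection-and-conversion logic; the Koalas branch is omitted for brevity and the helper function name is illustrative.

# Condensed sketch of the macro's conversion cascade (Koalas branch omitted).
import pyspark  # importable on the cluster even when pyspark.pandas is not

try:
    import pyspark.pandas  # pandas-on-Spark API; missing on e.g. 7.3.x-scala2.12
    pyspark_pandas_api_available = True
except ImportError:
    pyspark_pandas_api_available = False

try:
    import pandas
    pandas_available = True
except ImportError:
    pandas_available = False

def to_spark_dataframe(df):
    # plain pandas -> pandas-on-Spark first: it converts better than spark.createDataFrame(df)
    if pyspark_pandas_api_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
        df = pyspark.pandas.frame.DataFrame(df)
    # pandas-on-Spark -> Spark DataFrame has no conversion overhead
    if pyspark_pandas_api_available and isinstance(df, pyspark.pandas.frame.DataFrame):
        df = df.to_spark()
    return df  # expected to be a pyspark.sql.dataframe.DataFrame at this point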
tests/functional/adapter/test_python_model.py (16 additions, 0 deletions)
@@ -20,10 +20,26 @@ def project_config_update(self):

 models__simple_python_model = """
 import pandas
+import torch
+import spacy
 def model(dbt, spark):
     dbt.config(
         materialized='table',
+        submission_method='job_cluster',
+        job_cluster_config={
+            "spark_version": "7.3.x-scala2.12",
+            "node_type_id": "i3.xlarge",
+            "num_workers": 0,
+            "spark_conf": {
+                "spark.databricks.cluster.profile": "singleNode",
+                "spark.master": "local[*, 4]"
+            },
+            "custom_tags": {
+                "ResourceClass": "SingleNode"
+            }
+        },
+        packages=['spacy', 'torch']
     )
     data = [[1,2]] * 10
     return spark.createDataFrame(data, schema=['test', 'test2'])
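Note: the updated functional test pins the job cluster to a single-node 7.3.x runtime and pre-installs `spacy` and `torch` via the new `packages` option. As a complementary, hypothetical example, the `cluster_id` fix in python_submissions.py also lets a model target a specific existing cluster from its own config instead of the profile-level default; the cluster id and package list below are illustrative only.

# Hypothetical user model showing a per-model cluster_id override, which the fixed
# cluster_id property now reads from the model's config block before falling back to
# the profile-level credentials value. All values here are illustrative.
def model(dbt, spark):
    dbt.config(
        materialized='table',
        cluster_id='1234-567890-abcde123',  # per-model override; omit to use the target's cluster
        packages=['pandas'],                # installed on the cluster as a PyPI library
    )
    data = [[1, 2]] * 10
    return spark.createDataFrame(data, schema=['test', 'test2'])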
