diff --git a/.changes/unreleased/Under the Hood-20220916-125706.yaml b/.changes/unreleased/Under the Hood-20220916-125706.yaml
new file mode 100644
index 000000000..54b82eb55
--- /dev/null
+++ b/.changes/unreleased/Under the Hood-20220916-125706.yaml
@@ -0,0 +1,7 @@
+kind: Under the Hood
+body: Enable Pandas and Pandas-on-Spark DataFrames for dbt python models
+time: 2022-09-16T12:57:06.846297-06:00
+custom:
+  Author: chamini2 dbeatty10
+  Issue: "468"
+  PR: "469"
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index d39ba0b44..25d70c722 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -41,7 +41,45 @@
 # --- Autogenerated dbt materialization code. --- #
 dbt = dbtObj(spark.table)
 df = model(dbt, spark)
-df.write.mode("overwrite").format("delta").saveAsTable("{{ target_relation }}")
+
+import importlib.util
+
+# The runtime always provides a `spark` session, so pyspark itself is importable;
+# bind pyspark.sql.dataframe explicitly so the isinstance() check below cannot
+# raise NameError when pyspark.pandas is unavailable (Spark < 3.2).
+import pyspark.sql.dataframe
+
+pandas_available = False
+pyspark_available = False
+
+# make sure pandas exists before using it
+if importlib.util.find_spec("pandas"):
+    import pandas
+    pandas_available = True
+
+# make sure pyspark.pandas exists before using it
+if importlib.util.find_spec("pyspark.pandas"):
+    import pyspark.pandas
+    pyspark_available = True
+
+# preferentially convert pandas DataFrames to pandas-on-Spark DataFrames first
+# since they know how to convert pandas DataFrames better than `spark.createDataFrame(df)`
+# and converting from pandas-on-Spark to Spark DataFrame has no overhead
+if pyspark_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
+    df = pyspark.pandas.frame.DataFrame(df)
+
+# convert to pyspark.sql.dataframe.DataFrame
+if isinstance(df, pyspark.sql.dataframe.DataFrame):
+    pass  # since it is already a Spark DataFrame
+elif pyspark_available and isinstance(df, pyspark.pandas.frame.DataFrame):
+    df = df.to_spark()
+elif pandas_available and isinstance(df, pandas.core.frame.DataFrame):
+    df = spark.createDataFrame(df)
+else:
+    msg = f"{type(df)} is not a supported type for dbt Python materialization"
+    raise Exception(msg)
+
+df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("{{ target_relation }}")
 {%- endmacro -%}
 
 {%macro py_script_comment()%}