Convert df to pyspark DataFrame if it is pandas before writing #469

Merged · 8 commits · Sep 20, 2022
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20220916-125706.yaml
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Enable Pandas and Pandas-on-Spark DataFrames for dbt python models
time: 2022-09-16T12:57:06.846297-06:00
custom:
Author: chamini2 dbeatty10
Issue: "468"
PR: "469"
35 changes: 34 additions & 1 deletion dbt/include/spark/macros/materializations/table.sql
@@ -41,7 +41,40 @@
# --- Autogenerated dbt materialization code. --- #
dbt = dbtObj(spark.table)
df = model(dbt, spark)
df.write.mode("overwrite").format("delta").saveAsTable("{{ target_relation }}")

import importlib.util

pandas_available = False
pyspark_available = False

# make sure pandas exists before using it
if importlib.util.find_spec("pandas"):
import pandas
pandas_available = True

# make sure pyspark.pandas exists before using it
if importlib.util.find_spec("pyspark.pandas"):
import pyspark.pandas
pyspark_available = True

# preferentially convert pandas DataFrames to pandas-on-Spark DataFrames first
# since they know how to convert pandas DataFrames better than `spark.createDataFrame(df)`
# and converting from pandas-on-Spark to Spark DataFrame has no overhead
if pyspark_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
df = pyspark.pandas.frame.DataFrame(df)
Review comment from @jtcohen6 (Contributor), Sep 20, 2022:

At the risk of making this even more complex than it needs to be — I believe pyspark.pandas was introduced in Spark v3.2: https://www.databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html

It won't be available in earlier versions. (The same functionality was available via the koalas package, which was the old codename for pandas-on-PySpark.)

I don't want us to get too-too clever with this logic, though! Could just look like a try/except here
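A minimal sketch of the try/except shape suggested here, assuming the databricks.koalas fallback is wanted; the alias pandas_on_spark is illustrative, not from the PR:

# Sketch only: prefer pyspark.pandas (available on Spark >= 3.2),
# fall back to the older koalas package, else record that neither exists.
try:
    import pyspark.pandas as pandas_on_spark
except ImportError:
    try:
        import databricks.koalas as pandas_on_spark
    except ImportError:
        pandas_on_spark = None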

Reply from the PR author (Contributor):
I think this is what will currently happen if the input is a pandas-on-Spark DataFrame but pyspark.pandas is not available:

  • msg = f"{type(df)} is not a supported type for dbt Python materialization"

And this is what I believe will happen if the input is a pandas DataFrame but pyspark.pandas is not available:

  • df = spark.createDataFrame(df)

We can add an attempt to import databricks.koalas so we cover as many bases as possible. If we go that route, is there an environment we could test it out on?
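As a discussion aid, a hypothetical helper covering all four cases, including the koalas fallback floated above. The function name convert_to_spark and the koalas branch are assumptions, not merged code, and it uses spark.createDataFrame(df) for pandas input rather than the preferential pandas-on-Spark conversion in the PR:

import importlib.util
import pyspark.sql

def convert_to_spark(df, spark):
    # Hypothetical sketch: normalize any supported DataFrame to a Spark DataFrame.
    if isinstance(df, pyspark.sql.dataframe.DataFrame):
        return df  # already a Spark DataFrame
    if importlib.util.find_spec("pyspark.pandas"):
        import pyspark.pandas
        if isinstance(df, pyspark.pandas.frame.DataFrame):
            return df.to_spark()
    if importlib.util.find_spec("databricks.koalas"):
        import databricks.koalas
        # assumes koalas mirrors the pandas-on-Spark API, including to_spark()
        if isinstance(df, databricks.koalas.frame.DataFrame):
            return df.to_spark()
    if importlib.util.find_spec("pandas"):
        import pandas
        if isinstance(df, pandas.core.frame.DataFrame):
            return spark.createDataFrame(df)
    raise Exception(f"{type(df)} is not a supported type for dbt Python materialization")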

Reply from a contributor:

> If we go that route, is there an environment we could test it out on?

We could spin up a Databricks cluster running an older Spark version (v3.1). This is also what will be running inside Dataproc — the latest Apache Spark release it supports is v3.1.


# convert to pyspark.sql.dataframe.DataFrame
if isinstance(df, pyspark.sql.dataframe.DataFrame):
pass # since it is already a Spark DataFrame
elif pyspark_available and isinstance(df, pyspark.pandas.frame.DataFrame):
df = df.to_spark()
elif pandas_available and isinstance(df, pandas.core.frame.DataFrame):
df = spark.createDataFrame(df)
else:
msg = f"{type(df)} is not a supported type for dbt Python materialization"
raise Exception(msg)

df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("{{ target_relation }}")
{%- endmacro -%}

{%macro py_script_comment()%}
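For context, a hypothetical dbt Python model illustrating what this change enables; the model body and column names are illustrative:

import pandas

def model(dbt, session):
    dbt.config(materialized="table")
    # Returning a plain pandas DataFrame now works: the materialization
    # above converts it to a Spark DataFrame before saveAsTable().
    return pandas.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})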