
Convert df to pyspark DataFrame if it is pandas before writing #469

Merged (8 commits) on
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20220916-125706.yaml
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Convert df to pyspark DataFrame if it is pandas before writing
time: 2022-09-16T12:57:06.846297-06:00
custom:
Author: chamini2 dbeatty10
Issue: "468"
PR: "469"
12 changes: 11 additions & 1 deletion dbt/include/spark/macros/materializations/table.sql
@@ -41,7 +41,17 @@
# --- Autogenerated dbt materialization code. --- #
dbt = dbtObj(spark.table)
df = model(dbt, spark)
df.write.mode("overwrite").format("delta").saveAsTable("{{ target_relation }}")

# make sure pandas exists
import importlib.util
package_name = 'pandas'
if importlib.util.find_spec(package_name):
    import pandas
@b-per (Contributor) commented on Sep 19, 2022:

Is this line required? (the import pandas one)

From what I see, on Databricks we would usually want to load pyspark.pandas rather than pandas (and I don't think it is required here).

The PR author (Contributor) replied:

@b-per I believe it is required in order to test the type: isinstance(df, pandas.core.frame.DataFrame). I tried commenting out the single line with the import, and it did not work.

However, digging into your question did uncover something else...

Goal

For the py_write_table() macro, we want the return type to be pyspark.sql.dataframe.DataFrame.

Potential input types

There are three different data types we expect [1] it to be:

  1. pyspark.sql.dataframe.DataFrame (PySpark DataFrame)
  2. pandas.core.frame.DataFrame (Pandas DataFrame)
  3. pyspark.pandas.frame.DataFrame (Pandas-on-Spark DataFrame)

Are we handling each of the three cases?

  1. ✅ For the first case, it is already a pyspark.sql.dataframe.DataFrame, so no conversion necessary.
  2. ✅ For the second case, this PR infers if it is a pandas.core.frame.DataFrame and converts it using spark.createDataFrame() if so.
  3. ❌ However, the third case isn't handled yet! We can handle it by calling to_spark() on it [2].

I'll update this PR to include the third case. (See the sketch after the footnotes below for all three cases combined.)


  [1] We could proactively raise an exception if it falls outside of these expected types (rather than sending it off to the database where it will fall on its face and emit an unhelpful error message).
  [2] Thank you to @Adricarpin for this handy resource: https://towardsdatascience.com/run-pandas-as-fast-as-spark-f5eefe780c45#64b7
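
For reference, a minimal sketch (not the exact diff in this PR) of how all three cases could be normalized, including the proactive exception from footnote [1]. It assumes the spark session and df from the macro body above, and that pyspark.pandas is available (it ships with PySpark 3.2+):

# Sketch: normalize any of the three expected input types to a
# pyspark.sql.dataframe.DataFrame before writing.
import importlib.util

from pyspark.sql import DataFrame as SparkDataFrame

# Case 2: plain pandas DataFrame -> PySpark DataFrame
if importlib.util.find_spec("pandas"):
    import pandas
    if isinstance(df, pandas.core.frame.DataFrame):
        df = spark.createDataFrame(df)

# Case 3: pandas-on-Spark DataFrame -> PySpark DataFrame
if importlib.util.find_spec("pyspark.pandas"):
    import pyspark.pandas
    if isinstance(df, pyspark.pandas.frame.DataFrame):
        df = df.to_spark()

# Footnote [1]: fail fast on anything unexpected (case 1 passes through)
if not isinstance(df, SparkDataFrame):
    raise TypeError("Unexpected model return type: " + str(type(df)))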

@b-per (Contributor) replied:

Yep, my bad, I thought isinstance was comparing with a string and not an object type.
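
(For illustration only, not part of the PR: isinstance takes the class object itself, not its name as a string, which is why the import must run before the check.)

import pandas as pd

df = pd.DataFrame({"a": [1, 2]})

# The second argument must be a type (or tuple of types):
print(isinstance(df, pd.core.frame.DataFrame))  # True

# Passing the name as a string instead raises
# TypeError: isinstance() arg 2 must be a type ...
# isinstance(df, "pandas.core.frame.DataFrame")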

The PR author (Contributor) replied:

@b-per Very glad you asked, because now we have a more robust implementation as a result!

    if isinstance(df, pandas.core.frame.DataFrame):
        # convert to pyspark.DataFrame
        df = spark.createDataFrame(df)

df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("{{ target_relation }}")
{%- endmacro -%}

{%macro py_script_comment()%}