
Convert df to pyspark DataFrame if it is pandas before writing #469

Merged: 8 commits, Sep 20, 2022

Conversation

@dbeatty10 (Contributor) commented Sep 16, 2022

resolves #468

Description

Copies the solution by @chamini2 in dbt-labs/dbt-bigquery#301

Checklist

@cla-bot (bot) added the cla:yes label Sep 16, 2022
@github-actions (Contributor) commented:

Thank you for your pull request! We could not find a changelog entry for this change. For details on how to document a change, see the dbt-spark contributing guide.

@dbeatty10 (Contributor, Author) commented:

Overview

I manually verified that the following didn't work before for the dbt-databricks adapter (which inherits from dbt-spark), and confirmed that it works using @chamini2's fix 👍

Code example
import pandas as pd

def model(dbt, session):
    dbt.config(
        materialized="table",
        packages=["pandas"]
    )

    df = pd.DataFrame(
        {'City': ['Buenos Aires', 'Brasilia', 'Santiago', 'Bogota', 'Caracas'],
        'Country': ['Argentina', 'Brazil', 'Chile', 'Colombia', 'Venezuela'],
        'Latitude': [-34.58, -15.78, -33.45, 4.60, 10.48],
        'Longitude': [-58.66, -47.91, -70.66, -74.08, -66.86]}
        )

    return df

Details

dbt-databricks has its own implementation of py_write_table, which overrides the implementation provided by dbt-spark.

⚠️ As a result, Databricks users won't have support for Pandas DataFrames until one of the following happens:

  1. dbt-databricks deletes the custom implementation of py_write_table
  2. This implementation is copied into dbt-databricks also
  3. Some other update is added to dbt-databricks that enables Pandas DataFrames

We want to provide an equivalent implementation here so that dbt-databricks has the option to fully drop its implementation of py_write_table. So this PR adopts two changes from dbt-databricks's version of py_write_table (sketched after this list):

  1. .option("overwriteSchema", "true")
  2. # --- Autogenerated dbt materialization code. --- #
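For illustration, here is roughly how those two changes might show up in the generated write call (a sketch only, not the exact macro output; the file format and relation name are placeholders):

# --- Autogenerated dbt materialization code. --- #

# write out the Spark DataFrame, allowing the schema to change on overwrite
df.write \
    .mode("overwrite") \
    .format("delta") \
    .option("overwriteSchema", "true") \
    .saveAsTable("my_schema.my_python_model")  # placeholder relation name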

Follow-up

I will open an issue in dbt-databricks for this.

@dbeatty10 marked this pull request as ready for review September 17, 2022 22:57
@dbeatty10 added the ready_for_review label (Externally contributed PR has functional approval, ready for code review from Core engineering) Sep 17, 2022
@b-per (Contributor) left a comment

I think we can slightly update the code (but I could be wrong as well)

import importlib.util
package_name = 'pandas'
if importlib.util.find_spec(package_name):
    import pandas
@b-per (Contributor) commented Sep 19, 2022

Is this line required? (the import pandas one)

From what I see, in Databricks we might usually want to load pyspark.pandas rather than pandas (and I don't think it is required here)

Contributor Author

@b-per I believe it is required in order to test the type: isinstance(df, pandas.core.frame.DataFrame). I tried commenting out the single line with the import, and it did not work.

However, digging into your question did uncover something else...

Goal

For the py_write_table() macro, we want the return type to be pyspark.sql.dataframe.DataFrame.

Potential input types

There are three different data types we expect it to be [1]:

  1. pyspark.sql.dataframe.DataFrame (PySpark DataFrame)
  2. pandas.core.frame.DataFrame (Pandas DataFrame)
  3. pyspark.pandas.frame.DataFrame (Pandas-on-Spark DataFrame)

Are we handling each of the three cases?

  1. ✅ For the first case, it is already a pyspark.sql.dataframe.DataFrame, so no conversion necessary.
  2. ✅ For the second case, this PR infers if it is a pandas.core.frame.DataFrame and converts it using spark.createDataFrame() if so.
  3. ❌ However, the third case isn't handled yet! We can handle it by calling to_spark() on it [2].

I'll update this PR to include the third case; see the consolidated sketch after the footnotes below.


  [1] We could proactively raise an exception if it falls outside of these expected types (rather than sending it off to the database, where it will fall on its face and emit an unhelpful error message).
  [2] Thank you to @Adricarpin for this handy resource: https://towardsdatascience.com/run-pandas-as-fast-as-spark-f5eefe780c45#64b7
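Putting the three cases together (plus the proactive exception from footnote [1]), a rough, illustrative sketch of the conversion logic, assuming df is the model's return value and spark is the active SparkSession:

import pandas
import pyspark.pandas
from pyspark.sql import DataFrame as SparkDataFrame

# convert df to pyspark.sql.dataframe.DataFrame before writing
if isinstance(df, SparkDataFrame):
    pass  # case 1: already a Spark DataFrame, nothing to do
elif isinstance(df, pandas.core.frame.DataFrame):
    df = spark.createDataFrame(df)  # case 2: pandas -> Spark
elif isinstance(df, pyspark.pandas.frame.DataFrame):
    df = df.to_spark()  # case 3: pandas-on-Spark -> Spark
else:
    raise Exception(f"{type(df)} is not a supported type for dbt Python materialization")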

Contributor

Yep, my bad, I thought isinstance was comparing with a string and not an object type.

Contributor Author

@b-per Very glad you asked, because now we have a more robust implementation as a result!

# convert to pyspark.sql.dataframe.DataFrame
if isinstance(df, pandas.core.frame.DataFrame):
    df = spark.createDataFrame(df)
elif isinstance(df, pyspark.pandas.frame.DataFrame):
@chamini2 commented Sep 19, 2022

I think this can happen outside the `if importlib.util.find_spec(package_name):` block, since it would run only with the pandas package installed.

Contributor Author

Thanks @chamini2 -- I manually tested that suggestion and pushed a commit 👍

@dbeatty10 (Contributor, Author) commented:

@ChenyuLInx see below for the Python model files that I used for manual testing. Example data is from the GeoPandas documentation.

Pandas DataFrame

# models/pandas_df.py

import pandas as pd


def model(dbt, session):
    dbt.config(
        materialized="table",
        packages=["pandas"]
    )

    df = pd.DataFrame(
        {'City': ['Buenos Aires', 'Brasilia', 'Santiago', 'Bogota', 'Caracas'],
        'Country': ['Argentina', 'Brazil', 'Chile', 'Colombia', 'Venezuela'],
        'Latitude': [-34.58, -15.78, -33.45, 4.60, 10.48],
        'Longitude': [-58.66, -47.91, -70.66, -74.08, -66.86]}
        )

    return df

PySpark DataFrame

# models/pyspark_df.py

def model(dbt, session):
    dbt.config(
        materialized="table",
    )

    df = spark.createDataFrame(
        [
            ("Buenos Aires", "Argentina", -34.58, -58.66),
            ("Brasilia", "Brazil", -15.78, -47.91),
            ("Santiago", "Chile", -33.45, -70.66),
            ("Bogota", "Colombia", 4.60, -74.08),
            ("Caracas", "Venezuela", 10.48, -66.86),
        ],
        ["City", "Country", "Latitude", "Longitude"]
    )

    return df

Pandas-on-Spark DataFrame

# models/pandas_on_spark_df.py

import pyspark.pandas as ps


def model(dbt, session):
    dbt.config(
        materialized="table",
    )

    df = ps.DataFrame(
        {'City': ['Buenos Aires', 'Brasilia', 'Santiago', 'Bogota', 'Caracas'],
        'Country': ['Argentina', 'Brazil', 'Chile', 'Colombia', 'Venezuela'],
        'Latitude': [-34.58, -15.78, -33.45, 4.60, 10.48],
        'Longitude': [-58.66, -47.91, -70.66, -74.08, -66.86]}
        )

    return df

Pandas-on-Spark DataFrame to Pandas DataFrame

# models/pandas_on_spark_df_to_pandas.py

import pyspark.pandas as ps


def model(dbt, session):
    dbt.config(
        materialized="table",
    )

    df = ps.DataFrame(
        {'City': ['Buenos Aires', 'Brasilia', 'Santiago', 'Bogota', 'Caracas'],
        'Country': ['Argentina', 'Brazil', 'Chile', 'Colombia', 'Venezuela'],
        'Latitude': [-34.58, -15.78, -33.45, 4.60, 10.48],
        'Longitude': [-58.66, -47.91, -70.66, -74.08, -66.86]}
        )
    pdf = df.to_pandas()

    return pdf

import pandas

# convert to pyspark.sql.dataframe.DataFrame
if isinstance(df, pandas.core.frame.DataFrame):

But this has to happen inside of the `importlib.util.find_spec(package_name):` if, right? If not, you will be accessing the pandas module without it actually being imported when it is not present.

Comment on lines 46 to 56
import importlib.util
import pyspark.pandas
package_name = 'pandas'
if importlib.util.find_spec(package_name):
    import pandas

    # convert to pyspark.sql.dataframe.DataFrame
    if isinstance(df, pandas.core.frame.DataFrame):
        df = spark.createDataFrame(df)
    elif isinstance(df, pyspark.pandas.frame.DataFrame):
        df = df.to_spark()

Suggested change
-import importlib.util
-import pyspark.pandas
-package_name = 'pandas'
-if importlib.util.find_spec(package_name):
-    import pandas
-    # convert to pyspark.sql.dataframe.DataFrame
-    if isinstance(df, pandas.core.frame.DataFrame):
-        df = spark.createDataFrame(df)
-    elif isinstance(df, pyspark.pandas.frame.DataFrame):
-        df = df.to_spark()
+# convert to pyspark.sql.dataframe.DataFrame
+import importlib.util
+if importlib.util.find_spec('pandas'):
+    import pandas
+    if isinstance(df, pandas.core.frame.DataFrame):
+        df = spark.createDataFrame(df)
+import pyspark.pandas
+if isinstance(df, pyspark.pandas.frame.DataFrame):
+    df = df.to_spark()

This is how I would add the pyspark.pandas check

@dbeatty10 (Contributor, Author) commented Sep 19, 2022

Incorporated feedback from @chamini2, @ueshin, and @ChenyuLInx.

Added these features:

  • Also check the availability of pyspark.pandas, since it was introduced in Spark 3.2 and Databricks still supports DBR 9.1, which is based on Spark 3.1.2
  • Raise an exception if unable to convert to a Spark DataFrame (pyspark.sql.dataframe.DataFrame)

The net effect is:

  • No need to convert Spark DataFrames -- already the type we want
  • Convert pandas DataFrames to Spark DataFrame
  • Convert pandas-on-Spark DataFrames to Spark DataFrame
  • Raise an exception for all other cases

Example when there is an error:
[screenshot of the error output]
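For reference, the net effect might look roughly like this (a sketch, not the exact merged macro code; df and spark are assumed to already be in scope inside the materialization):

import importlib.util
from pyspark.sql import DataFrame as SparkDataFrame

# only import pandas / pyspark.pandas when they are actually available
pandas_available = importlib.util.find_spec("pandas") is not None
pyspark_pandas_available = importlib.util.find_spec("pyspark.pandas") is not None

if pandas_available:
    import pandas
    if isinstance(df, pandas.core.frame.DataFrame):
        df = spark.createDataFrame(df)  # pandas -> Spark

if pyspark_pandas_available:
    import pyspark.pandas
    if isinstance(df, pyspark.pandas.frame.DataFrame):
        df = df.to_spark()  # pandas-on-Spark -> Spark

if not isinstance(df, SparkDataFrame):
    raise Exception(f"{type(df)} is not a supported type for dbt Python materialization")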

@dbeatty10 (Contributor, Author) commented:

Incorporated some more feedback from @ueshin given on databricks/dbt-databricks#180

Specifically, preferentially convert pandas DataFrames to pandas-on-Spark DataFrames first (sketched below), since @ueshin shared that:

  • they know how to convert pandas DataFrames better than spark.createDataFrame(df)
  • and converting from pandas-on-Spark to Spark DataFrame has no overhead
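In code terms, the conversion path becomes pandas -> pandas-on-Spark -> Spark DataFrame, roughly like this (a sketch; the pandas_available/pyspark_available flags stand for the availability checks discussed above):

# prefer pandas-on-Spark as the intermediate step
if pyspark_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
    df = pyspark.pandas.frame.DataFrame(df)  # pandas -> pandas-on-Spark

if pyspark_available and isinstance(df, pyspark.pandas.frame.DataFrame):
    df = df.to_spark()  # pandas-on-Spark -> Spark, no copy overhead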

@chamini2 left a comment

Looks great!

# since they know how to convert pandas DataFrames better than `spark.createDataFrame(df)`
# and converting from pandas-on-Spark to Spark DataFrame has no overhead
if pyspark_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
    df = pyspark.pandas.frame.DataFrame(df)
@jtcohen6 (Contributor) commented Sep 20, 2022

At the risk of making this even more complex than it needs to be — I believe pyspark.pandas was introduced in Spark v3.2: https://www.databricks.com/blog/2021/10/04/pandas-api-on-upcoming-apache-spark-3-2.html

It won't be available in earlier versions. (The same functionality was available via the koalas package, which was the old codename for pandas-on-PySpark.)

I don't want us to get too-too clever with this logic, though! Could just look like a try/except here
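One way the try/except @jtcohen6 mentions could look (just a sketch of the idea, not code from this PR):

# degrade gracefully on Spark versions older than 3.2, which lack pyspark.pandas
try:
    import pyspark.pandas
    pyspark_pandas_available = True
except ImportError:
    pyspark_pandas_available = False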

Contributor Author

I think what will happen currently if the input is a pandas-on-Spark DataFrame but pyspark.pandas is not available:

  • msg = f"{type(df)} is not a supported type for dbt Python materialization"

What I believe will happen if the input is a pandas DataFrame but pyspark.pandas is not available:

  • df = spark.createDataFrame(df)

We can add in an attempt to import databricks.koalas so we are covering as many bases as possible. If we go that route, is there an environment we could test it out on?
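A hypothetical sketch of that koalas fallback (not part of this PR; the flag name is illustrative):

# hypothetical fallback for clusters where pyspark.pandas is unavailable
try:
    import databricks.koalas
    koalas_available = True
except ImportError:
    koalas_available = False

if koalas_available and isinstance(df, databricks.koalas.DataFrame):
    df = df.to_spark()  # koalas -> Spark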

Contributor

If we go that route, is there an environment we could test it out on?

We could spin up a Databricks cluster running an older Spark version (v3.1). This is also what will be running inside Dataproc — the latest Apache Spark release it supports is v3.1.

ueshin pushed a commit to databricks/dbt-databricks that referenced this pull request Sep 20, 2022
…fore writing (#181)

resolves #179

### Description

Per #180 (comment) removing `py_write_table` macro since dbt-labs/dbt-spark#469 is merged.
Successfully merging this pull request may close these issues.

[CT-1198] [Feature] support python model return pandas dataframe