
Support for Snowflake connector with Spark #3364

Closed
amithadiraju1694 opened this issue Nov 28, 2022 · 10 comments
Labels: kind/feature (New feature or request), wontfix (This will not be worked on)

Comments

@amithadiraju1694
Contributor

Is your feature request related to a problem? Please describe.

For folks on Databricks or other Spark-based platforms, getting large data as a pandas df may not be an option. It's possible to get a Spark df using the Snowflake Connector for Python, but it's orders of magnitude slower compared to the Snowflake Connector for Spark.

Describe the solution you'd like
Add a function to SnowflakeRetrievalJob that supports executing and retrieving a Snowflake query through the Snowflake Connector for Spark.

Describe alternatives you've considered
Tried getting Snowflake query results as arrow_batches -> converting each batch to pandas -> converting to a Spark data frame -> concatenating all Spark dfs into one. This is orders of magnitude slower than reading the same data frame through the Snowflake Connector for Spark; roughly, it looked like the sketch below.
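
A minimal sketch of that pipeline, assuming cursor is an already-executed Snowflake Python connector cursor and spark is a SparkSession already in scope:

# Slow workaround: Snowflake Python connector -> Arrow batches -> pandas
# -> one Spark DataFrame per batch -> union them all together.
from functools import reduce

batch_dfs = [
    spark.createDataFrame(batch.to_pandas())  # one Spark df per Arrow batch
    for batch in cursor.fetch_arrow_batches()
]
spark_df = reduce(lambda a, b: a.union(b), batch_dfs)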

cc: @sfc-gh-madkins

@amithadiraju1694 amithadiraju1694 added the kind/feature New feature or request label Nov 28, 2022
@adchia adchia closed this as completed Dec 15, 2022
@amithadiraju1694
Contributor Author

amithadiraju1694 commented Dec 15, 2022

@adchia would love to use this if completed, since this is one of the major pain points for us. Could you please provide a link to the docs on how to use this functionality?

@adchia adchia reopened this Dec 15, 2022
@adchia
Collaborator

adchia commented Dec 15, 2022

Oh whoops, I misread this one. Yeah, the SparkOfflineStore doesn't connect to Snowflake yet. Feel free to contribute this and I can cut it into a new patch release!

The easiest way to add this

That being said, it should be a matter of adding another mode in the spark_source.py file.

The logic, living in get_table_query_string, would look something like this:

# Connection options for the Snowflake Spark connector.
options = {
    "sfUrl": url,
    "sfUser": user,
    "sfPassword": password,
    "sfDatabase": database,
    "sfSchema": schema,
    "sfWarehouse": warehouse,
    "APPLICATION": "feast",
}

if role:
    options["sfRole"] = role

# "snowflake" is the short format name registered by the connector
# (fully qualified: "net.snowflake.spark.snowflake").
df_reader = spark.read.format("snowflake").options(**options)

# Read either a whole table or the result of an arbitrary query.
if table:
    df_reader = df_reader.option("dbtable", table)
else:
    df_reader = df_reader.option("query", query)

df = df_reader.load()

# Register the result as a temp view so downstream SQL can reference it.
tmp_table_name = get_temp_entity_table_name()
df.createOrReplaceTempView(tmp_table_name)

I'd probably add something like an optional snowflake_config in SparkSource to capture the above config values (e.g. warehouse, database, etc.); for example:
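
A minimal sketch of such a config, with field names that are hypothetical rather than an existing Feast API:

from dataclasses import dataclass
from typing import Optional

# Hypothetical config block on SparkSource; none of these fields exist
# in Feast today, they only illustrate the shape of the idea.
@dataclass
class SnowflakeSparkConfig:
    url: str
    user: str
    password: str
    database: str
    schema: str
    warehouse: str
    role: Optional[str] = None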

The better way to add this

Really though, we should support reading a SnowflakeSource directly and mapping it into the above.

This would probably live in https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py#L76: instead of failing on anything that isn't a SparkSource, also support parsing a SnowflakeSource, and then construct the table_query_string as above. A rough sketch of that branch follows.
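
Simplified, with _snowflake_table_query_string as an illustrative helper rather than existing Feast code:

# Hypothetical branching inside the Spark offline store's source handling.
if isinstance(data_source, SparkSource):
    table_query_string = data_source.get_table_query_string()
elif isinstance(data_source, SnowflakeSource):
    # Read via the Snowflake Spark connector, register a temp view, and
    # reference that view in the downstream point-in-time join SQL.
    table_query_string = _snowflake_table_query_string(spark, data_source, config)
else:
    raise ValueError(f"Unsupported data source: {type(data_source).__name__}")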

@amithadiraju1694
Contributor Author

> Oh whoops, I misread this one. Yeah, the SparkOfflineStore doesn't connect to Snowflake yet. [...]

Thanks for getting back! A few questions based on your response:

  1. If this functionality (reading from a Snowflake source) is added to SparkOfflineStore, aren't we necessarily creating additional data in the Hive metastore?

  2. Along the same lines, can we add this functionality to SnowflakeOfflineStore by adding functions to SnowflakeRetrievalJob that use the Snowflake Connector for Spark instead of the Snowflake Connector for Python?

My thought is that with option 2 one can still use Snowflake as the offline store without creating an additional copy of the data elsewhere; a rough sketch of what I mean is below. Let me know your thoughts.
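
For illustration only (method and helper names here are hypothetical, and I'm assuming the job can expose its generated SQL, e.g. via something like to_sql()):

# Hypothetical sketch of option 2. _spark_connector_options is an
# illustrative helper that would map the offline store config to the
# sfUrl/sfUser/... options; it is not real Feast code.
class SnowflakeRetrievalJob:
    ...
    def to_spark_df(self, spark):
        # Push the already-generated SQL down to Snowflake through the
        # Spark connector, so no copy of the data lands anywhere else.
        return (
            spark.read.format("snowflake")
            .options(**self._spark_connector_options())
            .option("query", self.to_sql())
            .load()
        )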

@sfc-gh-madkins
Collaborator

@amithadiraju1694 this should be possible to do

@sfc-gh-madkins
Collaborator

@amithadiraju1694 this is the alternative you tried but found slow, correct? #3358

@amithadiraju1694
Contributor Author

> @amithadiraju1694 this is the alternative you tried but found slow, correct? #3358

Yup, that's the one I tried.

@amithadiraju1694
Contributor Author

> @amithadiraju1694 this should be possible to do

I tried doing it in SnowflakeRetrievalJob by passing connection options to Spark and using the Spark connector, but it couldn't find the feast_df_***** object and I wasn't sure how to fix it, so I stopped there. Please let me know if you have a solution for that.

@sfc-gh-madkins
Collaborator

sfc-gh-madkins commented Dec 28, 2022 via email

@sfc-gh-madkins
Collaborator

@amithadiraju1694 are you able to give me side-by-side numbers comparing to_spark_df() vs materializing the table in Snowflake and then reading it through the Spark connector? Something rough like the harness below would do.
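
Illustrative only: sf_options and materialized_table are placeholders, the to_spark_df() signature is assumed, and .count() is there to force Spark to actually read the data:

import time

# Path 1: Feast's to_spark_df() (Arrow-batch based).
t0 = time.perf_counter()
retrieval_job.to_spark_df(spark).count()
print(f"to_spark_df(): {time.perf_counter() - t0:.1f}s")

# Path 2: materialized Snowflake table read via the Spark connector.
t0 = time.perf_counter()
(
    spark.read.format("snowflake")
    .options(**sf_options)                  # same credentials as above
    .option("dbtable", materialized_table)  # table materialized in Snowflake
    .load()
    .count()
)
print(f"spark connector: {time.perf_counter() - t0:.1f}s")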

@stale

stale bot commented Sep 17, 2023

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix This will not be worked on label Sep 17, 2023
@stale stale bot closed this as completed Mar 17, 2024