Separate out snowflake source #836

Merged
merged 79 commits into from
Nov 23, 2022
Changes from 73 commits
c31906c
Add more secret manager support
xiaoyongzhu Sep 25, 2022
e96459a
Add abstract class
xiaoyongzhu Sep 26, 2022
2d6c135
Update feathr-configuration-and-env.md
xiaoyongzhu Sep 27, 2022
f616522
Update _envvariableutil.py
xiaoyongzhu Sep 27, 2022
cdcd612
add tests for aws secrets manager
xiaoyongzhu Sep 28, 2022
aa5fdda
Update test_secrets_read.py
xiaoyongzhu Sep 28, 2022
a6870d9
fix tests
xiaoyongzhu Sep 29, 2022
997a2b1
Update test_secrets_read.py
xiaoyongzhu Sep 29, 2022
8be6a42
fix test
xiaoyongzhu Sep 29, 2022
e617b99
Update pull_request_push_test.yml
xiaoyongzhu Sep 29, 2022
435e24f
Merge branch 'main' into secret_manager
xiaoyongzhu Sep 29, 2022
9cb332c
get_secrets_update
aabbasi-hbo Oct 7, 2022
218123f
move import statement
aabbasi-hbo Oct 7, 2022
07a8cf0
update spelling
aabbasi-hbo Oct 7, 2022
44a3ce0
update raise exception
aabbasi-hbo Oct 7, 2022
87cd083
revert
aabbasi-hbo Oct 7, 2022
bd3d2b0
feature registry hack
aabbasi-hbo Oct 17, 2022
0531788
query for uppercase
aabbasi-hbo Oct 19, 2022
57ee474
add snowflake source
aabbasi-hbo Oct 27, 2022
a8e566b
remove snowflake type
aabbasi-hbo Oct 27, 2022
268f600
enableDebugLogger
aabbasi-hbo Oct 31, 2022
1030c3e
add logging
aabbasi-hbo Oct 31, 2022
6590287
simple path snowflake fix
aabbasi-hbo Nov 1, 2022
9230aab
snowflake-update
aabbasi-hbo Nov 3, 2022
46db910
fix bugs/log
aabbasi-hbo Nov 3, 2022
6fa8596
get_snowflake_path
aabbasi-hbo Nov 4, 2022
b71b415
update get_snowflake_path
aabbasi-hbo Nov 4, 2022
573a2d6
Merge pull request #2 from feathr-ai/main
aabbasi-hbo Nov 4, 2022
5d724f1
Merge branch 'snowflake-source-scala-update' into main
aabbasi-hbo Nov 4, 2022
0668b51
Merge pull request #3 from aabbasi-hbo/main
aabbasi-hbo Nov 4, 2022
66d0c75
remove log
aabbasi-hbo Nov 4, 2022
ba1589e
log
aabbasi-hbo Nov 4, 2022
688a5ac
add logs
aabbasi-hbo Nov 4, 2022
f6b71f9
test with path
aabbasi-hbo Nov 4, 2022
067e20e
update snowflake registry handling
aabbasi-hbo Nov 4, 2022
97b9ca9
update source
aabbasi-hbo Nov 4, 2022
6b401de
remove logs
aabbasi-hbo Nov 4, 2022
e5c200f
update error handling and test
aabbasi-hbo Nov 4, 2022
fe0c8f9
make lowercase
aabbasi-hbo Nov 6, 2022
e01635d
remove logging
aabbasi-hbo Nov 6, 2022
41554b4
Merge pull request #5 from aabbasi-hbo/secrets-key-test
aabbasi-hbo Nov 6, 2022
1a502ab
Revert "Merge pull request #5 from aabbasi-hbo/secrets-key-test"
aabbasi-hbo Nov 6, 2022
51d8e75
Revert "remove logging"
aabbasi-hbo Nov 6, 2022
3771172
Revert "update error handling and test"
aabbasi-hbo Nov 6, 2022
1ba6ea0
Revert "query for uppercase"
aabbasi-hbo Nov 6, 2022
1e90f1a
Revert "revert"
aabbasi-hbo Nov 6, 2022
70f3742
Revert "update raise exception"
aabbasi-hbo Nov 6, 2022
567fd97
Revert "update spelling"
aabbasi-hbo Nov 6, 2022
6bcc7b9
Revert "move import statement"
aabbasi-hbo Nov 6, 2022
5c55213
Revert "get_secrets_update"
aabbasi-hbo Nov 6, 2022
3e80185
Revert "Update pull_request_push_test.yml"
aabbasi-hbo Nov 6, 2022
4346c0b
Revert "fix test"
aabbasi-hbo Nov 6, 2022
6b61e6b
Revert "Update test_secrets_read.py"
aabbasi-hbo Nov 6, 2022
7c06a54
Revert "fix tests"
aabbasi-hbo Nov 6, 2022
c284e1c
Revert "Update test_secrets_read.py"
aabbasi-hbo Nov 6, 2022
dc709f6
Revert "add tests for aws secrets manager"
aabbasi-hbo Nov 6, 2022
157c7fc
Revert "Update _envvariableutil.py"
aabbasi-hbo Nov 6, 2022
f89edea
Revert "Update feathr-configuration-and-env.md"
aabbasi-hbo Nov 6, 2022
645b631
Revert "Add abstract class"
aabbasi-hbo Nov 6, 2022
cc26339
Revert "Add more secret manager support"
aabbasi-hbo Nov 6, 2022
135aa65
remove extra line
aabbasi-hbo Nov 6, 2022
358c9f4
fix formatting
aabbasi-hbo Nov 6, 2022
f4125a2
Update setup.py
aabbasi-hbo Nov 6, 2022
f83a149
update python tests
aabbasi-hbo Nov 7, 2022
e5b8b2a
update scala test
aabbasi-hbo Nov 7, 2022
f9afe01
update tests
aabbasi-hbo Nov 7, 2022
e6ceb6a
update test
aabbasi-hbo Nov 8, 2022
539634b
add test
aabbasi-hbo Nov 8, 2022
556bbfe
update docs
aabbasi-hbo Nov 8, 2022
1d2d20e
fix test
aabbasi-hbo Nov 8, 2022
03df405
add snowflake guide
aabbasi-hbo Nov 8, 2022
858f362
add to NonTimeBasedDataSourceAccessor
aabbasi-hbo Nov 8, 2022
78695d1
remove registry fixes
aabbasi-hbo Nov 10, 2022
ec4012c
Merge branch 'main' into separate-snowflake-source
aabbasi-hbo Nov 11, 2022
7c9827a
Update source.py
aabbasi-hbo Nov 18, 2022
fb053e0
Update source.py
aabbasi-hbo Nov 18, 2022
d1abd3a
Update source.py
aabbasi-hbo Nov 18, 2022
aac4938
remove print
aabbasi-hbo Nov 18, 2022
bb33936
Update feathr-snowflake-guide.md
aabbasi-hbo Nov 18, 2022
1 change: 1 addition & 0 deletions docs/how-to-guides/feathr-configuration-and-env.md
@@ -60,6 +60,7 @@ Feathr will get the configurations in the following order:
| OFFLINE_STORE__SNOWFLAKE__URL | Configures the Snowflake URL. Usually it's something like `dqllago-ol19457.snowflakecomputing.com`. | Required if using Snowflake as an offline store. |
| OFFLINE_STORE__SNOWFLAKE__USER | Configures the Snowflake user. | Required if using Snowflake as an offline store. |
| OFFLINE_STORE__SNOWFLAKE__ROLE | Configures the Snowflake role. Usually it's something like `ACCOUNTADMIN`. | Required if using Snowflake as an offline store. |
| OFFLINE_STORE__SNOWFLAKE__WAREHOUSE | Configures the Snowflake Warehouse. | Required if using Snowflake as an offline store. |
| JDBC_SF_PASSWORD | Configurations for Snowflake password | Required if using Snowflake as an offline store. |
| SPARK_CONFIG__SPARK_CLUSTER | Choice for spark runtime. Currently support: `azure_synapse`, `databricks`. The `databricks` configs will be ignored if `azure_synapse` is set and vice versa. | Required |
| SPARK_CONFIG__SPARK_RESULT_OUTPUT_PARTS | Configure number of parts for the spark output for feature generation job | Required |
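
The Snowflake rows above can be supplied as environment variables. The following is a minimal sketch, not part of Feathr: `missing_snowflake_settings` is a hypothetical helper and the values are placeholders. It only checks that the variable names listed in the table are set before a client is created.

```python
import os

# Variable names taken from the configuration table above.
REQUIRED_SNOWFLAKE_VARS = (
    "OFFLINE_STORE__SNOWFLAKE__URL",
    "OFFLINE_STORE__SNOWFLAKE__USER",
    "OFFLINE_STORE__SNOWFLAKE__ROLE",
    "OFFLINE_STORE__SNOWFLAKE__WAREHOUSE",
    "JDBC_SF_PASSWORD",
)


def missing_snowflake_settings(env=None):
    """Return the required Snowflake variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_SNOWFLAKE_VARS if not env.get(name)]


# Placeholder values for illustration only; the warehouse is left out on purpose.
example_env = {
    "OFFLINE_STORE__SNOWFLAKE__URL": "<account>.snowflakecomputing.com",
    "OFFLINE_STORE__SNOWFLAKE__USER": "<user>",
    "OFFLINE_STORE__SNOWFLAKE__ROLE": "<role>",
    "JDBC_SF_PASSWORD": "<password>",
}
print(missing_snowflake_settings(example_env))
```

Calling `missing_snowflake_settings()` with no argument inspects the real process environment instead of the example dictionary.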
36 changes: 36 additions & 0 deletions docs/how-to-guides/feathr-snowflake-guide.md
@@ -0,0 +1,36 @@
---
layout: default
title: Using Snowflake with Feathr
parent: Feathr How-to Guides
---

# Using Snowflake with Feathr

Feathr currently supports Snowflake as a data source.

# Using Snowflake as a source

To use Snowflake as a source, create a `SnowflakeSource` in your project.

```
source = feathr.SnowflakeSource(name: str, database: str, schema: str, dbtable: Optional[str] = None, query: Optional[str] = None)
```

* `name` is the source name, as with other sources.
* `database` is the Snowflake database that contains the table of interest.
* `schema` is the Snowflake schema that contains the table of interest.
* `dbtable` or `query`: `dbtable` is the table name in the database, and `query` is a SQL `SELECT` statement. Specify exactly one of the two.

Other parameters, such as `preprocessing`, behave the same as in other sources like `HdfsSource`.

After creating the `SnowflakeSource`, you can use it in the same way as other kinds of sources.
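
As an illustration, here is a minimal sketch of how the arguments might be assembled. `snowflake_source_kwargs` is a hypothetical helper, not part of Feathr, and the source, database, schema, and table names are placeholders. It mirrors the rule that exactly one of `dbtable` and `query` is given.

```python
from typing import Any, Dict, Optional


def snowflake_source_kwargs(name: str, database: str, schema: str,
                            dbtable: Optional[str] = None,
                            query: Optional[str] = None) -> Dict[str, Any]:
    """Validate the arguments and return only the ones that were provided."""
    # True when both are given or both are missing; either case is invalid.
    if (dbtable is None) == (query is None):
        raise ValueError("Specify exactly one of dbtable or query.")
    kwargs: Dict[str, Any] = {"name": name, "database": database, "schema": schema}
    if dbtable is not None:
        kwargs["dbtable"] = dbtable
    else:
        kwargs["query"] = query
    return kwargs


# Placeholder names throughout.
table_kwargs = snowflake_source_kwargs(
    name="snowflakeSampleBatchSource",
    database="SNOWFLAKE_SAMPLE_DATA",
    schema="TPCDS_SF10TCL",
    dbtable="CALL_CENTER",
)
# With Feathr installed, these could be passed on as:
#   source = feathr.SnowflakeSource(**table_kwargs)
print(table_kwargs)
```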

# Specifying Snowflake Source in Observation Settings

`ObservationSettings` requires an observation path. To generate a Snowflake path, the Feathr client provides a method that takes the same location arguments as `SnowflakeSource`.

Call `client.get_snowflake_path()` and pass the result to `ObservationSettings`:

```
observation_path = client.get_snowflake_path(database: str, schema: str, dbtable: Optional[str] = None, query: Optional[str] = None)
```
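
To show the kind of string this call returns, here is a minimal sketch, assuming the path layout `snowflake://snowflake_account/?sfDatabase=...&sfSchema=...` used in this pull request's `client.py`. `build_snowflake_path` is a stand-alone stand-in written for illustration, and the database, schema, and table names are placeholders.

```python
from typing import Optional


def build_snowflake_path(database: str, schema: str,
                         dbtable: Optional[str] = None,
                         query: Optional[str] = None) -> str:
    """Illustrative stand-in that mirrors the path layout in this pull request."""
    if (dbtable is None) == (query is None):
        raise ValueError("Specify exactly one of dbtable or query.")
    base = f"snowflake://snowflake_account/?sfDatabase={database}&sfSchema={schema}"
    if dbtable is not None:
        return f"{base}&dbtable={dbtable}"
    return f"{base}&query={query}"


# Placeholder database, schema, and table names.
observation_path = build_snowflake_path(
    database="SNOWFLAKE_SAMPLE_DATA",
    schema="TPCDS_SF10TCL",
    dbtable="CALL_CENTER",
)
print(observation_path)
```

With a Feathr client, the string returned by `client.get_snowflake_path(...)` would be passed as the `observation_path` argument of `ObservationSettings`.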
1 change: 1 addition & 0 deletions docs/samples/customer360/Customer360.ipynb
@@ -215,6 +215,7 @@
" url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
" user: \"<replace_with_your_user>\"\n",
" role: \"<replace_with_your_user_role>\"\n",
" warehouse: \"<replace_with_your_warehouse>\"\n",
"spark_config:\n",
" spark_cluster: 'databricks'\n",
" spark_result_output_parts: '1'\n",
Expand Down
Original file line number Diff line number Diff line change
@@ -349,6 +349,7 @@
" url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
" user: \"<replace_with_your_user>\"\n",
" role: \"<replace_with_your_user_role>\"\n",
" warehouse: \"<replace_with_your_warehouse>\"\n",
"spark_config:\n",
" # choice for spark runtime. Currently support: azure_synapse, databricks\n",
" # The `databricks` configs will be ignored if `azure_synapse` is set and vice versa.\n",
@@ -1417,7 +1418,7 @@
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3.8.10 ('logistics')",
"display_name": "Python 3.9.14 64-bit",
"language": "python",
"name": "python3"
},
@@ -1431,11 +1432,11 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.9.14"
},
"vscode": {
"interpreter": {
"hash": "6d25d3d1f1809ed0384c3d8e0cd4f1df57fe7bb936ead67f035c6ff1494f4e23"
"hash": "a665b5d41d17b532ea9890333293a1b812fa0b73c9c25c950b3cedf1bebd0438"
}
}
},
7 changes: 4 additions & 3 deletions docs/samples/fraud_detection_demo.ipynb
@@ -230,6 +230,7 @@
" url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
" user: \"<replace_with_your_user>\"\n",
" role: \"<replace_with_your_user_role>\"\n",
" warehouse: \"<replace_with_your_warehouse>\"\n",
"spark_config:\n",
" spark_cluster: 'azure_synapse'\n",
" spark_result_output_parts: '1'\n",
@@ -997,7 +998,7 @@
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3.10.4 64-bit",
"display_name": "Python 3.9.14 64-bit",
"language": "python",
"name": "python3"
},
@@ -1011,12 +1012,12 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
"version": "3.9.14"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "6eea572ac5b43246b7c51fa33510c93fb6df4c34b515a6e4994c858f44841967"
"hash": "a665b5d41d17b532ea9890333293a1b812fa0b73c9c25c950b3cedf1bebd0438"
}
}
},
7 changes: 4 additions & 3 deletions docs/samples/nyc_taxi_demo.ipynb
@@ -213,6 +213,7 @@
" url: \"dqllago-ol19457.snowflakecomputing.com\"\n",
" user: \"feathrintegration\"\n",
" role: \"ACCOUNTADMIN\"\n",
" warehouse: \"COMPUTE_WH\"\n",
"spark_config:\n",
" spark_cluster: 'azure_synapse'\n",
" spark_result_output_parts: '1'\n",
@@ -693,7 +694,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.10.8 64-bit",
"display_name": "Python 3.9.14 64-bit",
"language": "python",
"name": "python3"
},
@@ -707,11 +708,11 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
"version": "3.9.14"
},
"vscode": {
"interpreter": {
"hash": "b0fa6594d8f4cbf19f97940f81e996739fb7646882a419484c72d19e05852a7e"
"hash": "a665b5d41d17b532ea9890333293a1b812fa0b73c9c25c950b3cedf1bebd0438"
}
}
},
1 change: 1 addition & 0 deletions docs/samples/product_recommendation_demo_advanced.ipynb
@@ -348,6 +348,7 @@
" url: \"<replace_with_your_snowflake_account>.snowflakecomputing.com\"\n",
" user: \"<replace_with_your_user>\"\n",
" role: \"<replace_with_your_user_role>\"\n",
" warehouse: \"<replace_with_your_warehouse>\"\n",
"spark_config:\n",
" spark_cluster: 'azure_synapse'\n",
" spark_result_output_parts: '1'\n",
1 change: 1 addition & 0 deletions feathr_project/feathr/__init__.py
@@ -56,6 +56,7 @@
'Source',
'InputContext',
'HdfsSource',
'SnowflakeSource',
'KafkaConfig',
'KafKaSource',
'ValueType',
18 changes: 17 additions & 1 deletion feathr_project/feathr/client.py
@@ -237,6 +237,20 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
        # Pretty print anchor_list
        if verbose and self.anchor_list:
            FeaturePrinter.pretty_print_anchors(self.anchor_list)

    def get_snowflake_path(self, database: str, schema: str, dbtable: str = None, query: str = None) -> str:
        """
        Returns the Snowflake path for the given dataset location.
        Exactly one of dbtable or query must be specified.
        """
        if dbtable is not None and query is not None:
            raise RuntimeError("Both dbtable and query are specified. Can only specify one.")
        if dbtable is None and query is None:
            raise RuntimeError("One of dbtable or query must be specified.")
        # Test against None so the branch matches the validation above.
        if dbtable is not None:
            return f"snowflake://snowflake_account/?sfDatabase={database}&sfSchema={schema}&dbtable={dbtable}"
        else:
            return f"snowflake://snowflake_account/?sfDatabase={database}&sfSchema={schema}&query={query}"

    def list_registered_features(self, project_name: str = None) -> List[str]:
        """List all the already registered features under the given project.
@@ -825,14 +839,16 @@ def _get_snowflake_config_str(self):
        sf_url = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'url')
        sf_user = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'user')
        sf_role = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'role')
        sf_warehouse = self.envutils.get_environment_variable_with_default('offline_store', 'snowflake', 'warehouse')
        sf_password = self.envutils.get_environment_variable('JDBC_SF_PASSWORD')
        # HOCON format will be parsed by the Feathr job
        config_str = """
            JDBC_SF_URL: {JDBC_SF_URL}
            JDBC_SF_USER: {JDBC_SF_USER}
            JDBC_SF_ROLE: {JDBC_SF_ROLE}
            JDBC_SF_WAREHOUSE: {JDBC_SF_WAREHOUSE}
            JDBC_SF_PASSWORD: {JDBC_SF_PASSWORD}
            """.format(JDBC_SF_URL=sf_url, JDBC_SF_USER=sf_user, JDBC_SF_PASSWORD=sf_password, JDBC_SF_ROLE=sf_role)
            """.format(JDBC_SF_URL=sf_url, JDBC_SF_USER=sf_user, JDBC_SF_PASSWORD=sf_password, JDBC_SF_ROLE=sf_role, JDBC_SF_WAREHOUSE=sf_warehouse)
        return self._reshape_config_str(config_str)

    def _get_kafka_config_str(self):
86 changes: 86 additions & 0 deletions feathr_project/feathr/definition/source.py
@@ -6,6 +6,7 @@

from jinja2 import Template
from loguru import logger
from urllib.parse import urlparse, parse_qs
import json


@@ -133,6 +134,91 @@ def __str__(self):
    def to_argument(self):
        return self.path

class SnowflakeSource(Source):
    """
    A data source for Snowflake.

    Attributes:
        name (str): name of the source
        database (str): Snowflake database
        schema (str): Snowflake schema
        dbtable (Optional[str]): Snowflake table
        query (Optional[str]): Query to use instead of a Snowflake table.
            Exactly one of query or dbtable must be specified.
        preprocessing (Optional[Callable]): A preprocessing python function that transforms the source data for further feature transformation.
        event_timestamp_column (Optional[str]): The timestamp field of your record. Sliding window aggregation features assume that each record in the source data has a timestamp column.
        timestamp_format (Optional[str], optional): The format of the timestamp field. Defaults to "epoch". Possible values are:
            - `epoch` (seconds since epoch), for example `1647737463`
            - `epoch_millis` (milliseconds since epoch), for example `1647737517761`
            - Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).
        registry_tags: A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {"deprecated": "true"} to indicate this source is deprecated, etc.
    """
    def __init__(self, name: str, database: str, schema: str, dbtable: Optional[str] = None, query: Optional[str] = None, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None) -> None:
        super().__init__(name, event_timestamp_column,
                         timestamp_format, registry_tags=registry_tags)
        self.preprocessing = preprocessing
        if dbtable is not None and query is not None:
            raise RuntimeError("Both dbtable and query are specified. Can only specify one.")
        if dbtable is None and query is None:
            raise RuntimeError("One of dbtable or query must be specified.")
        # Only the attribute that was provided is set; the template in
        # to_feature_config relies on the other one being undefined.
        if dbtable is not None:
            self.dbtable = dbtable
        if query is not None:
            self.query = query
        self.database = database
        self.schema = schema
        self.path = self._get_snowflake_path()

    def _get_snowflake_path(self) -> str:
        """
        Returns snowflake path for registry.
        """
        # self.dbtable does not exist when only a query was given, so read it
        # with getattr to avoid an AttributeError.
        dbtable = getattr(self, "dbtable", None)
        if dbtable is not None:
            return f"snowflake://snowflake_account/?sfDatabase={self.database}&sfSchema={self.schema}&dbtable={dbtable}"
        else:
            return f"snowflake://snowflake_account/?sfDatabase={self.database}&sfSchema={self.schema}&query={self.query}"

    @staticmethod
    def parse_snowflake_path(url: str) -> Dict[str, str]:
        """
        Parses snowflake path into dictionary of components for registry.
        """
        parse_result = urlparse(url)
        parsed_queries = parse_qs(parse_result.query)
        # parse_qs returns a list per key; keep the first value of each.
        updated_dict = {key: parsed_queries[key][0] for key in parsed_queries}
        return updated_dict

    def to_feature_config(self) -> str:
        tm = Template("""
            {{source.name}}: {
                type: SNOWFLAKE
                location: {
                    type: "snowflake"
                    {% if source.dbtable is defined %}
                    dbtable: "{{source.dbtable}}"
                    {% endif %}
                    {% if source.query is defined %}
                    query: "{{source.query}}"
                    {% endif %}
                    database: "{{source.database}}"
                    schema: "{{source.schema}}"
                }
                {% if source.event_timestamp_column %}
                timeWindowParameters: {
                    timestampColumn: "{{source.event_timestamp_column}}"
                    timestampColumnFormat: "{{source.timestamp_format}}"
                }
                {% endif %}
            }
        """)
        msg = tm.render(source=self)
        return msg

    def __str__(self):
        return str(self.preprocessing) + '\n' + self.to_feature_config()

    def to_argument(self):
        return self.path

class JdbcSource(Source):
    def __init__(self, name: str, url: str = "", dbtable: Optional[str] = None, query: Optional[str] = None, auth: Optional[str] = None, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None) -> None:
        super().__init__(name, event_timestamp_column, timestamp_format, registry_tags)
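
The path written by `_get_snowflake_path` and read back by `parse_snowflake_path` can be checked with a round trip. The sketch below is a stand-alone copy of the parsing approach using only `urllib.parse`, with placeholder database, schema, and table names. It also shows one caveat: a raw query containing `&` is cut at the ampersand. Percent-encoding the query with `quote` is an assumption added here for illustration; this diff does not show whether the Spark side would decode it.

```python
from typing import Dict
from urllib.parse import parse_qs, quote, urlparse


def parse_snowflake_path(url: str) -> Dict[str, str]:
    """Same approach as the method above: keep the first value of each key."""
    parsed = parse_qs(urlparse(url).query)
    return {key: values[0] for key, values in parsed.items()}


base = "snowflake://snowflake_account/?sfDatabase=DB&sfSchema=PUBLIC"

# A table-based path round-trips cleanly.
table_params = parse_snowflake_path(base + "&dbtable=ORDERS")

# A raw query containing '&' is split at the ampersand and loses its tail.
query = "SELECT a FROM t WHERE b = 'x&y'"
raw_params = parse_snowflake_path(base + "&query=" + query)

# Assumption: percent-encoding the query first lets parse_qs restore it.
encoded_params = parse_snowflake_path(base + "&query=" + quote(query, safe=""))

print(table_params)
print(raw_params["query"])
print(encoded_params["query"])
```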
23 changes: 22 additions & 1 deletion feathr_project/feathr/registry/_feathr_registry_client.py
@@ -21,7 +21,7 @@
from feathr.definition.feature import Feature, FeatureBase
from feathr.definition.feature_derivations import DerivedFeature
from feathr.definition.repo_definitions import RepoDefinitions
from feathr.definition.source import GenericSource, HdfsSource, InputContext, JdbcSource, Source
from feathr.definition.source import GenericSource, HdfsSource, InputContext, JdbcSource, SnowflakeSource, Source
from feathr.definition.transformation import ExpressionTransformation, Transformation, WindowAggTransformation
from feathr.definition.typed_key import TypedKey
from feathr.registry.feature_registry import FeathrRegistry
@@ -397,6 +397,12 @@ def source_to_def(v: Source) -> dict:
            "type": urlparse(v.path).scheme,
            "path": v.path,
        }
    elif isinstance(v, SnowflakeSource):
        ret = {
            "name": v.name,
            "type": "SNOWFLAKE",
            "path": v.path,
        }
    elif isinstance(v, JdbcSource):
        ret = {
            "name": v.name,
@@ -446,6 +452,21 @@ def dict_to_source(v: dict) -> Source:
                            timestamp_format=v["attributes"].get(
                                "timestampFormat"),
                            registry_tags=v["attributes"].get("tags", {}))
    elif type == "SNOWFLAKE":
        snowflake_path = v["attributes"]["path"]
        snowflake_parameters = SnowflakeSource.parse_snowflake_path(snowflake_path)
        source = SnowflakeSource(name=v["attributes"]["name"],
                                 dbtable=snowflake_parameters.get("dbtable", None),
                                 query=snowflake_parameters.get("query", None),
                                 database=snowflake_parameters["sfDatabase"],
                                 schema=snowflake_parameters["sfSchema"],
                                 preprocessing=_correct_function_indentation(
                                     v["attributes"].get("preprocessing")),
                                 event_timestamp_column=v["attributes"].get(
                                     "eventTimestampColumn"),
                                 timestamp_format=v["attributes"].get(
                                     "timestampFormat"),
                                 registry_tags=v["attributes"].get("tags", {}))
    elif type == "generic":
        options = v["attributes"].copy()
        # These are not options