Skip to content

Commit

Permalink
Adds support for arrays in snowflake
Browse files Browse the repository at this point in the history
Signed-off-by: john.lemmon <john.lemmon@medely.com>
  • Loading branch information
JohnLemmonMedely committed Jan 8, 2024
1 parent 052182b commit df6a16d
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 19 deletions.
31 changes: 31 additions & 0 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import json
import os
import uuid
import warnings
Expand Down Expand Up @@ -51,6 +52,17 @@
)
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.types import (
Array,
Bool,
Bytes,
Float32,
Float64,
Int32,
Int64,
String,
UnixTimestamp,
)
from feast.usage import log_exceptions_and_usage

try:
Expand Down Expand Up @@ -320,6 +332,7 @@ def query_generator() -> Iterator[str]:
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
feature_refs, project, registry
),
feature_views=feature_views,
metadata=RetrievalMetadata(
features=feature_refs,
keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}),
Expand Down Expand Up @@ -398,9 +411,12 @@ def __init__(
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
feature_views: Optional[List[FeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
):

if feature_views is None:
feature_views = []
if not isinstance(query, str):
self._query_generator = query
else:
Expand All @@ -416,6 +432,7 @@ def query_generator() -> Iterator[str]:
self.config = config
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views or []
self._feature_views = feature_views
self._metadata = metadata
self.export_path: Optional[str]
if self.config.offline_store.blob_export_location:
Expand All @@ -436,6 +453,20 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
self.snowflake_conn, self.to_sql()
).fetch_pandas_all()

for feature_view in self._feature_views:
for feature in feature_view.features:
if feature.dtype in [
Array(String),
Array(Bytes),
Array(Int32),
Array(Int64),
Array(UnixTimestamp),
Array(Float64),
Array(Float32),
Array(Bool),
]:
df[feature.name] = [json.loads(x) for x in df[feature.name]]

return df

def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ def get_table_column_names_and_types(
else:
row["snowflake_type"] = "NUMBERwSCALE"

elif row["type_code"] in [5, 9, 10, 12]:
elif row["type_code"] in [5, 9, 12]:
error = snowflake_unsupported_map[row["type_code"]]
raise NotImplementedError(
f"The following Snowflake Data Type is not supported: {error}"
)
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 11, 13]:
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 10, 11, 13]:
row["snowflake_type"] = snowflake_type_code_map[row["type_code"]]
else:
raise NotImplementedError(
Expand All @@ -305,14 +305,14 @@ def get_table_column_names_and_types(
6: "TIMESTAMP_LTZ",
7: "TIMESTAMP_TZ",
8: "TIMESTAMP_NTZ",
10: "ARRAY",
11: "BINARY",
13: "BOOLEAN",
}

snowflake_unsupported_map = {
5: "VARIANT -- Try converting to VARCHAR",
9: "OBJECT -- Try converting to VARCHAR",
10: "ARRAY -- Try converting to VARCHAR",
12: "TIME -- Try converting to VARCHAR",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,62 @@ CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_varchar_to_string_pro
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_varchar_to_string_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_varchar_to_list_string_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int32_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int64_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_float_to_list_double_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_number_to_int32_proto(df NUMBER)
RETURNS BINARY
LANGUAGE PYTHON
Expand Down
175 changes: 175 additions & 0 deletions sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from binascii import unhexlify

import numpy as np
import pandas
from _snowflake import vectorized

Expand Down Expand Up @@ -59,6 +60,180 @@ def feast_snowflake_varchar_to_string_proto(df):
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""
# ValueType.STRING_LIST = 12
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_bytes_to_list_bytes_proto(df):
sys._xoptions["snowflake_partner_attribution"].append("feast")

# Sometimes bytes come in as strings so we need to convert back to float
numpy_arrays = np.asarray(df[0].to_list()).astype(bytes)

df = list(
map(
ValueProto.SerializeToString,
python_values_to_proto_values(numpy_arrays, ValueType.BYTES_LIST),
)
)
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_varchar_to_list_string_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_varchar_to_list_string_proto(df):
sys._xoptions["snowflake_partner_attribution"].append("feast")

df = list(
map(
ValueProto.SerializeToString,
python_values_to_proto_values(df[0].to_numpy(), ValueType.STRING_LIST),
)
)
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int32_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_number_to_list_int32_proto(df):
sys._xoptions["snowflake_partner_attribution"].append("feast")

df = list(
map(
ValueProto.SerializeToString,
python_values_to_proto_values(df[0].to_numpy(), ValueType.INT32_LIST),
)
)
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int64_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_number_to_list_int64_proto(df):
sys._xoptions["snowflake_partner_attribution"].append("feast")

df = list(
map(
ValueProto.SerializeToString,
python_values_to_proto_values(df[0].to_numpy(), ValueType.INT64_LIST),
)
)
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_float_to_list_double_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_float_to_list_double_proto(df):
sys._xoptions["snowflake_partner_attribution"].append("feast")

numpy_arrays = np.asarray(df[0].to_list()).astype(float)

df = list(
map(
ValueProto.SerializeToString,
python_values_to_proto_values(numpy_arrays, ValueType.DOUBLE_LIST),
)
)
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_boolean_to_list_bool_proto(df):
sys._xoptions["snowflake_partner_attribution"].append("feast")

df = list(
map(
ValueProto.SerializeToString,
python_values_to_proto_values(df[0].to_numpy(), ValueType.BOOL_LIST),
)
)
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df):
sys._xoptions["snowflake_partner_attribution"].append("feast")

numpy_arrays = np.asarray(df[0].to_list()).astype(np.datetime64)

df = list(
map(
ValueProto.SerializeToString,
python_values_to_proto_values(numpy_arrays, ValueType.UNIX_TIMESTAMP_LIST),
)
)
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_number_to_int32_proto(df NUMBER)
RETURNS BINARY
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,14 @@ def _convert_value_name_to_snowflake_udf(value_name: str, project_name: str) ->
"FLOAT": f"feast_{project_name}_snowflake_float_to_double_proto",
"BOOL": f"feast_{project_name}_snowflake_boolean_to_bool_proto",
"UNIX_TIMESTAMP": f"feast_{project_name}_snowflake_timestamp_to_unix_timestamp_proto",
"BYTES_LIST": f"feast_{project_name}_snowflake_array_bytes_to_list_bytes_proto",
"STRING_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto",
"INT32_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int32_proto",
"INT64_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int64_proto",
"DOUBLE_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto",
"FLOAT_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto",
"BOOL_LIST": f"feast_{project_name}_snowflake_array_boolean_to_list_bool_proto",
"UNIX_TIMESTAMP_LIST": f"feast_{project_name}_snowflake_array_timestamp_to_list_unix_timestamp_proto",
}
return name_map[value_name].upper()

Expand Down
1 change: 1 addition & 0 deletions sdk/python/tests/data/data_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def get_feature_values_for_dtype(
"int64": [1, 2, 3, 4, 5],
"float": [1.0, None, 3.0, 4.0, 5.0],
"string": ["1", None, "3", "4", "5"],
"bytes": [b"1", None, b"3", b"4", b"5"],
"bool": [True, None, False, True, False],
"datetime": [
datetime(1980, 1, 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@
"password": os.getenv("SNOWFLAKE_CI_PASSWORD", ""),
"role": os.getenv("SNOWFLAKE_CI_ROLE", ""),
"warehouse": os.getenv("SNOWFLAKE_CI_WAREHOUSE", ""),
"database": "FEAST",
"schema": "ONLINE",
"database": os.getenv("SNOWFLAKE_CI_DATABASE", "FEAST"),
"schema": os.getenv("SNOWFLAKE_CI_SCHEMA_ONLINE", "ONLINE"),
}

BIGTABLE_CONFIG = {
Expand Down
Loading

0 comments on commit df6a16d

Please sign in to comment.