feat: Add support for arrays in snowflake #3758

Closed · wants to merge 5 commits
43 changes: 42 additions & 1 deletion sdk/python/feast/feature_server.py
@@ -1,17 +1,19 @@
import json
import traceback
import warnings
from typing import List, Optional

import gunicorn.app.base
import pandas as pd
from dateutil import parser
from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.logger import logger
from fastapi.params import Depends
from google.protobuf.json_format import MessageToDict, Parse
from pydantic import BaseModel

import feast
from feast import proto_json
from feast import proto_json, utils
from feast.data_source import PushMode
from feast.errors import PushSourceNotFoundException
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
@@ -31,6 +33,17 @@ class PushFeaturesRequest(BaseModel):
to: str = "online"


class MaterializeRequest(BaseModel):
start_ts: str
end_ts: str
feature_views: Optional[List[str]] = None


class MaterializeIncrementalRequest(BaseModel):
end_ts: str
feature_views: Optional[List[str]] = None


def get_app(store: "feast.FeatureStore"):
proto_json.patch()

@@ -134,6 +147,34 @@ def write_to_online_store(body=Depends(get_body)):
def health():
return Response(status_code=status.HTTP_200_OK)

@app.post("/materialize")
def materialize(body=Depends(get_body)):
try:
request = MaterializeRequest(**json.loads(body))
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
request.feature_views,
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

@app.post("/materialize-incremental")
def materialize_incremental(body=Depends(get_body)):
try:
request = MaterializeIncrementalRequest(**json.loads(body))
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

return app


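The two new endpoints accept JSON bodies matching the request models above. A minimal client-side sketch, assuming a feature server on the default localhost:6566; the feature view name and timestamps are illustrative:

import requests

resp = requests.post(
    "http://localhost:6566/materialize",
    json={
        "start_ts": "2023-01-01T00:00:00",
        "end_ts": "2023-01-02T00:00:00",
        "feature_views": ["driver_hourly_stats"],  # optional; omit to materialize all views
    },
)
resp.raise_for_status()  # on failure, the 500 detail carries the server-side exception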
14 changes: 13 additions & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
@@ -19,7 +19,7 @@
import pandas as pd
import pyarrow
import pyarrow.parquet
from pydantic import StrictStr, validator
from pydantic import ConstrainedStr, StrictStr, validator
from pydantic.typing import Literal
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

@@ -72,6 +72,13 @@ def get_http_client_info():
return http_client_info.ClientInfo(user_agent=get_user_agent())


class BigQueryTableCreateDisposition(ConstrainedStr):
"""Custom constraint for table_create_disposition. To understand more, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""

values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}


class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for GCP BigQuery"""

@@ -95,6 +102,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
gcs_staging_location: Optional[str] = None
""" (optional) GCS location used for offloading BigQuery results as parquet files."""

table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""

@validator("billing_project_id")
def project_id_exists(cls, v, values, **kwargs):
if v and not values["project_id"]:
@@ -324,6 +334,7 @@ def write_logged_features(
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
schema=arrow_schema_to_bq_schema(source.get_schema(registry)),
create_disposition=config.offline_store.table_create_disposition,
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field=source.get_log_timestamp_column(),
@@ -384,6 +395,7 @@ def offline_write_batch(
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
schema=arrow_schema_to_bq_schema(pa_schema),
create_disposition=config.offline_store.table_create_disposition,
write_disposition="WRITE_APPEND", # Default but included for clarity
)

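A hedged sketch of setting the new option; the dataset name is a placeholder, and the value flows through unchanged to bigquery.LoadJobConfig(create_disposition=...) in the two load jobs above:

from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig

config = BigQueryOfflineStoreConfig(
    dataset="feast_dataset",                  # placeholder dataset name
    table_create_disposition="CREATE_NEVER",  # or "CREATE_IF_NEEDED", BigQuery's default
)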
19 changes: 19 additions & 0 deletions sdk/python/feast/infra/offline_stores/snowflake.py
@@ -1,4 +1,5 @@
import contextlib
import json
import os
import uuid
import warnings
@@ -51,6 +52,7 @@
)
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.types import Array, Float32, Float64, Int32, Int64, String, UnixTimestamp
from feast.usage import log_exceptions_and_usage

try:
@@ -320,6 +322,7 @@ def query_generator() -> Iterator[str]:
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
feature_refs, project, registry
),
feature_views=feature_views,
metadata=RetrievalMetadata(
features=feature_refs,
keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}),
@@ -398,9 +401,12 @@ def __init__(
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
feature_views: Optional[List[FeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
):

if feature_views is None:
feature_views = []
if not isinstance(query, str):
self._query_generator = query
else:
@@ -416,6 +422,7 @@ def query_generator() -> Iterator[str]:
self.config = config
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views or []
self._feature_views = feature_views
self._metadata = metadata
self.export_path: Optional[str]
if self.config.offline_store.blob_export_location:
@@ -436,6 +443,18 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
self.snowflake_conn, self.to_sql()
).fetch_pandas_all()

for feature_view in self._feature_views:
for feature in feature_view.features:
if feature.dtype in [
Array(String),
Array(Int32),
Array(Int64),
Array(UnixTimestamp),
Array(Float64),
Array(Float32),
]:
df[feature.name] = [json.loads(x) for x in df[feature.name]]

return df

def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
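The loop above depends on Snowflake returning ARRAY columns as JSON-encoded strings from fetch_pandas_all(). A self-contained sketch of the decoding step; the column name and values are illustrative:

import json

import pandas as pd

# Snowflake serializes each ARRAY cell to a JSON string in the fetched DataFrame.
df = pd.DataFrame({"tags": ['["a", "b"]', '["c"]']})

# Decode every cell into a native Python list, as _to_df_internal now does.
df["tags"] = [json.loads(x) for x in df["tags"]]
assert df["tags"].tolist() == [["a", "b"], ["c"]]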
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
@@ -279,12 +279,12 @@ def get_table_column_names_and_types(
else:
row["snowflake_type"] = "NUMBERwSCALE"

elif row["type_code"] in [5, 9, 10, 12]:
elif row["type_code"] in [5, 9, 12]:
error = snowflake_unsupported_map[row["type_code"]]
raise NotImplementedError(
f"The following Snowflake Data Type is not supported: {error}"
)
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 11, 13]:
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 10, 11, 13]:
row["snowflake_type"] = snowflake_type_code_map[row["type_code"]]
else:
raise NotImplementedError(
@@ -305,14 +305,14 @@ def get_table_column_names_and_types(
6: "TIMESTAMP_LTZ",
7: "TIMESTAMP_TZ",
8: "TIMESTAMP_NTZ",
10: "ARRAY",
11: "BINARY",
13: "BOOLEAN",
}

snowflake_unsupported_map = {
5: "VARIANT -- Try converting to VARCHAR",
9: "OBJECT -- Try converting to VARCHAR",
10: "ARRAY -- Try converting to VARCHAR",
12: "TIME -- Try converting to VARCHAR",
}

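The net effect is that type_code 10 (ARRAY) now resolves through the supported map instead of raising. An illustrative excerpt of the dispatch, not the full mapping:

# Excerpts of the two maps after this change; code 10 moved to the supported side.
snowflake_type_code_map = {10: "ARRAY", 13: "BOOLEAN"}
snowflake_unsupported_map = {12: "TIME -- Try converting to VARCHAR"}

def resolve(type_code: int) -> str:
    if type_code in snowflake_unsupported_map:
        raise NotImplementedError(snowflake_unsupported_map[type_code])
    return snowflake_type_code_map[type_code]

assert resolve(10) == "ARRAY"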
33 changes: 32 additions & 1 deletion sdk/python/feast/infra/registry/proto_registry_utils.py
@@ -1,4 +1,5 @@
import uuid
from functools import wraps
from typing import List, Optional

from feast import usage
@@ -23,6 +24,26 @@
from feast.stream_feature_view import StreamFeatureView


def registry_proto_cache(func):
cache_key = None
cache_value = None

@wraps(func)
def wrapper(registry_proto: RegistryProto, project: str):
nonlocal cache_key, cache_value

key = tuple([id(registry_proto), registry_proto.version_id, project])

if key == cache_key:
return cache_value
else:
cache_value = func(registry_proto, project)
cache_key = key
return cache_value

return wrapper


def init_project_metadata(cached_registry_proto: RegistryProto, project: str):
new_project_uuid = f"{uuid.uuid4()}"
usage.set_current_project_uuid(new_project_uuid)
@@ -137,8 +158,9 @@ def get_validation_reference(
raise ValidationReferenceNotFound(name, project=project)


@registry_proto_cache
def list_feature_services(
registry_proto: RegistryProto, project: str, allow_cache: bool = False
registry_proto: RegistryProto, project: str
) -> List[FeatureService]:
feature_services = []
for feature_service_proto in registry_proto.feature_services:
@@ -147,6 +169,7 @@ def list_feature_views(
return feature_services


@registry_proto_cache
def list_feature_views(
registry_proto: RegistryProto, project: str
) -> List[FeatureView]:
@@ -157,6 +180,7 @@ def list_request_feature_views(
return feature_views


@registry_proto_cache
def list_request_feature_views(
registry_proto: RegistryProto, project: str
) -> List[RequestFeatureView]:
@@ -169,6 +193,7 @@ def list_stream_feature_views(
return feature_views


@registry_proto_cache
def list_stream_feature_views(
registry_proto: RegistryProto, project: str
) -> List[StreamFeatureView]:
@@ -181,6 +206,7 @@ def list_on_demand_feature_views(
return stream_feature_views


@registry_proto_cache
def list_on_demand_feature_views(
registry_proto: RegistryProto, project: str
) -> List[OnDemandFeatureView]:
@@ -193,6 +219,7 @@ def list_entities(registry_proto: RegistryProto, project: str) -> List[Entity]:
return on_demand_feature_views


@registry_proto_cache
def list_entities(registry_proto: RegistryProto, project: str) -> List[Entity]:
entities = []
for entity_proto in registry_proto.entities:
@@ -201,6 +228,7 @@ def list_data_sources(registry_proto: RegistryProto, project: str) -> List[DataSource]:
return entities


@registry_proto_cache
def list_data_sources(registry_proto: RegistryProto, project: str) -> List[DataSource]:
data_sources = []
for data_source_proto in registry_proto.data_sources:
@@ -209,6 +237,7 @@ def list_saved_datasets(
return data_sources


@registry_proto_cache
def list_saved_datasets(
registry_proto: RegistryProto, project: str
) -> List[SavedDataset]:
@@ -219,6 +248,7 @@ def list_validation_references(
return saved_datasets


@registry_proto_cache
def list_validation_references(
registry_proto: RegistryProto, project: str
) -> List[ValidationReference]:
@@ -231,6 +261,7 @@ def list_project_metadata(
return validation_references


@registry_proto_cache
def list_project_metadata(
registry_proto: RegistryProto, project: str
) -> List[ProjectMetadata]:
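A hedged sketch of the decorator's cache semantics: the single-entry cache is keyed on the proto's object identity, its version_id, and the project, so mutating version_id invalidates it. The stand-in registry object below is illustrative:

from feast.infra.registry.proto_registry_utils import registry_proto_cache

class FakeRegistryProto:  # stand-in exposing the one attribute the cache key reads
    def __init__(self, version_id):
        self.version_id = version_id

calls = []

@registry_proto_cache
def list_things(registry_proto, project):
    calls.append(project)
    return [project]

proto = FakeRegistryProto("v1")
list_things(proto, "demo")  # miss: the function body runs
list_things(proto, "demo")  # hit: identical (id, version_id, project) key
proto.version_id = "v2"
list_things(proto, "demo")  # miss: version_id changed
assert len(calls) == 2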
@@ -14,6 +14,62 @@ CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_varchar_to_string_proto
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_varchar_to_string_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_varchar_to_list_string_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int32_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int64_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_float_to_list_double_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_number_to_int32_proto(df NUMBER)
RETURNS BINARY
LANGUAGE PYTHON
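For orientation, a simplified scalar sketch of what a handler such as feast_snowflake_array_varchar_to_list_string_proto plausibly does: wrap an ARRAY of VARCHAR into a serialized Feast value proto. This is an assumption for illustration, not the PR's actual handler code:

from feast.protos.feast.types.Value_pb2 import Value as ValueProto

def array_varchar_to_list_string_proto(arr):
    # Snowflake maps an ARRAY argument to a Python list inside the UDF (assumed).
    value = ValueProto()
    value.string_list_val.val.extend(arr)
    return value.SerializeToString()  # matches RETURNS BINARY in the signatures above

assert isinstance(array_varchar_to_list_string_proto(["a", "b"]), bytes)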