Python UDF in Ingestion being used for feature validation #1234
Changes from 22 commits
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

PLATFORM=$1
DESTINATION=$2
PACKAGES=${PACKAGES:-"great-expectations==0.13.2 pyarrow==2.0.0"}

tmp_dir=$(mktemp -d)

# Install the validation dependencies into a temporary directory and pack them
# into an archive the Spark ingestion job can pull in at runtime.
pip3 install -t ${tmp_dir}/libs $PACKAGES

cd $tmp_dir
tar -czf pylibs-ge-$PLATFORM.tar.gz libs/
if [[ $DESTINATION == gs* ]]; then
    gsutil cp pylibs-ge-$PLATFORM.tar.gz $DESTINATION
else
    mv pylibs-ge-$PLATFORM.tar.gz $DESTINATION
fi
@@ -0,0 +1,13 @@
import io

try:
    from pyspark import cloudpickle
except ImportError:
    raise ImportError("pyspark must be installed to enable validation functionality")


def serialize_udf(fun, return_type) -> bytes:
    """Pickle a (function, return type) pair so it can be shipped to the Spark job."""
    buffer = io.BytesIO()
    command = (fun, return_type)
    cloudpickle.dump(command, buffer)
    return buffer.getvalue()
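For context, a minimal sketch (not part of this PR) of reversing serialize_udf locally, e.g. in a unit test; it assumes nothing beyond the (function, return type) tuple layout used above and that pyspark and pandas are installed:

import pickle

import pandas as pd
from pyspark.sql.types import BooleanType

# Serialize a trivial row-validating function the same way serialize_udf does...
payload = serialize_udf(lambda df: df.notnull().all(axis=1), BooleanType())

# ...and restore the (function, return_type) pair from the raw bytes. Standard
# pickle can read the stream as long as pyspark's cloudpickle is importable.
fun, return_type = pickle.loads(payload)
assert fun(pd.DataFrame({"x": [1, None]})).tolist() == [True, False]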
@@ -0,0 +1,126 @@
import io
import json
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import pandas as pd

from feast.constants import ConfigOptions
from feast.contrib.validation.base import serialize_udf
from feast.staging.storage_client import get_staging_client

try:
    from great_expectations.core import ExpectationSuite
    from great_expectations.dataset import PandasDataset
except ImportError:
    raise ImportError(
        "great_expectations must be installed to enable validation functionality. "
        "Please install feast[validation]"
    )

try:
    from pyspark.sql.types import BooleanType
except ImportError:
    raise ImportError(
        "pyspark must be installed to enable validation functionality. "
        "Please install feast[validation]"
    )


if TYPE_CHECKING:
    from feast import Client, FeatureTable


GE_PACKED_ARCHIVE = "https://storage.googleapis.com/feast-jobs/spark/validation/pylibs-ge-%(platform)s.tar.gz"
Review discussion on GE_PACKED_ARCHIVE:
Review comment: How does this work from a distribution perspective? I worry that if this is not in any way tied to the Feast version, we can't upgrade GE without breaking older versions of the ingestion job. Maybe we don't have to address this now, especially while this is contrib, but sometime down the road we probably need to pin the version of this tarball to a Feast version somehow.
Reply: I know that smells, but since the feature is experimental I think it's OK for now.
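Purely as an illustration of the pinning idea raised in that thread (not part of this PR; the naming scheme and use of pkg_resources are assumptions), the archive URL could embed the installed Feast version:

import pkg_resources

# Hypothetical: tie the packed archive to the running Feast version so that
# upgrading great-expectations cannot break ingestion jobs started by older SDKs.
FEAST_VERSION = pkg_resources.get_distribution("feast").version
GE_PACKED_ARCHIVE = (
    "https://storage.googleapis.com/feast-jobs/spark/validation/"
    "pylibs-ge-%(platform)s-" + FEAST_VERSION + ".tar.gz"
)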
_UNSET = object()


class ValidationUDF:
    def __init__(self, name: str, pickled_code: bytes):
        self.name = name
        self.pickled_code = pickled_code


def create_validation_udf(name: str, expectations: ExpectationSuite) -> ValidationUDF:
    """
    Wraps your expectations into a Spark UDF.

    Expectations should be generated & validated on a training dataset:
    >>> from great_expectations.dataset import PandasDataset
    >>> ds = PandasDataset.from_dataset(your_training_df)
    >>> ds.expect_column_values_to_be_between('column', 0, 100)

    >>> expectations = ds.get_expectation_suite()

    Important: your expectations should pass on the training dataset; only successful checks
    will be converted and stored in the ExpectationSuite.

    Now you can create a UDF that will validate data during ingestion:
    >>> create_validation_udf("myValidation", expectations)

    :param name: name for the resulting ValidationUDF
    :param expectations: collection of expectations gathered on the training dataset
    :return: ValidationUDF with serialized code
    """

    def udf(df: pd.DataFrame) -> pd.Series:
        ds = PandasDataset.from_dataset(df)
        result = ds.validate(expectations, result_format="COMPLETE")
        valid_rows = pd.Series([True] * df.shape[0])

        for check in result.results:
            if check.success:
                continue

            if check.exception_info["raised_exception"]:
                # ToDo: probably we should mark all rows as invalid
                continue

            valid_rows.iloc[check.result["unexpected_index_list"]] = False

        return valid_rows

    pickled_code = serialize_udf(udf, BooleanType())
    return ValidationUDF(name, pickled_code)
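A small end-to-end sketch (not part of this PR) showing what the wrapped function does with rows that violate an expectation; the column name and data are made up:

import pickle

import pandas as pd
from great_expectations.dataset import PandasDataset

# Hypothetical training data; the expectation derived from it passes by construction.
training_df = pd.DataFrame({"trip_distance": [1.2, 3.4, 5.6]})
ds = PandasDataset.from_dataset(training_df)
ds.expect_column_values_to_be_between("trip_distance", 0, 100)
expectations = ds.get_expectation_suite()

validation = create_validation_udf("myValidation", expectations)

# Unpickle the (function, return_type) pair and run the function on new data:
# rows that break the expectation come back as False.
fun, _ = pickle.loads(validation.pickled_code)
print(fun(pd.DataFrame({"trip_distance": [10.0, 500.0]})).tolist())  # [True, False]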
def apply_validation(
    client: "Client",
    feature_table: "FeatureTable",
    udf: ValidationUDF,
    validation_window_secs: int,
    include_py_libs=_UNSET,
):
    """
    Uploads the validation UDF code to the staging location and
    stores the path to the UDF code and the required Python libraries as FeatureTable labels.
    """
    include_py_libs = (
        include_py_libs if include_py_libs is not _UNSET else GE_PACKED_ARCHIVE
    )

    staging_location = client._config.get(ConfigOptions.SPARK_STAGING_LOCATION).rstrip(
        "/"
    )
    staging_scheme = urlparse(staging_location).scheme
    staging_client = get_staging_client(staging_scheme)

    pickled_code_fp = io.BytesIO(udf.pickled_code)
    remote_path = f"{staging_location}/udfs/{udf.name}.pickle"
    staging_client.upload_fileobj(
        pickled_code_fp, f"{udf.name}.pickle", remote_uri=urlparse(remote_path)
    )

    feature_table.labels.update(
        {
            "_validation": json.dumps(
                dict(
                    name=udf.name,
                    pickled_code_path=remote_path,
                    include_archive_path=include_py_libs,
                )
            ),
            "_streaming_trigger_secs": str(validation_window_secs),
        }
    )
    client.apply_feature_table(feature_table)
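And a hedged sketch of wiring it all together from a client script (not part of this PR; the core URL, table name, and window are placeholders, and this assumes a Feast SDK of the same era exposing Client.get_feature_table):

from feast import Client

# Hypothetical connection details and feature table name.
client = Client(core_url="localhost:6565")
feature_table = client.get_feature_table("driver_trips")

validation = create_validation_udf("driver_trips_validation", expectations)

# Store the pickled UDF in the staging location and record its path (plus the
# packed python libs archive) as labels; the streaming ingestion job reads them.
apply_validation(client, feature_table, validation, validation_window_secs=30)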
@@ -332,6 +332,7 @@ def _stream_ingestion_step(
        ],
        "Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
        + jars_args
        + ["--conf", "spark.yarn.isPython=true"]
        + ["--packages", BQ_SPARK_PACKAGE, jar_path]
        + args,
        "Jar": "command-runner.jar",

Review discussion on the spark.yarn.isPython flag:
Review comment: Is there any downside of doing this? Do you know why it isn't always set to true in Spark?
Reply: It's YARN-specific, and from what I found it's mostly used to enable distribution of Python-related artifacts (like pyspark.zip) to the YARN workers. It's being set by
Review comment: Makes sense. Maybe add a comment there for the future generations?
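As a sketch of the comment suggested above (not the actual PR code; the placeholder values stand in for those defined elsewhere in the EMR launcher):

# Placeholders standing in for values defined elsewhere in the launcher.
jars_args = ["--jars", "s3://example-bucket/extra.jar"]
BQ_SPARK_PACKAGE = "com.google.cloud.spark:spark-bigquery-example:0.0.0"
jar_path = "s3://example-bucket/feast-ingestion-spark.jar"

args = (
    ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
    + jars_args
    # spark.yarn.isPython is YARN-specific: it makes YARN ship the Python runtime
    # pieces (e.g. pyspark.zip) to the workers so the pickled validation UDF can run.
    + ["--conf", "spark.yarn.isPython=true"]
    + ["--packages", BQ_SPARK_PACKAGE, jar_path]
)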
Review discussion on the Maven test configuration:
Review comment: Why? (I'm not super familiar with our Java test machinery; does it now skip some tests it didn't skip before?)
Reply: TL;DR: mvn test runs only unit tests; everything else should be skipped. We have two toggles, skipITs and skipUTs, meaning "skip integration tests" and "skip unit tests"; we need them to run those test suites separately. Before, there was no need for skipITs, since in the Maven pipeline the test phase comes before verify (used for integration tests). But in the Spark part we have some additional steps (the generate-test-source phase) that are required only by integration tests and aren't needed by unit tests (see spark/ingestion/pom.xml). To skip those steps I added this flag here.