
Python UDF in Ingestion being used for feature validation #1234

Merged
merged 24 commits into feast-dev:master
Dec 22, 2020

Conversation

pyalex
Collaborator

@pyalex pyalex commented Dec 16, 2020

What this PR does / why we need it:

This PR introduces an experimental feature that enables using custom Python code inside the ingestion job. See #1230 for motivation.

Technical details:

  1. Python code is wrapped and pickled inside the Python SDK.
  2. The pickled object is staged to one of the supported storages (gs / s3).
  3. A reference to the stored object is added to the FeatureTable's labels.
  4. The IngestionJob currently supports only the _validationUDF label and uses the provided code for feature validation. The code is called right after reading data from the source and before writing it to the store. The IngestionJob can optionally (controlled by a flag) drop rows that do not pass validation. (A sketch of this flow is shown below.)
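
A minimal sketch of the SDK-side flow described above (the helper names, bucket layout, and label-setting call are illustrative assumptions, not the exact API added in this PR):

```python
import pickle
from urllib.parse import urlparse


def wrap_and_pickle(udf) -> bytes:
    """Step 1: wrap the user's validation function and serialize it with pickle."""
    wrapped = {"udf": udf, "kind": "validation"}  # hypothetical wrapper shape
    return pickle.dumps(wrapped)


def stage_pickled_udf(payload: bytes, staging_uri: str) -> str:
    """Step 2: stage the pickled object to a supported storage (gs:// or s3://).
    The actual upload call is elided; only the URI handling is sketched."""
    scheme = urlparse(staging_uri).scheme
    if scheme not in ("gs", "s3"):
        raise ValueError("only gs:// and s3:// staging locations are supported")
    object_uri = f"{staging_uri.rstrip('/')}/udfs/validation_udf.pickle"
    # ... upload `payload` to object_uri with the matching storage client ...
    return object_uri


def attach_validation_udf(feature_table, udf, staging_uri: str) -> None:
    """Steps 3-4: record a reference to the staged UDF in the FeatureTable's
    labels; the IngestionJob picks up the _validationUDF label and runs the
    code between reading from the source and writing to the store."""
    object_uri = stage_pickled_udf(wrap_and_pickle(udf), staging_uri)
    feature_table.labels["_validationUDF"] = object_uri
```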

Limitations:
Since pickling happens on the customer's machine and unpickling happens on a Spark worker, there may be issues related to incompatibility of pickle protocols when the customer's and worker's Python versions differ. Ideally, to avoid that, the minor Python versions should match (e.g., both 3.7). However, tests confirmed that a lower version on the SDK side is fine (3.6 on the SDK, 3.7 on the worker), but not the other way around.
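
As an illustration of this caveat, a defensive check on the SDK side might look like the following (purely a sketch; the worker-version constant and the warning are assumptions, not part of this PR):

```python
import pickle
import sys
import warnings

WORKER_PYTHON = (3, 7)  # assumed Spark worker Python version, e.g. from job config


def check_udf_pickle_compatibility() -> int:
    """Warn when the local Python is newer than the worker's, since the worker
    may then fail to unpickle the UDF; a lower-or-equal SDK version (e.g. 3.6 SDK
    with a 3.7 worker) was confirmed to work, but not the other way around."""
    sdk_python = sys.version_info[:2]
    if sdk_python > WORKER_PYTHON:
        warnings.warn(
            f"SDK Python {sdk_python} is newer than worker Python {WORKER_PYTHON}; "
            "unpickling the validation UDF on the worker may fail."
        )
    # Use the default protocol so the pickle stays readable by the older
    # interpreter of the pair.
    return pickle.DEFAULT_PROTOCOL
```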

Which issue(s) this PR fixes:

Fixes #1230

Does this PR introduce a user-facing change?:

**experimental** ability to run custom Python code inside the streaming ingestion job

@pyalex
Collaborator Author

pyalex commented Dec 17, 2020

/test test-end-to-end

@pyalex
Collaborator Author

pyalex commented Dec 17, 2020

/test test-end-to-end-gcp

@pyalex
Collaborator Author

pyalex commented Dec 17, 2020

/test test-end-to-end-gcp

@pyalex
Collaborator Author

pyalex commented Dec 18, 2020

/test test-end-to-end-aws

@pyalex pyalex added the kind/feature (New feature or request) label Dec 18, 2020
@pyalex pyalex changed the title [WIP] Python UDF in Ingestion being used for feature validation Python UDF in Ingestion being used for feature validation Dec 18, 2020
@pyalex pyalex added the area/ingestion (The ingestion Beam component and storage-related items) and area/sdks labels Dec 18, 2020
@@ -332,6 +332,7 @@ def _stream_ingestion_step(
],
"Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
+ jars_args
+ ["--conf", "spark.yarn.isPython=true"]
Collaborator


Is there any downside to doing this? Do you know why it isn't always set to true in Spark?

Collaborator Author


It's YARN-specific, and from what I found it's mostly used to enable distribution of Python-related artifacts (like pyspark.zip) to YARN workers. It's set by spark-submit automatically when the main file is a py-file, which is not the case for our IngestionJob.
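
For illustration, the step args from the diff above with the kind of inline comment being suggested (the jars_args placeholder is assumed only to keep the sketch self-contained):

```python
# Sketch of the spark-submit step args, with an explanatory comment inline.
jars_args = ["--jars", "s3://example-bucket/feast-ingestion-spark.jar"]  # placeholder

step_args = (
    ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
    + jars_args
    # spark.yarn.isPython is YARN-specific: spark-submit only sets it by itself
    # when the main file is a py-file. Our entry point is a JAR, so we set it
    # explicitly to make YARN distribute Python artifacts (e.g. pyspark.zip)
    # that the pickled UDF needs on the workers.
    + ["--conf", "spark.yarn.isPython=true"]
)
```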

Collaborator


Makes sense. Maybe add a comment there for the future generations?

@@ -45,19 +45,19 @@ lint-java:
${MVN} --no-transfer-progress spotless:check

test-java:
${MVN} --no-transfer-progress test
${MVN} --no-transfer-progress -DskipITs=true test
Collaborator


Why? (I'm not super familiar with our Java test machinery; does it now skip some tests it didn't skip before?)

Collaborator Author

@pyalex pyalex Dec 21, 2020


TL;DR: mvn test should run only unit tests; everything else should be skipped.

We have two toggles, skipITs and skipUTs, meaning skip Integration Tests and skip Unit Tests; we need them to run those test suites separately. Previously there was no need for skipITs, since in the Maven pipeline test runs before verify (which is used for ITs). But in the Spark part we have some additional steps (the generate-test-sources phase) that are required only by integration tests and are not needed by unit tests (see spark/ingestion/pom.xml). To skip those steps I added this flag here.

infra/scripts/build-ingestion-py-dependencies.sh (review thread outdated; resolved)
from feast import Client, FeatureTable


GE_PACKED_ARCHIVE = "https://storage.googleapis.com/feast-jobs/spark/validation/pylibs-ge-%(platform)s.tar.gz"
Collaborator


How does this work from a distribution perspective? I worry that if this is not in any way tied to a Feast version, we can't upgrade GE without breaking older versions of the ingestion job.

Maybe we don't have to address this now, especially while this is contrib, but sometime down the road we probably need to pin the version of this tarball to a Feast version somehow.

Collaborator Author

@pyalex pyalex Dec 21, 2020


I know that smells. But since the feature is experimental, I think it's OK for now.
As an option for the future, we could put this archive inside the ingestion jar or the Docker image (jobservice).
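
A possible shape for the version pinning mentioned above, sketched only (the URL scheme and the use of feast.__version__ are assumptions; nothing in this PR implements it):

```python
import platform

import feast

# Hypothetical: bake the Feast version into the archive path so that a GE
# upgrade cannot break ingestion jobs started from an older SDK.
GE_PACKED_ARCHIVE = (
    "https://storage.googleapis.com/feast-jobs/spark/validation/"
    "pylibs-ge-%(feast_version)s-%(platform)s.tar.gz"
) % {
    "feast_version": getattr(feast, "__version__", "dev"),
    "platform": platform.system().lower(),
}
```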

pyalex and others added 2 commits December 21, 2020 17:52
Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
Collaborator

@khorshuheng khorshuheng left a comment


/lgtm

@feast-ci-bot
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: khorshuheng, oavdeev, pyalex

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@feast-ci-bot feast-ci-bot merged commit 17edb99 into feast-dev:master Dec 22, 2020
Labels
approved, area/ingestion (The ingestion Beam component and storage-related items), area/sdks, kind/feature (New feature or request), lgtm, release-note, size/XXL
Development

Successfully merging this pull request may close these issues.

Apply custom Python code during ingestion for data validation (Pandas UDF)
4 participants