
Pyspark job for feature batch retrieval #1021

Merged · 6 commits merged from spark-retrieval into feast-dev:master on Oct 7, 2020

Conversation

@khorshuheng (Collaborator) commented Sep 30, 2020

What this PR does / why we need it:
This is a prerequisite for using PySpark for the batch retrieval job, instead of the BQ client / Feast Batch Serving.

Which issue(s) this PR fixes:
This PR contains a standalone PySpark job (i.e. it has no Feast or other external dependencies) to perform batch feature retrieval. Feast users are not expected to use this PySpark script directly. Rather, the Feast SDK will be responsible for submitting the PySpark job to a Spark cluster, which will be implemented in a separate PR.

Does this PR introduce a user-facing change?:

NONE

join_keys: List[str],
feature_table: DataFrame,
features: List[str],
feature_prefix: str = "",
Member

Would you mind explaining this please?

conf (Dict):
Configuration for the retrieval job, in json format. Sample configuration as follows:

sample_conf = {

@khorshuheng (Collaborator Author)

/test test-end-to-end-batch-dataflow

@khorshuheng added the kind/feature (New feature or request) label on Oct 2, 2020
) -> DataFrame:
"""Perform an as of join between entity and feature table, given a maximum age tolerance.
Join conditions:
1. Join keys values match.
@woop (Member), Oct 5, 2020

Should we talk about entities here? Or are you keeping it generic?

2. Feature event timestamp is the closest match possible to the entity event timestamp,
but must not be more recent than the entity event timestamp, and the difference must
not be greater than max_age, unless max_age is not specified.
3. If more than one feature table rows satisfy condition 1 and 2, feature row with the
Member

What happens if there is no match?
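
To make the join conditions above concrete, here is a minimal PySpark sketch of such an as-of join. The function and column names are illustrative rather than the PR's actual implementation, and the null-on-no-match behaviour is an assumption about how a left join would handle entities with no qualifying feature row, not something stated in this thread:

```python
from typing import List, Optional

from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as F


def as_of_join_sketch(
    entity_df: DataFrame,
    feature_df: DataFrame,
    join_keys: List[str],
    max_age: Optional[str] = None,
) -> DataFrame:
    e = entity_df.alias("e")
    # Rename the feature timestamp so it does not clash with the entity's.
    f = feature_df.withColumnRenamed("event_timestamp", "feature_ts").alias("f")

    # Condition 1: join key values match.
    cond = [F.col(f"e.{k}") == F.col(f"f.{k}") for k in join_keys]
    # Condition 2: the feature row is not more recent than the entity row ...
    cond.append(F.col("f.feature_ts") <= F.col("e.event_timestamp"))
    # ... and not older than entity timestamp - max_age, when max_age is given.
    if max_age:
        cond.append(
            F.col("f.feature_ts")
            >= F.col("e.event_timestamp") - F.expr(f"interval {max_age}")
        )

    # Left join: entities with no qualifying feature row are kept, with null
    # feature values (assumed behaviour for the no-match case).
    joined = e.join(f, on=cond, how="left")

    # Condition 3: if several feature rows qualify, keep the most recent one.
    # (The duplicate f-side key columns are left in place to keep the sketch short.)
    w = Window.partitionBy(
        *[F.col(f"e.{k}") for k in join_keys], F.col("e.event_timestamp")
    ).orderBy(F.col("f.feature_ts").desc())
    return (
        joined.withColumn("_rn", F.row_number().over(w))
        .where(F.col("_rn") == 1)
        .drop("_rn")
    )
```

Under left-join semantics, an entity row with no match would simply carry null feature values, which is one plausible answer to the question above.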

features (List[str]):
The feature columns which should be present in the result dataframe.
feature_prefix (str):
Feature column prefix for the result dataframe.
Member

Can you explain WHY this would need to be used?
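
One plausible motivation (an inference, not stated in the thread): when features from more than one feature table are joined onto the same entity dataframe, a per-table prefix keeps identically named feature columns from colliding. A hypothetical illustration, with made-up table and column names:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Both feature tables happen to expose a column named "rating"; a per-table
# prefix keeps the columns distinct once both are joined onto the entity rows.
drivers = spark.createDataFrame([(1001, 4.8)], ["driver_id", "rating"])
restaurants = spark.createDataFrame([(2001, 4.2)], ["restaurant_id", "rating"])

drivers_prefixed = drivers.select(
    "driver_id", F.col("rating").alias("driver__rating")
)
restaurants_prefixed = restaurants.select(
    "restaurant_id", F.col("rating").alias("restaurant__rating")
)
# The joined result would carry driver__rating and restaurant__rating, so
# there is no ambiguity about which table a feature came from.
```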

pass


def verify_schema(
Member

Please add a comment here

@@ -0,0 +1,2 @@
id,event_timestamp
1001,2020-09-02T00:00:00.000
@woop (Member), Oct 5, 2020

Can we please add a few more rows, like 5-10? One row is asking for trouble. Same for the rest.

Collaborator

Even better to generate thousands, to test how multiple partitions can affect the result.

@woop (Member), Oct 5, 2020

Yes, using code to generate data isn't a bad idea, as long as we avoid too much rand() without a seed.
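
A minimal sketch of what seeded generation along these lines could look like; the row count, the extra feature_value column, and the partition count are illustrative and not the PR's actual fixtures:

```python
import random
from datetime import datetime, timedelta

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

random.seed(42)  # fixed seed so test failures are reproducible across runs
base = datetime(2020, 9, 1)

rows = [
    (1000 + i, base + timedelta(minutes=random.randint(0, 7 * 24 * 60)), random.random())
    for i in range(5000)
]

# Repartition on purpose so partition-dependent join bugs have a chance to show up.
entity_df = spark.createDataFrame(
    rows, ["id", "event_timestamp", "feature_value"]
).repartition(8)
```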

The feature columns which should be present in the result dataframe.
feature_prefix (str):
Feature column prefix for the result dataframe.
max_age (str):
Collaborator

Why is it a str? What's the format?

Collaborator Author

This is for the PySpark interval expression: for example, 13 hour, 6 day, 60 second, 3 month. I can also change this to an integer type instead of a string, in which case the max age would be in seconds, similar to the current behaviour. I just thought that using a string might be more convenient for users, since they don't need to convert days / months to seconds, though that would be a breaking change. So should I use seconds and an integer instead?
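
For context, this is roughly how such an interval string can be used in a filter, next to the integer-seconds alternative (a sketch with made-up column names, not the PR's code):

```python
from pyspark.sql import functions as F

max_age = "6 day"  # Spark SQL interval expression, as described above

# Interval-string variant: the unit travels with the value.
within_age = F.col("feature_ts") >= F.col("entity_ts") - F.expr(f"interval {max_age}")

# Integer-seconds variant: equivalent, but the caller must convert 6 days to 518400.
max_age_seconds = 6 * 24 * 60 * 60
within_age_secs = (
    F.col("entity_ts").cast("long") - F.col("feature_ts").cast("long") <= max_age_seconds
)
```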

@pyalex (Collaborator) commented Oct 5, 2020

@khorshuheng I think we need to add a minimum lower boundary,
min(event_timestamp) of all entities - max_age,
and apply it as a default filter to feature_table. Otherwise no filters are pushed down to BQ, and we load too much unnecessary data.

assert_dataframe_equal(joined_df, expected_joined_df)


def test_entity_filter(
Collaborator

Not sure what is tested here. A description would be helpful.

@khorshuheng force-pushed the spark-retrieval branch 3 times, most recently from d1b9392 to 71e7578, on October 5, 2020 14:17
@khorshuheng (Collaborator Author)

@khorshuheng I think we need to add a minimum lower boundary,
min(event_timestamp) of all entities - max_age,
and apply it as a default filter to feature_table. Otherwise no filters are pushed down to BQ, and we load too much unnecessary data.

While I agree with that, I am not sure how the minimum lower boundary should be specified. Should it be hard-coded within the SDK? If not, where should we retrieve this lower boundary from?
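
One possible way to derive that boundary without hard-coding it is to take it from the entity dataframe itself. The sketch below is purely illustrative of pyalex's suggestion, assumes entity_df, feature_df and max_age as in the sketches above, and is not what the PR ultimately settled on:

```python
from pyspark.sql import functions as F

# Lower bound = min(event_timestamp) across all entities, minus max_age.
min_entity_ts = entity_df.agg(F.min("event_timestamp")).first()[0]

# Applying the filter before the join gives the source a predicate it can push
# down, so a BigQuery-backed feature table would scan far less data.
feature_df = feature_df.where(
    F.col("event_timestamp")
    >= F.expr(f"cast('{min_entity_ts}' as timestamp) - interval {max_age}")
)
```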

@khorshuheng force-pushed the spark-retrieval branch 2 times, most recently from 1c19162 to 43a81aa, on October 6, 2020 08:57
…rge dataframe (Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>)
@feast-ci-bot (Collaborator)

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: khorshuheng, pyalex

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@pyalex (Collaborator) commented Oct 7, 2020

/lgtm

@feast-ci-bot merged commit d1807c9 into feast-dev:master on Oct 7, 2020