
Add support for Dask during historical retrieval #1954

Merged
merged 39 commits into feast-dev:master from feature/dask
Feb 16, 2022

Conversation

qooba
Contributor

@qooba qooba commented Oct 17, 2021

What this PR does / why we need it:

Which issue(s) this PR fixes:

Fixes #1953

Does this PR introduce a user-facing change?:

This PR adds an additional method, to_dask_df(), to FileRetrievalJob, which returns a Dask DataFrame instead of a pandas DataFrame. Thanks to this, historical feature retrieval can be distributed across a cluster. Additionally, I have added a use_dask argument to the materialization functions, which also distributes the materialization process across the cluster.
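
For illustration, a minimal usage sketch of the new method (assuming a local file offline store, so that get_historical_features returns a FileRetrievalJob; entity_df and the feature reference below are placeholders, not from this PR):

from feast import FeatureStore

store = FeatureStore(repo_path=".")

job = store.get_historical_features(
    entity_df=entity_df,              # a pandas DataFrame of entity keys + event_timestamp (assumed to exist)
    feature_refs=["my_statistics:f0"],  # hypothetical feature reference
)

dask_df = job.to_dask_df()  # lazy Dask DataFrame; work can be distributed across the cluster
pandas_df = job.to_df()     # plain pandas DataFrame, as before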

@feast-ci-bot
Collaborator

Hi @qooba. Thanks for your PR.

I'm waiting for a feast-dev member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@codecov-commenter

codecov-commenter commented Oct 18, 2021

Codecov Report

Merging #1954 (7799a67) into master (144f25c) will increase coverage by 0.03%.
The diff coverage is 97.14%.


@@            Coverage Diff             @@
##           master    #1954      +/-   ##
==========================================
+ Coverage   86.15%   86.18%   +0.03%     
==========================================
  Files         116      116              
  Lines        9820     9887      +67     
==========================================
+ Hits         8460     8521      +61     
- Misses       1360     1366       +6     
Flag Coverage Δ
integrationtests 75.11% <97.14%> (-0.04%) ⬇️
unittests 58.73% <87.61%> (+0.17%) ⬆️


Impacted Files Coverage Δ
...python/feast/infra/offline_stores/offline_store.py 83.33% <ø> (ø)
...python/tests/integration/e2e/test_universal_e2e.py 88.00% <50.00%> (ø)
sdk/python/feast/infra/offline_stores/file.py 96.93% <98.94%> (+0.29%) ⬆️
sdk/python/feast/infra/provider.py 89.68% <100.00%> (+0.51%) ⬆️
sdk/python/feast/wait.py 58.82% <0.00%> (-35.30%) ⬇️
sdk/python/feast/infra/utils/snowflake_utils.py 86.48% <0.00%> (-2.30%) ⬇️
sdk/python/feast/feature_server.py 30.55% <0.00%> (-1.03%) ⬇️
.../integration/online_store/test_universal_online.py 97.18% <0.00%> (-0.80%) ⬇️
sdk/python/feast/infra/online_stores/dynamodb.py 86.23% <0.00%> (-0.10%) ⬇️
sdk/python/feast/infra/online_stores/datastore.py 90.52% <0.00%> (-0.05%) ⬇️
... and 4 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

self,
end_date: datetime,
feature_views: Optional[List[str]] = None,
use_dask: bool = False,
Member

Nit - instead of having this be a boolean flag, I think we should have a mode enum value which can take values of PANDAS or DASK, with PANDAS as the default.

I think that would be better for forward compatibility, since we may want to support other engines later, such as Dask on Ray or Modin.
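
For concreteness, a rough sketch of the suggested enum-based signature (the enum and parameter names below are hypothetical, not part of this PR):

from enum import Enum
from typing import List, Optional
from datetime import datetime

class MaterializationEngine(Enum):  # hypothetical name
    PANDAS = "pandas"
    DASK = "dask"

def materialize(
    self,
    end_date: datetime,
    feature_views: Optional[List[str]] = None,
    engine: MaterializationEngine = MaterializationEngine.PANDAS,  # would replace use_dask: bool
) -> None:
    ...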

Member

I'd want to keep this contract generic. Wouldn't it make more sense to have this functionality defined at a lower level (like the provider) and configured through the feature_store.yaml?

Or alternatively, what is the downside to just using Dask as the default option? Is the only downside that we have an extra dependency to manage?

Contributor

There is a lot of if Dask then do this, else do that. If more modes are added will that make this difficult to maintain? And how will all the different modes affect all the provider/store implementations?

Member

@judahrand judahrand Oct 20, 2021

Wouldn't it make more sense to have this functionality defined at a lower level (like the provider) and configured through the feature_store.yaml?

If Dask is added as a return format I do think having the ability to dynamically choose whether to use it is important rather than it being defaulted to via config.

Dask is great for some parallelization jobs and for larger-than-memory datasets but it does have significant overhead and should not be considered the default choice for all Pandas-esque workloads.

Member

There is a lot of if Dask then do this, else do that. If more modes are added will that make this difficult to maintain? And how will all the different modes affect all the provider/store implementations?

I agree. We should optimize for simplicity at this point. It's still not clear to me whether we need two modes.

If Dask is added as a return format I do think having the ability to dynamically choose whether to use it is important rather than it being defaulted to via config.

Honestly I would rather not hardcode dask into our contracts. It's an implementation detail. What do you see as the reason why somebody would use dask for some materializations, but not others? Assuming they can enable/disable dask in the config.

Member

@woop woop Oct 22, 2021

Thanks for the quick analysis @judahrand. Quick question, wouldn't npartitions=1 make Dask strictly worse than Pandas? Shouldn't the npartitions be based on the amount of cores available, or at least a larger number than 1?

And one might argue that "You're not using Dask right". But I don't think Feast should require the user to understand Dask to use it efficiently. Pandas is much more widely used (as much as I dislike its typing for anything where types are important) and so better understood by more users.

Sure, but the assumption here is that the user doesn't even know they are using Dask, and that we can configure it in such a way that it is an improved experience in 90% of cases.

For what it's worth I am not a strong proponent of taking this approach. I just feel like it's worth exploring.

Member

@judahrand judahrand Oct 22, 2021

@qooba But Dask's DataFrame is lazy, isn't it? So you'll need to force it to actually do the computation you've asked it to do, won't you? You can do that with .persist(), I think.
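
For reference, a small sketch of what forcing the computation looks like (illustrative only, not code from this PR):

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)

result = ddf["x"] * 100       # lazy: only builds a task graph, nothing runs yet
persisted = result.persist()  # starts computation on the cluster, keeps results in memory
concrete = result.compute()   # blocks and returns a concrete pandas Series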

Member

Quick question, wouldn't npartitions=1 make Dask strictly worse than Pandas?

I did play with this parameter and it didn't seem to change anything from a performance point of view.

import numpy as np
import pandas as pd
import dask.dataframe as dd

rng = np.random.RandomState(42)
X = rng.rand(10000, 1000)

pandas_df = pd.DataFrame(X, columns=[str(_) for _ in range(X.shape[1])])
dask_df = dd.from_pandas(pandas_df, npartitions=4)

%timeit pandas_df * 100  # 4.64 ms ± 169 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit dask_df * 100    # 55.8 ms ± 228 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Member

@judahrand judahrand Oct 22, 2021

Sure, but the assumption here is that the user doesn't even know they are using Dask, and that we can configure it in such a way that it is an improved experience in 90% of cases.

This would be a more reasonable approach, but I believe it would behave differently from what this PR currently does?

In this case, if evaluation_engine is pandas, then
store.get_historical_features(...).to_df()
will return a pandas DataFrame; if it is dask, the same call will return a Dask DataFrame.

I don't think that adding Dask as an invisible computation engine is a bad idea - and that might make sense to have as a global config value.

Contributor Author

@qooba qooba Oct 28, 2021

@woop @judahrand Thanks for the comments. I have profiled the Dask code and have now achieved very promising results. I have also changed the approach and completely replaced pandas with Dask, without any switches. Following the suggestions, .to_df() will return a computed pandas DataFrame so that the change is transparent to users.
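
Roughly, the idea looks like this (a simplified sketch, not the PR's actual code):

import dask.dataframe as dd
import pandas as pd

class FileRetrievalJob:  # simplified for illustration
    def __init__(self, evaluation_function):
        # evaluation_function is assumed to build and return a lazy Dask DataFrame
        self.evaluation_function = evaluation_function

    def to_dask_df(self) -> dd.DataFrame:
        # hand back the lazy Dask DataFrame for distributed processing
        return self.evaluation_function()

    def to_df(self) -> pd.DataFrame:
        # compute eagerly so callers still receive a plain pandas DataFrame
        return self.evaluation_function().compute()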

And a benchmark (on the same single machine, with a simple Dask cluster):

#!/bin/bash

dask-scheduler --host 0.0.0.0 --port 8786 --bokeh-port 8787

dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8701
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8702
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8703
dask-worker --host 0.0.0.0 0.0.0.0:8786 --worker-port 8704

store.get_historical_features(...).to_df()
For the current master branch:

CPU times: user 2min 51s, sys: 6.64 s, total: 2min 57s
Wall time: 2min 52s

For the current feature/dask branch (I only checked out the branch, with no other code changes):

CPU times: user 8.18 s, sys: 2.3 s, total: 10.5 s
Wall time: 9.96 s

To reproduce, you first have to generate the fake dataset (all_data.parquet is about 1.2 GB):

import pandas as pd
import dask.dataframe as dd
import numpy as np
from datetime import datetime
from sklearn.datasets import make_hastie_10_2

def generate_entities(size):
    return np.random.choice(size, size=size, replace=False)

def generate_data(entities, year=2021, month=10, day=1) -> pd.DataFrame:
    n_samples=len(entities)
    X, y = make_hastie_10_2(n_samples=n_samples, random_state=0)
    df = pd.DataFrame(X, columns=["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9"])
    df["y"]=y
    df['entity_id'] = entities
    df['datetime'] = pd.to_datetime(
            np.random.randint(
                datetime(year, month, day, 0).timestamp(),
                datetime(year, month, day, 22).timestamp(),
                size=n_samples),
        unit="s"
    )
    df['created'] = pd.to_datetime(datetime.now())
    return df

entities=generate_entities(1000000)

all_data=[]

for d in range(1,15):
    data=generate_data(entities,month=1, day=d)
    #data=dd.from_pandas(data,npartitions=1)
    all_data.append(data)
        
all_dd=pd.concat(all_data)
all_dd.set_index('datetime')
all_dd.to_parquet("./all_data.parquet")
    
entity_df = pd.DataFrame(data=entities, columns=['entity_id'])
entity_df["event_timestamp"]=pd.to_datetime(
            np.random.randint(
                datetime(2021, 10, 1).timestamp(),
                datetime(2021, 10, 21).timestamp(),
                size=entity_df.size),
        unit="s"
    )

entity_df=entity_df[entity_df.entity_id > 500]
entity_df

feature_store.yaml

project: default
registry: registry.db
provider: local
online_store:
  type: redis
  connection_string: "redis:6379"

features.py

from google.protobuf.duration_pb2 import Duration
from feast import Entity, Feature, FeatureView, ValueType
from feast import FileSource
from feast.data_format import ParquetFormat

my_stats = FileSource(
    path="./all_data.parquet",
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

my_entity = Entity(name="entity_id", value_type=ValueType.INT64, description="entity id",)

mystats_view = FeatureView(
    name="my_statistics",
    entities=["entity_id"],
    ttl=Duration(seconds=1000*60*60*24*31),
    features=[
        Feature(name="f0", dtype=ValueType.FLOAT),
        Feature(name="f1", dtype=ValueType.FLOAT),
        Feature(name="f2", dtype=ValueType.FLOAT),
        Feature(name="f3", dtype=ValueType.FLOAT),
        Feature(name="f4", dtype=ValueType.FLOAT),
        Feature(name="f5", dtype=ValueType.FLOAT),
        Feature(name="f6", dtype=ValueType.FLOAT),
        Feature(name="f7", dtype=ValueType.FLOAT),
        Feature(name="f8", dtype=ValueType.FLOAT),
        Feature(name="f9", dtype=ValueType.FLOAT),
        Feature(name="y", dtype=ValueType.FLOAT),
    ],
    online=True,
    input=my_stats,
    tags={},
)

and finally fetch historical features:

%%time
from feast import FeatureStore
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

store = FeatureStore(repo_path=".")

training_df = store.get_historical_features(
    entity_df=entity_df, 
    feature_refs = [
        'my_statistics:f0',
        'my_statistics:f1',
        'my_statistics:f2',
        'my_statistics:f3',
        'my_statistics:f4',
        'my_statistics:f5',
        'my_statistics:f6',
        'my_statistics:f7',
        'my_statistics:f8',
        'my_statistics:f9',
        'my_statistics:y',
    ],
).to_df()

training_df

@felixwang9817
Collaborator

Hey @qooba, thanks for this PR. We just fixed the issue with the linter that's been blocking all PRs for the last two days. Would you mind rebasing your changes on master and then force pushing your changes up? Thanks!

@qooba qooba force-pushed the feature/dask branch 5 times, most recently from d717d71 to 77bcdb5 Compare October 27, 2021 23:27
@woop woop assigned woop and unassigned tsotnet Nov 15, 2021
@@ -305,26 +404,29 @@ def evaluate_offline_job():
else [event_timestamp_column]
)

source_df.sort_values(by=ts_columns, inplace=True)
# source_df = source_df.sort_values(by=ts_columns[0])
Member

what is happening here?

Contributor Author

I commented this out during profiling (I wanted to check how sorting affects the performance) and forgot to uncomment it. Fixed - this sort is important because in the next step we drop_duplicates with keep="last". The Dask limitation here is that we can only sort by one column, so in this case we have to sort by event_timestamp_column rather than [event_timestamp_column, created_timestamp_column], which is still correct.

Member

@woop woop Dec 31, 2021

@qooba we currently use created_timestamp_column in order to deduplicate rows with duplicate event timestamps; we keep the latest created_timestamp_column in that case. This is often useful in append-only logs where some failure has occurred when writing feature values, since it allows a user to simply reingest new rows with the same event_timestamp but with fixed feature values.

If we don't use the created timestamp for sorting, then we lose the guarantee of returning only the latest value per event_timestamp as determined by created_timestamp. Wouldn't it make more sense to first sort by created_timestamp and then event_timestamp, in order to have determinism in the output and consistency with our other offline store implementations?
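
To make the intended semantics concrete, here is a toy pandas sketch of that deduplication (column names and values are illustrative, not from the PR):

import pandas as pd

# Two rows share the same event_timestamp; the one with the later created
# timestamp should win after deduplication (e.g. a reingested fix).
df = pd.DataFrame(
    {
        "entity_id": [1, 1],
        "event_timestamp": pd.to_datetime(["2021-10-01", "2021-10-01"]),
        "created": pd.to_datetime(["2021-10-02", "2021-10-03"]),
        "value": [0.1, 0.2],
    }
)

deduped = (
    df.sort_values(by=["created", "event_timestamp"])
      .drop_duplicates(subset=["entity_id", "event_timestamp"], keep="last")
)
print(deduped)  # only the row with value == 0.2 remains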

Contributor Author

fixed

Member

Thanks, still some failures. Will wait for tests to pass then review :)

Member

Hmmm, those failures may be unrelated to you. We are looking into it.

Contributor Author

I fixed some failures related to my changes, but there is still something off with the datetimes; I will check :)

@qooba
Contributor Author

qooba commented Feb 16, 2022

Thanks @qooba! I'm testing this feature branch. Let me know if I can help with anything.

How should we handle the timedelta(0) != 0 edge case?

Fixed :) thanks for finding.
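
For context, the pitfall is easy to illustrate (illustrative snippet, not the PR's actual fix):

from datetime import timedelta

ttl = timedelta(0)

print(ttl != 0)             # True: a timedelta never compares equal to the int 0
print(ttl == timedelta(0))  # True: compare against another timedelta instead
print(bool(ttl))            # False: a zero timedelta is falsy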

@woop and @qooba, I believe this is a subject for another PR, but let me know if it makes sense:

You are absolutely right. Partitioning will be very important in terms of optimization, but partitioning is strongly associated with data lake design (https://docs.dask.org/en/latest/generated/dask.dataframe.read_parquet.html).

I was trying to optimize the queries by using date_partition_column as a partition filter in read_parquet(), but it could be any column in partition_columns: List[str].

Optimizing on event_timestamp by default did not work because PyArrow converted the date partition columns to the category data type, and I could not use the <=, >= operators to compare str and datetime types.

I ended up using date_partition_column and <=, >= filters with another column derived from event_timestamp, such as year.

In fact, this could be supported out of the box (see the sketch after this list):

  • Set the partition_columns in the data source (same as partition_on from to_parquet)
  • Query by partition columns such as event_timestamp
  • Apply <=, >= filters for each partition column using conjunction
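
A rough sketch of that kind of partition filtering (the year partition column and path below are hypothetical):

import dask.dataframe as dd

# Assume the dataset was written with to_parquet(..., partition_on=["year"]),
# where "year" is derived from event_timestamp at write time.
ddf = dd.read_parquet(
    "./all_data.parquet",
    filters=[("year", ">=", 2021), ("year", "<=", 2021)],  # conjunction of >= and <= filters
)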

@feast-ci-bot
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: qooba, woop

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@woop
Member

woop commented Feb 16, 2022

/lgtm

@woop
Member

woop commented Feb 16, 2022

Thanks @qooba. This is a super valuable addition to Feast!

@feast-ci-bot feast-ci-bot merged commit f1a54a9 into feast-dev:master Feb 16, 2022
aht added a commit to aht/feast that referenced this pull request Feb 23, 2022
…ents

* upstream/master: (103 commits)
  refactor: Move create cluster script
  ci: Add validation for upper case PR titles
  Switch order of views to keep/delete to fix apply logging
  ci: Fix broken Prow plugin which ignores config.yaml
  ci: Add PR linting to ensure conventional commits are followed for PR titles
  Add owner field to FeatureService (feast-dev#2321)
  Disable Redis cluster tests (feast-dev#2327)
  Use PR title for commit message
  Add redis cluster initialziation to master github workflow (feast-dev#2317)
  Fix materialize bug with RedisCluster (feast-dev#2311)
  Add support for Dask during historical retrieval (feast-dev#1954)
  downgrade datatypes to 8
  Update pom.xml to 0.18.2 SNAPSHOT
  Update changelog for 0.18.1 and update pom.xml
  Update helm chart version
  Don't require `snowflake` to always be installed (feast-dev#2309)
  Fixing the Java helm charts and adding a demo tutorial on how to use them (feast-dev#2298)
  Include infra objects in registry dump and fix Infra's from_proto  (feast-dev#2295)
  Remove old flag warning with the python feature server (feast-dev#2300)
  podAnnotations Values in the feature-server chart (feast-dev#2304)
  ...
Successfully merging this pull request may close these issues.

Distribute Feast with Dask