From 28a3d24b12b35e4154df2bfd66dedb80bcfa3292 Mon Sep 17 00:00:00 2001
From: Abdul Hameed
Date: Thu, 13 Jun 2024 16:45:58 -0400
Subject: [PATCH] feat: Remote offline Store (#4262)

* feat: Added offline store remote deployment functionality using arrow flight server and client Signed-off-by: Abdul Hameed
* Initial functional commit for remote get_historical_features Signed-off-by: Abdul Hameed
* remote offline store example Signed-off-by: Abdul Hameed
* removing unneeded test code and fixing imports Signed-off-by: Abdul Hameed
* call do_put only once, postpone the invocation of do_put and simplified _make_flight_info Signed-off-by: Abdul Hameed
* added primitive parameters to the command descriptor Signed-off-by: Abdul Hameed
* removed redundant param Signed-off-by: Abdul Hameed
* Initial skeleton of unit test for offline server Signed-off-by: Abdul Hameed
* added unit test for offline store remote client Signed-off-by: Abdul Hameed
* testing all offline store APIs Signed-off-by: Abdul Hameed
* integrated comments Signed-off-by: Abdul Hameed
* Updated remote offline server readme with the capability to init with an environment variable Signed-off-by: Theodor Mihalache Signed-off-by: Abdul Hameed
* added RemoteOfflineStoreDataSourceCreator, use feature_view_names to transfer feature views and remove dummies Signed-off-by: Abdul Hameed
* added missing CI requirement Signed-off-by: Abdul Hameed
* fixed linter Signed-off-by: Abdul Hameed
* fixed multiprocess CI requirement Signed-off-by: Abdul Hameed
* feat: Added offline store remote deployment functionality using arrow flight server and client Signed-off-by: Abdul Hameed
* fix test errors Signed-off-by: Abdul Hameed
* managing feature view aliases and restored skipped tests Signed-off-by: Abdul Hameed
* fixed linter issue Signed-off-by: Abdul Hameed
* fixed broken test Signed-off-by: Abdul Hameed
* added supported deployment modes using helm chart for online (default), offline, ui and registry Signed-off-by: Abdul Hameed
* updated the document for offline remote server Signed-off-by: Abdul Hameed
* added the document for remote offline server Signed-off-by: Abdul Hameed
* rebase and fix conflicts Signed-off-by: Abdul Hameed
* feat: Added offline store remote deployment functionality using arrow flight server and client Signed-off-by: Abdul Hameed
* added unit test for offline store remote client Signed-off-by: Abdul Hameed
* added RemoteOfflineStoreDataSourceCreator, use feature_view_names to transfer feature views and remove dummies Signed-off-by: Abdul Hameed
* feat: Added offline store remote deployment functionality using arrow flight server and client Signed-off-by: Abdul Hameed
* Added missing remote offline store apis implementation Signed-off-by: Theodor Mihalache Signed-off-by: Abdul Hameed
* Fixed tests Signed-off-by: Theodor Mihalache Signed-off-by: Abdul Hameed
* Implemented PR change proposal Signed-off-by: Theodor Mihalache Signed-off-by: Abdul Hameed
* Implemented PR change proposal Signed-off-by: Theodor Mihalache Signed-off-by: Abdul Hameed
* updated example readme file Signed-off-by: Abdul Hameed
* Implemented PR change proposal Signed-off-by: Theodor Mihalache Signed-off-by: Abdul Hameed
* fixing the integration tests Signed-off-by: Abdul Hameed
* Fixed OfflineServer teardown Signed-off-by: Theodor Mihalache
* updated the document for remote offline feature server and client Signed-off-by: Abdul Hameed
* Implemented PR change proposal Signed-off-by: Theodor Mihalache

---------

Signed-off-by: Abdul Hameed
Signed-off-by: Theodor
Mihalache Co-authored-by: Daniele Martinoli <86618610+dmartinol@users.noreply.github.com> Co-authored-by: Theodor Mihalache Co-authored-by: Theodor Mihalache <84387487+tmihalac@users.noreply.github.com> --- docs/SUMMARY.md | 3 + docs/reference/feature-servers/README.md | 4 + .../feature-servers/offline-feature-server.md | 35 ++ .../offline-stores/remote-offline-store.md | 28 ++ examples/remote-offline-store/README.md | 98 +++++ .../offline_client/__init__.py | 0 .../offline_client/feature_store.yaml | 10 + .../offline_client/test.py | 40 ++ .../offline_server/__init__.py | 0 .../offline_server/feature_repo/__init__.py | 0 .../feature_repo/data/driver_stats.parquet | Bin 0 -> 35102 bytes .../feature_repo/data/online_store.db | Bin 0 -> 28672 bytes .../feature_repo/example_repo.py | 140 ++++++ .../feature_repo/feature_store.yaml | 9 + infra/charts/feast-feature-server/README.md | 14 +- .../feast-feature-server/README.md.gotmpl | 13 +- .../templates/deployment.yaml | 35 +- .../templates/service.yaml | 2 +- infra/charts/feast-feature-server/values.yaml | 3 + sdk/python/feast/cli.py | 29 ++ sdk/python/feast/constants.py | 3 + sdk/python/feast/feature_store.py | 6 + .../feast/infra/offline_stores/remote.py | 407 ++++++++++++++++++ sdk/python/feast/offline_server.py | 332 ++++++++++++++ sdk/python/feast/repo_config.py | 1 + sdk/python/feast/templates/local/bootstrap.py | 1 + .../local/feature_repo/example_repo.py | 5 + sdk/python/tests/conftest.py | 5 +- .../feature_repos/repo_configuration.py | 2 + .../universal/data_sources/file.py | 79 +++- .../offline_store/test_feature_logging.py | 13 +- .../test_universal_historical_retrieval.py | 45 +- .../offline_store/test_validation.py | 24 +- .../offline_stores/test_offline_store.py | 34 ++ sdk/python/tests/unit/test_offline_server.py | 250 +++++++++++ sdk/python/tests/utils/http_server.py | 6 +- 36 files changed, 1636 insertions(+), 40 deletions(-) create mode 100644 docs/reference/feature-servers/offline-feature-server.md create mode 100644 docs/reference/offline-stores/remote-offline-store.md create mode 100644 examples/remote-offline-store/README.md create mode 100644 examples/remote-offline-store/offline_client/__init__.py create mode 100644 examples/remote-offline-store/offline_client/feature_store.yaml create mode 100644 examples/remote-offline-store/offline_client/test.py create mode 100644 examples/remote-offline-store/offline_server/__init__.py create mode 100644 examples/remote-offline-store/offline_server/feature_repo/__init__.py create mode 100644 examples/remote-offline-store/offline_server/feature_repo/data/driver_stats.parquet create mode 100644 examples/remote-offline-store/offline_server/feature_repo/data/online_store.db create mode 100644 examples/remote-offline-store/offline_server/feature_repo/example_repo.py create mode 100644 examples/remote-offline-store/offline_server/feature_repo/feature_store.yaml create mode 100644 sdk/python/feast/infra/offline_stores/remote.py create mode 100644 sdk/python/feast/offline_server.py create mode 100644 sdk/python/tests/unit/test_offline_server.py diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 2e205dee0a..af6362da3e 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -85,6 +85,7 @@ * [PostgreSQL (contrib)](reference/offline-stores/postgres.md) * [Trino (contrib)](reference/offline-stores/trino.md) * [Azure Synapse + Azure SQL (contrib)](reference/offline-stores/mssql.md) + * [Remote Offline](reference/offline-stores/remote-offline-store.md) * [Online 
stores](reference/online-stores/README.md)
   * [Overview](reference/online-stores/overview.md)
   * [SQLite](reference/online-stores/sqlite.md)
@@ -117,6 +118,8 @@
   * [Python feature server](reference/feature-servers/python-feature-server.md)
   * [\[Alpha\] Go feature server](reference/feature-servers/go-feature-server.md)
   * [\[Alpha\] AWS Lambda feature server](reference/feature-servers/alpha-aws-lambda-feature-server.md)
+  * [Offline Feature Server](reference/feature-servers/offline-feature-server.md)
+
   * [\[Beta\] Web UI](reference/alpha-web-ui.md)
   * [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md)
   * [\[Alpha\] Data quality monitoring](reference/dqm.md)
diff --git a/docs/reference/feature-servers/README.md b/docs/reference/feature-servers/README.md
index f9a40104c3..d5a4312f73 100644
--- a/docs/reference/feature-servers/README.md
+++ b/docs/reference/feature-servers/README.md
@@ -12,4 +12,8 @@ Feast users can choose to retrieve features from a feature server, as opposed to
 {% content-ref url="alpha-aws-lambda-feature-server.md" %}
 [alpha-aws-lambda-feature-server.md](alpha-aws-lambda-feature-server.md)
+{% endcontent-ref %}
+
+{% content-ref url="offline-feature-server.md" %}
+[offline-feature-server.md](offline-feature-server.md)
 {% endcontent-ref %}
\ No newline at end of file
diff --git a/docs/reference/feature-servers/offline-feature-server.md b/docs/reference/feature-servers/offline-feature-server.md
new file mode 100644
index 0000000000..6c2fdf7a25
--- /dev/null
+++ b/docs/reference/feature-servers/offline-feature-server.md
@@ -0,0 +1,35 @@
+# Offline feature server
+
+## Description
+
+The Offline feature server is an Apache Arrow Flight server that uses the gRPC communication protocol to exchange data.
+This server wraps calls to existing offline store implementations and exposes their interfaces as Arrow Flight endpoints.
+
+## How to configure the server
+
+## CLI
+
+There is a CLI command that starts the Offline feature server: `feast serve_offline`. By default, the remote offline server uses port 8815; the port can be overridden with the `--port` flag.
+
+## Deploying as a service on Kubernetes
+
+The Offline feature server can be deployed on Kubernetes using this [Helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-feature-server).
+
+Users need to set `feast_mode=offline` when installing the Offline feature server, as shown in the Helm command below:
+
+```
+helm install feast-offline-server feast-charts/feast-feature-server --set feast_mode=offline --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+```
+
+## Server Example
+
+The complete example can be found under [remote-offline-store-example](../../../examples/remote-offline-store).
+
+## How to configure the client
+
+Please see [remote-offline-store.md](../offline-stores/remote-offline-store.md) for details on how to configure the offline store client.
+
+## Functionality Matrix
+
+The set of functionalities supported by remote offline stores is the same as those supported by offline stores with the SDK, which are described in detail [here](../offline-stores/overview.md#functionality).
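+
+In addition to the CLI, the server can also be started programmatically through the `FeatureStore.serve_offline` API introduced by this patch. A minimal sketch, assuming the current working directory contains a valid feature repository:
+
+```py
+from feast import FeatureStore
+
+store = FeatureStore(repo_path=".")
+# Blocks and serves Arrow Flight requests until the process is stopped
+store.serve_offline(host="0.0.0.0", port=8815)
+```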
+
diff --git a/docs/reference/offline-stores/remote-offline-store.md b/docs/reference/offline-stores/remote-offline-store.md
new file mode 100644
index 0000000000..0179e0f06f
--- /dev/null
+++ b/docs/reference/offline-stores/remote-offline-store.md
@@ -0,0 +1,28 @@
+# Remote Offline Store
+
+## Description
+
+The Remote Offline Store is an Arrow Flight client for the offline store; it implements the `RemoteOfflineStore` class against the existing `OfflineStore` interface.
+The client implements various methods, including `get_historical_features`, `pull_latest_from_table_or_query`, `write_logged_features`, and `offline_write_batch`.
+
+## How to configure the client
+
+Users need to create a client-side `feature_store.yaml` file, set the `offline_store` type to `remote`, and provide the server connection configuration: the host and, optionally, the port (default 8815) that the Arrow Flight client uses to connect to the Arrow Flight server.
+
+{% code title="feature_store.yaml" %}
+```yaml
+offline_store:
+  type: remote
+  host: localhost
+  port: 8815
+```
+{% endcode %}
+
+## Client Example
+
+The complete example can be found under [remote-offline-store-example](../../../examples/remote-offline-store).
+
+## How to configure the server
+
+Please see [offline-feature-server.md](../feature-servers/offline-feature-server.md) for details on how to configure the offline feature server.
\ No newline at end of file
diff --git a/examples/remote-offline-store/README.md b/examples/remote-offline-store/README.md
new file mode 100644
index 0000000000..c07d7f3041
--- /dev/null
+++ b/examples/remote-offline-store/README.md
@@ -0,0 +1,98 @@
+# Feast Remote Offline Store Server
+
+This example demonstrates how to use an [Arrow Flight](https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/) server/client pair as a remote Feast offline store.
+
+## Launch the offline server locally
+
+1. **Create a Feast project**: use the `feast init` command; for example, the [offline_server](./offline_server) folder contains a sample Feast repository.
+
+2. **Start the remote offline server**: use the `feast serve_offline` command to start serving remote offline requests. This command will:
+   - Spin up an `Arrow Flight` server at the default port 8815.
+
+3. **Initialize the offline server**: the offline server can be initialized by providing the `feature_store.yaml` file via an environment variable named `FEATURE_STORE_YAML_BASE64`. A temporary directory will be created containing the provided YAML file, named `feature_store.yaml`.
+
+Example
+
+```console
+cd offline_server
+feast -c feature_repo apply
+```
+
+```console
+feast -c feature_repo serve_offline
+```
+
+Sample output:
+```console
+Serving on grpc+tcp://127.0.0.1:8815
+```
+
+## Launch a remote offline client
+
+The [offline_client](./offline_client) folder includes a test Python script that uses an offline store of type `remote`, leveraging the remote server as the
+actual data provider.
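+
+Under the hood, the `remote` offline store is a thin Arrow Flight client. The sketch below is illustrative only: it mirrors the internal `_call_put`/`_call_get` helpers added by this patch, assumes a server already running on `localhost:8815`, and omits the extra command parameters (feature references, project, and so on) that the real client sends:
+
+```py
+import json
+import uuid
+
+import pandas as pd
+import pyarrow as pa
+import pyarrow.flight as fl
+
+client = fl.connect("grpc://localhost:8815")
+
+# do_put: register the API call on the server and ship the entity rows as an Arrow table
+command = {"command_id": str(uuid.uuid4()), "api": "get_historical_features"}
+descriptor = fl.FlightDescriptor.for_command(json.dumps(command))
+entity_df = pd.DataFrame({"driver_id": [1001]})  # illustrative entity rows
+table = pa.Table.from_pandas(entity_df)
+writer, _ = client.do_put(descriptor, table.schema)
+writer.write_table(table)
+writer.close()
+
+# do_get: resolve the flight for the same command and read the result back
+flight_info = client.get_flight_info(descriptor)
+result = client.do_get(flight_info.endpoints[0].ticket).read_all()
+```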
+
+
+The test class is located under [offline_client](./offline_client/) and uses a remote configuration of the offline store to delegate the actual
+implementation to the offline store server:
+```yaml
+offline_store:
+  type: remote
+  host: localhost
+  port: 8815
+```
+
+The test code in [test.py](./offline_client/test.py) initializes the store from the local configuration and then fetches the historical features
+from the store like any other Feast client, but the actual implementation is delegated to the offline server:
+```py
+store = FeatureStore(repo_path=".")
+training_df = store.get_historical_features(entity_df, features).to_df()
+```
+
+Run the client:
+```console
+cd offline_client
+python test.py
+```
+
+Sample output:
+
+```console
+config.offline_store is
+----- Feature schema -----
+
+<class 'pandas.core.frame.DataFrame'>
+RangeIndex: 3 entries, 0 to 2
+Data columns (total 10 columns):
+ #   Column                              Non-Null Count  Dtype
+---  ------                              --------------  -----
+ 0   driver_id                           3 non-null      int64
+ 1   event_timestamp                     3 non-null      datetime64[ns, UTC]
+ 2   label_driver_reported_satisfaction  3 non-null      int64
+ 3   val_to_add                          3 non-null      int64
+ 4   val_to_add_2                        3 non-null      int64
+ 5   conv_rate                           3 non-null      float32
+ 6   acc_rate                            3 non-null      float32
+ 7   avg_daily_trips                     3 non-null      int32
+ 8   conv_rate_plus_val1                 3 non-null      float64
+ 9   conv_rate_plus_val2                 3 non-null      float64
+dtypes: datetime64[ns, UTC](1), float32(2), float64(2), int32(1), int64(4)
+memory usage: 332.0 bytes
+None
+
+----- Features -----
+
+   driver_id           event_timestamp  label_driver_reported_satisfaction  ...  avg_daily_trips  conv_rate_plus_val1  conv_rate_plus_val2
+0       1001 2021-04-12 10:59:42+00:00                                   1  ...              590             1.022378            10.022378
+1       1002 2021-04-12 08:12:10+00:00                                   5  ...              974             2.762213            20.762213
+2       1003 2021-04-12 16:40:26+00:00                                   3  ...              127             3.419828            30.419828
+
+[3 rows x 10 columns]
+------training_df----
+   driver_id           event_timestamp  label_driver_reported_satisfaction  ...  avg_daily_trips  conv_rate_plus_val1  conv_rate_plus_val2
+0       1001 2021-04-12 10:59:42+00:00                                   1  ...              590             1.022378            10.022378
+1       1002 2021-04-12 08:12:10+00:00                                   5  ...              974             2.762213            20.762213
+2       1003 2021-04-12 16:40:26+00:00                                   3  ...              127             3.419828            30.419828
+
+[3 rows x 10 columns]
+```
+
diff --git a/examples/remote-offline-store/offline_client/__init__.py b/examples/remote-offline-store/offline_client/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/examples/remote-offline-store/offline_client/feature_store.yaml b/examples/remote-offline-store/offline_client/feature_store.yaml
new file mode 100644
index 0000000000..24ee5d7042
--- /dev/null
+++ b/examples/remote-offline-store/offline_client/feature_store.yaml
@@ -0,0 +1,10 @@
+project: offline_server
+# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
+registry: ../offline_server/feature_repo/data/registry.db
+# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
+provider: local
+offline_store:
+  type: remote
+  host: localhost
+  port: 8815
+entity_key_serialization_version: 2
diff --git a/examples/remote-offline-store/offline_client/test.py b/examples/remote-offline-store/offline_client/test.py
new file mode 100644
index 0000000000..172ee73bf0
--- /dev/null
+++ b/examples/remote-offline-store/offline_client/test.py
@@ -0,0 +1,40 @@
+from datetime import datetime
+
+import pandas as pd
+
+from feast import FeatureStore
+
+entity_df = pd.DataFrame.from_dict(
+    {
+        "driver_id": [1001, 1002, 1003],
+        "event_timestamp": [
+            datetime(2021, 4, 12, 10, 59, 42),
+            datetime(2021, 4, 12, 8, 12, 10),
+            datetime(2021, 4, 12, 16, 40, 26),
+        ],
+        "label_driver_reported_satisfaction": [1, 5, 3],
+        "val_to_add": [1, 2, 3],
+        "val_to_add_2": [10, 20, 30],
+    }
+)
+
+features = [
+    "driver_hourly_stats:conv_rate",
+    "driver_hourly_stats:acc_rate",
+    "driver_hourly_stats:avg_daily_trips",
+    "transformed_conv_rate:conv_rate_plus_val1",
+    "transformed_conv_rate:conv_rate_plus_val2",
+]
+
+store = FeatureStore(repo_path=".")
+
+training_df = store.get_historical_features(entity_df, features).to_df()
+
+print("----- Feature schema -----\n")
+print(training_df.info())
+
+print()
+print("----- Features -----\n")
+print(training_df.head())
+
+print("------training_df----")
+
+print(training_df)
diff --git a/examples/remote-offline-store/offline_server/__init__.py b/examples/remote-offline-store/offline_server/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/examples/remote-offline-store/offline_server/feature_repo/__init__.py b/examples/remote-offline-store/offline_server/feature_repo/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/examples/remote-offline-store/offline_server/feature_repo/data/driver_stats.parquet b/examples/remote-offline-store/offline_server/feature_repo/data/driver_stats.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..19279202d84a63a921306eb5fcd8b88603fb54b6
GIT binary patch
literal 35102
[binary parquet payload omitted]
zHJ3&zQ>my_Xu9ir@9(~z`#kqPuh;YV!~3lL+Iu^DpR?Ectk3#zGI3rl$tRhqW(yKBL5eM&W?q?I(iRlW*h;-8?|9B#I_Df|FkyWGoC6!3qzCk~O$Wb|V=me4Z zs`gGUaWH#WxRA(plyWL2(gm$@N{HOs`(BgUAfzs`d~E6~Ao1 zOJw&e)jS~5Gn{806S+1=@fnfDTO0e5NV`7NJwW8_kzDYO$keoSA0ZA7MHIg$vQL%| zeI(Ld`!yzs+_}QLzY$sOhV?&)Gm8~W+4?O*-OB`Gyw8ntQ{%Ek@m`Fbxd>e?|O$Gd> zM3z*yoduD0_g98BkrS`kZ%1S@Hz}+m4o;`~I}_PgZdJMv=^+!7?nJIKUEiC?df{|v zGm&;8`i?J=V_z*CKxFd2TNg|myty!EJCPk>{&E+QzG|=X9wK-2Y)}M|#d%V-pGfl% z{C<$gnQvgsCNjH%qK^^>WAfYMh-~97@gyRBYIZ{^k$Yi9K{}DO)q60DNK;5v%_VZ4 z+}vJ3WFG%qeUdn6r8vVOvT06uhDg80jx8s0cVFqQB(gN#&aWcU-YsyyLgW;h71t7( z8^ed{iGyNiHLerct&c-*5$T5o>YIpMJ^eo|MAk&0SqG6;k(by_?HD3nKT|4XptptM*g)TOw_T!u4SyXW2TQ_eAFFBUT@YgE^N| zCy4BIZytUn(uEet{vdL>rmPtvYyY0I-$a_unUDX7oDYw5=5&+ePxJ2=AP#!z-4-UY z7X|W*6Y0IVb`nHxY)8gIA`51oN)c&4mn+B;IhQ>B<%!IoWHIa7X(mGutCwwqlpU7M}|D_ReaM(l{i0tC9AQK|p?R1qn zkvs3vlogTH!Dnnoq(!fdb|i8P1KQUUnO}3nU5JB~?Hk;QY`^JzFCtxT`JfMx`^ZDp zm&i&@+P;-Yv%Fp%MCAM(pWa4f*3028;@~ca*f1hn{c!hQBK=L({QX33{(##-BFkB# z_z00EYBCf<*_XKDeFe&oK96p^dX8!ba*P3pAE{pbAW{KV;`&DnBd!%`wswl!as zIQZcEz;YrxSygo8!n=bt7q%D!#BtGOoWJQ6-dLnJjuF#D{&iJW1HzM=Qz2Bb1L2n*2A0k^) zCvgjr-sgMQpU92Nk_;lUu*GZJf6hNWRkD+`IhU7>?ItpVUA6WS2bJQ(BZ=(i^^FIJ z^sF)35hB-4#wv!$;dy8M)14Uzj; zrLvC5N^_aK_Mh`x#p&H7ZO*^CgIprBVYK-UaWGU`xShyeZnLh7NPoLOyO+o^YoJARqBAB$a*z$ zOo&L!S==p329K3z*q$H6Y`TU(Uk-l0~LypLOZyc&fWR-5OQzp{9 zPX1m-LUEAZeG?8whJb8@B{o|~cKxEY&J(x_S?W%1~BXZP+g))iELCJO5#KAnv z>^vfS{k|84M7l_Y(kUXhxj(R!hb?wMxautV{gZUn?;31QTw+yKbZyVr zgL7W(RVT0Sn|#|@XmzRh=7H(2?_acEI(7R9ZH|bX^<_@eQK5wz-W``q?!-$hwJox~ zQremO zo4Lp4uZgplSrd5g%=(%*hb8*E23s=L#5>9x@0%RDUlYGh5s%RL7w9H9DVfDfEOV|+ zSijWj$Z@(wrf#D1GP_*utq*DwH>j;U#f)9BHfiJX4dqT}ov$XjtZ=);YRz1m?7Gsc zK6LoO)nvC-n{TuE7wVeDtE@0%EG&C);aLml`zIsCBx_+~uvk0!9lAf07$RB*oP#)kAQX7Tfs zEKe9@_*x_{ULEkLA;Zt=xS~niBEw97n=G~UWgD+$Zneu@wWaNZVOD@cq5kfXN7u3f z*PX&>0ZF41LF-RjWvIF|o(SGhzOKYF+bBE4<-FUKfX9v5+uSZ~ZiO?MRT=$ zZn1@z28iYB1oUxiP6vn=>g{~ZOFiBDgUU1enM{7&hnc#)xj>QlyvMglLIp;FR65F#OlILBke43J$ZIim_S-0~| z*V%20FZtX$%p;(lfWBgrWYrzAS3{muZRsxCv7~s!&HYCGkw zg^l!;*p=^;zaH`VbyKq1E`^&>KR&+| zsP9&8O_EyW+f%W7X?xlc1&k3}9;VWnsk*Mz{#@9y?i{Vn%_+H;fVdokEn(8kCnBlo%}qDCBQRHh+bgPwMfI|f_GDul{QGRd;+mDStv@<#MRt3ElSy&qpTWm(--fJ483^DMHyM>mZXlQ%tK zeP7u!RgQ{TWb;79Hd}L3^9h@Ws*Wcahj=AzAE`T^wLfW^ZTncm^`iI97)iS)nx0p; zziQ65d#dGg^WYy|Df@nH-_{g)vmEt2H^0Y-+$&vI}I{p_{9~>^PJw8#)-?r zG3oVh&66Zo`n2S&e`lE{PaWoyaUQbHT;@=0p6@(to3nD$tz$A9M(hjL?s(mjzhTs| zm^$>APj=&&Q>jI&f6tm|=^T?|A~HtD z)H6ygQe+LI_?Bm$u{ojP(RgfmR+&SQ(f(OJjT05?&)DW0#cE_%x?J$Q7RA3J=YmIV z$g{@S6}eTLZbZ$@@@wW@-qMmJV;rZMU%j=b!0>>;%7WV9N0lDe<5m{d?ReR^|CfMP zQN!+`u6*Not&@#=KR&;9KyX#@jr~&}o?VY$b?Vlk-?KBn1hqNbXg*O{MuK)pbFA1B zqk}@LOIs2bt?;~&u=;dcs+@7uZy}vC9T_Ti1&lu!5~BXLc6 z-$}-S+26vt757TalVzzS-E$Al+7}uf5?NdM=-kHho;Q=$o_})D`+C$L5xoojSNyvR zsN@`}l{D2ZM>$D8nYAyl2>E&c zUy{RgB=!DpkqtTEzyC-J{m-5L6WRRlTggr`tpDsw!kRe`_-POLc^>fdKHw*>`S^HM ztDOI@D;D+-3y#Tf4ijuW}q|NYeeFMsg=c(G%ll3zpZB@5*K z`}NiuOs$dIHr@-|2oI!gxq>6V9hE1@M8(Pgq;@R8_<4D-v_TJae>vk*?PxgqPzein z?m+$r8}Pv5aO^B)!vRGh;N-8wo0(0}|0V@?Z+-w__QrVXrY9O}+yQ;=Dd@F1LtPp2 z#VH?s%&qjm=`uqM4NjrrH*mqTY64WnccA8|4xT<73}%;(;INM`>|^)90}4v z+ADDLT@9SroJbkY`s2!&ILPlFg2q}QOjPXvMU!jrd29*zhVz4E))ut+EkYfTaYg-6 zAq+kp2j7~XQ>-2ZbbRE3XVe{WWz_;Kwm(dLxR(jgy$d95qTqR>D>i*xjOxmc7`dp0 zQYl}B6HXtX{`eOd)k%k_RnkTMIQ@o zpFXG56L+EPu55UGR1|sj_uAR52?8`BgUfWW0`gKeZV3 zkEl_}avoT}Wi?d(c}`U*xZ(Z$L&$9RWW@1VQDWz-VS33oYW)p8Y(JupV^VHdr6U3N z=!RIe#S8!C>Y%!u9i0C>2YGwuV82fe=)^;93!{aWvb|?nAWWw>Cw=DJaB|ms) z9H&wb8Q{Z>JJ3>I1m0fcrPl6UiQHwgFtKPUKFRt6AC7xdNd{w}{Av@m!rkksj7^nkaKG>rB@$_Xky9)+H>%g#Zc{~s{FzXQFYk}{O>gUc1AP`zUgBiX?kU61^x2G^YgQwJ|Jy5@qLcgldx 
zT|l0bEGp{fO=_t7IGhQR#rDWh^l>?a0vDoiyX{Wgp?DrA z&UeNGimD9MAPv;=r?V=*q? ztc`#7RZ%0{&FC&@h2Ph`hK&Uau&XN(6i$ibP|_mI3!8$28EUwFoiL_I@nhJ+eYl~{ z2}Ap4;p0DEd~16X-q=uB{cbDd@<`#mz%KY*IskIajW}Gn5=XB1Kyhjy1vX1j^Rovs zP6nc&|2_!%Vnzx0FNN_5QQVQ^fL(iz!^ET4;MuVr+qbD<`>+B|K52n!zr%R46EJ*} zEmp~@;2W)K5O9}7S#wK#F;om$pLQXwZ!_NeP)eD6Rl(2`Mc{PuDokEni*-A~V62I} z-&Q;P(B_4QuL@HYrc=}@8Be%dV~wrP#8J_!h+@C!0a^c6I6fJIQ-*A4x!wq6izU&Q zBM(tIGO&2*9gs`jhPzYT!Tnb|sJwGREBRt@e)tj&>X7%(w*;>Qms65vVU*{In{cRD z5QF=_Gn|btfOpCq41;CJ^Vb;V%e;}%l?yxt{K&Z`f(Z^I6yLQ9SUYkDGCA`wK1K!7 z)9vti&S%Q<+Y@+Gu@xB-Dbz*d?^Hwd2IKR)JaA&?DJr5&6t7*Xf&B~D!mO`76t3Bd zHcL;y0ZuPS8;ik1;jMT>sTvsDroc~F9-8^|aZcYte0TmN#S!U-K$Af9YS0DV?|>#* zy!d8OIU}xG9@o4~27#4cIOFq^@z-VohSWuHe_JK|kgEq99YHMJ>Vj1V!f}OO3H2{y z6vR|^L6;E+3I)4hYOXi_eP@C5U7Ru7eg+sZEc}}@#rPYbf_J(M@p?@H^}|aM-EWLR zUCMoml61j9Z(qtTDTh*=ehB`>##Ex|PCVt4W$fhQhAWh#u%Va(msjwjt4L?-T0q-LAInHzaU*pN%KENHh<*g~PMP6@b{2A%*rCspi*S&( z3l`4z#IH;R)bjccm)`F~%}Xn=!T$i-*E(RK{~7AbDJB-{+T&ur4Jc#{I9KF6Xj*T= z#fAql^`6=hN=^}YhMvdmE{UmKmOhT)94 zDf-U?%-a4O0?(fWy_9G$zVjaZbnB?A4h(!4Z$O!T2|~vaUOYI5!c7&+@Jz%o42O+S z&qRe#JfjXm9v#G$+qdDzH|b#cl!ZrFC17(!7nK}WP`1u$IPlLF6|Nu(|2cwX83)is zeGWc0`vWm4RZw*BF!BtEq5r>qcv)5sGcssU^=$y2K3b1vBPW3ud_Zh(Ai91Qf$3&7 zG_N;AtCf54om&NTHJ*b)`$Q;miGnpwidZJG7Wd`Z4Pv@e6z!?K09zjUM-_Ebq+3?$pQ72V<1ye0+Rmz*t=2y ze{Cv4-yB01#+62Gk=HcgCwN!K28)(U}!XKm08ML1U zs5HQaMA{gzPPf6qm|`$ilESx~_pnOp6f8Zl9fOZ}VaSg;XttJ)tF{cjx52z`Bf7`G0>gI{hDtVpYRE9-$$M=ynux?>>oOry$Qxg+_r{1K zNopoR7hnHI>^x}%pXY4FIn5cc?$=^`@V*KBz98y8?F9W98_e?=0k74=5Vd0ub{@M0 z+yh$3_xm--$5?{sOLugYerR-H+jH2PvJHPo9fzYsayaxu49>c!VU3v*b{r=vA}=j2p+t%ki^O@P&O2VZ9IUEJpJ(7X@C(Rz8!spS-2{e z!!TO55qHYn0|~D$@Ni~5I(m;oQTuCHuO*CC;`d?g)$MpUSO>m7XoLGT+rUbTfnIJA zP}}wzYIZEav3Vjm6C;Rkr4E8MBNStk7r=PO9O!J^gQHJXv8N{#mn!6OUX`w?LNKed-Za+J$VM@!IrnlQ=ie<@r*a0Iea5KpKdsb}2Xy=_w;zRcbfC#X z65SoyQ2S{CPR&;bzOH&0@;5@~y>6I&B9g>Bg1FvY7}XWX{r(Q%uk#5c(;1Y>y1Ov3 zg9qofT!iwlKKS!Y3Cwva;U;?_(iX~r{-G98vBTg+4?$6TDgOB( z1#*rIOcZdT=mxt`=AR$NWZB@qA$4jt?Fhbqa|e1x=_uW^1(mF1aOJxV$b5PR>aQnK zA_?9&BQB1WiSMB6<5v)$)I)KplaNsHlL~am0a;cwY#o@Uc261L&gx+}e_R#i^uIy@ z=@(ocq(X$_I=K9FHD0(mA5T3tgn402)OgNZ6o^_+o^1nkptj=SZ96e}$QJ9~H-j&O z#Q85zP*!Vyf#j;)urg;6(rRYFuwxDC`TN4WPr~>_awUc)j2kCgexz(gmSc6a0KPQb zhA$=Mu~j1&_fIH-%R~U$9iIY?r1_ZKXO8q}AE5MhK)x(L-nLwU+eFjgOBV}8!gt`7 zQWfe}!4axGUIr61yx{NfE;P)Ng6mzI@n!Qi=x=(%coZsujjsB5*4G}3Bqi{9#A6Ws z_MFivX^ZrP_3%4+n))R#3+iXufp#_+zZ+=bil02_;j|HMS~$bZs5L6D*F)QeR}gc< z7DF67@Lk{r%65$@{_?QFbvt%Ya$ygs0Mh>t_n3pwVtWi|BJ;CF3YcCZj?Q|Cl+wRQ zY+TO3?{^I#?p8Wg+~)_c$h`HrcM9mcet=+QXWXjR4V!vTK-T01pEzyzn6oERL|?$k-o$6KC55hDp&r2+kO)KSv|aQKrsiSLqN;;S2a%pOA8XgDaPNKwbe zMxgV(8@5z9gOi6e4iyc+&72RgvW|(DH0I(HsnsOg?1!KGmZGI>DC`;M!QYP#W1KRH z#Ud5}f4!(tcjiXCWVZ%ODNA$@;KM0iT@(>@!j(%*sn_+6$YC$Rg-=9r|2{uR-o6Cf zw3T7G&S^$yb0jWMqfl3qLW8xo$b7jUH!4=bwQC3A)9726$uXoDL)w@n=7RhCg;4p! 
zD|j1v3_dawsdu;3sSA>8@JjwN49$;(U3)g-igho*;*b)KEcZb109g#(Qv}n879rop zMc`;y3>(J}V~nvYCAHlGnrvjCvs4hTdetzbgWEtXSPu)?7a=D!3cs;5QB^h^{f2U3 z?^ZQ*@_Pxhkq6Oy#s$aQ&r?5hrQzRwDR`5j2A>Xx8h=)O0yjBxu!_XRy4`Egyx=C~ zGTAmJ(rb{?4`+l~Zv{E)L)f^%1-GsI3oBTu zkoC|BuXZlPjgjx*?lxmEXtP5_kJr?qhf%QPuQDz?ZwgxFR#cH#Av`Wdh;VX2G==0Bo5E!;EdhRK&dm*!cA;Y@Qr~>>nqVUT};jb@q zlR{AZtb}WlEMblOX3P{a#HlaKF*Md2{KreEGv(r;(DmbvM1;p1+!y=_VIQQEdLK?}qOMQa+yAjBdWuf=b z2;4KV#Oe4brYb-&VdQ#cF4`xi_bH9K}W(JBMru=B?Hf3 zgrkouX}4hFb}S`pu!GDCG|_tbGHkt+3m2PDf_~c^3|=jOz*2xnh7EkBY^e1G_N3o; z!StPe)R&ficuQ$9F4~ks>;nm?U!sqN+pYn>y*G{|3cyOO4)~GjhO53hqkxqS4z1V& zPZ=zRcvC;*lQ?6S=m(1Nv=p9g%Z0L=qY$Eb1tz~q;|5n9l)A7JrD);U!JiHb%?@DS zA2uvrZioT$Lg-EAw+1XSCn1wp4(7>#08qC4ex^VA;pf7^_!D*{O(#68&_VNoP~faUTwAnbB6=4)n_|3d6P!Dv^K!=OaG|e5gxpB zP#5ZQ?ofFZx{!XQ3Yxuaz(Mm7B}_hpnBAdxWP=;N+_nLRA{Ee%N#=N@j~>{19rA-e z1E|DOyZm}VZ=Wk_zTk&}{fAM#jgH>CkHV&E8!Y{P5sZp0@Sx}xJg%ew2D-6ev&tX6 z>1G&o$rZcRrXl?G6r3_^fhje8yk+qcqQAsb{y`+Bn!5vq7tFtTGD9E{$M0WbXp7@3<-)$7g2eQr0ws$>_si{Gcb&fWy;8^=JZI4GThPbvDJanwp+?3O^;lmX z=`M#+j*QmmsXMvwrT!K8o%ILX zqUR9%XCHhdpWW#wG1zgQ!?P%HPubsw^TeW)M!qX#L0qmJOmFHP~aH^2-20a$zd2pSld zz^T|$#_^B3sCjY@rikr?-GOR&aeN8d9a6!PaWXzMufSgm=OZgT2*oV@vB0Ya3N8JB zeNq?_3}(UIV@(?)4*Wz z0e8<8?BAS^n}gb_h8q&K-^Rj4)AwYXr( zX|k?zUkYuE<={&7W_YsP2*VqCVOnkl?6Zg!CnV7?eBrJoD~)qF5ge+hgvK5TpD1xp%-;K($|Q3o}{ z*gzu4c#VL1W+=89&qX6eUi9Z%f#2Ve_o_4n;&$yY=HdfRZ!c z35u{Vd=1L1Rzuq43bcB28YTpN@%V;0_$gKjZ~a*X$%QXL!-fyNr?_xI@H(vbO@h%? zfjC9izz_O9xFX(|^Z};0sKNs`enu*Pfid0>{{vt~QQKuR;NopVcq8P7bfvuvlTa~| zgWiV+<-Wo?jlZz0{u^YLhk;?Y6~3JBL-h%gdsxXt<*zCXMH4wZ-d7B?$^&Km4nh zi{`hDF?%ov0vsc;w2>E|w1}X&xCRDKo1k;HJH*&`LqE$1PF^en^MQG|$$u}{7bU^G zcQP2*n+;cGU&0K{6IqP=V7GSz{#>AlOx6TL@a;M@+g%0ayPa`?J{|Z^CBrAZSJdV|GE^)_n$-Vq~J$BosBd?U&dIx5}9jMK?`BNk+RRQ3rsPOssOcxb{= zxe;c(QU9B{ksU`BesJSq`prDH&@sAzNrI{LTZshgW6DxZ36=(L=cgweQ&BZZv~_>G zAg|+?s$Nr~W8~X~9HCfsOOqt$ytj)gtYbA?o041`-b!9hh}8@*N%ri2E7i~us}<3d z>@)p#F;^%~JI*AG}?IU_A{XenJd(c029Gc#{UMcpRR*0nh^ zr(tNBc4DG^fN54i|B$LdXQE?7b5`;6kQ!4s$tlkCM5*+!y0uM`b58S#GK1mePKiky z%S^K?-G?>YJCj`NnzO4ShgYzKlik})bE@-(HG^!DJ)bn^)HMvR3{6b-9x=^r>>t*O z>`eCg(VTm0dUzFEIE5u(me(vjqMcxq;wyD0ugzd&b$Viozp7b&m-~oLUS~>x-ktou z$dNT1;nW~Yvw{bCBf1qfsUfa+3Z68Kti7C=x;?AZYF;AZF=i^=l^&FFpBf-+VbV2@@xw>sel50z;)U`1S?WBy9 z0Q1wb&&Dhbx-!xtT23p>j9D>7GSlPC&nU~hx3;#;%*<&yqiXox#wjWDM49 z?p>KVbuDML_P@7diDc!qnV0G0zqb#v%_?}(Ql@w9y+deHR?&!gxzV%tj*(qi#Xnlg zVdnihw#W&NfJKF=%m=3g+Y_ZytreDrAJ(TQoj9Xvan9D`gL7Wji88&`bB_BzY~YAw zS6Et9I_H1bSYex8>DpT9dhLVD<)rKj0T$;ypM7v`=*q5&Xg%*U^TCZPl5;uE;)1Wt zxO<0fPIXS}g#g2Gk9$cuwPh9;Lp;Vk`@3@L>RK=E+&}I$D3aUIW>FQEKkhwdo7?!L zwJPG;_@>FE+#4enm!h7H`%HJ`-ulsc>F~_>W}0XoSHSXejLb)tkX>H0RNLh^!;f1e zlJi&TG?ayOOs5qaR%~zr)h9Iy3*Hzq(z1mup*f&b5zQwUhIE0xW9^ zo_!23=+5tpXsaon`54F)Ew~qFSz9XeDahKc;6YAXZJFVxV5j7QM`e~*D?L7ixOW#k zscXAhwg1yLmS|yrn`K>f{-^Chc7@NMwAIyJ`?MpJj2@3VW|_~S33f$qrP>?X3_tHqPc9l#wYt{j@i{E7yJ$qO{aWAt&wDtcC&w(U z8Xx3;4zI8~`N6fl@yWH%doL%S{1{+${rR)c5e?lZKS#7*e?9YgA6K+^GS2G8TbYT- z4!h#7Iqf$_3@7&AOD>)&v%2}gV?6rkJ80TgsiNa7Zgk$Q@1Y>aqDQ7YMhMKPAL%!w7w(z zd@|0Ur)2KFjynpolkrTkQi*u$7G>El3D)+d3vxSJRE@qQI;E5@Dz|Rc@cfeG-cu@7 z-_fcS^(C1lc3QgKx=p9xOG=RaY1yY8ZF-GgQbSWt%Z*yM8$JJ$7TI%J;b%uXNk$xJ zi=Ckh+H{!8eoarXKcl?3v%}KpYess?85K2~PFv5fnRz{DRP{SM9izTxam3E5TiJ9u z7koWYVSiS`t+UIu@oV-JXrStCL*>9(4;uTCm+j}vxQyd|O3e&}1_u`DEN+eP%%++k~ zCwWelO7~V+>UZ5wi<&x37e8lhW&0qrVCsyz!#P{Gt_L}dQ)jhP&)EmsJ}h`XRc6q8 z&T(JY!{XVga;A8tQ@rh?QrYhn)((}jZu_{>^LwRxZ>4K}*W;?F z@8?aZG8T{Dzf*y&(E%>w`RXzVvAp3 z3EK5H%l^2W;Bdirad&^4(T^+XsTcg!?4EUb{;1CDy%3<^{j4wQM-4~(Vvv>H^9Kb# zYAYNrhPZVgCjn+XL-hJb(V9uA%qh&VAi4UeErh=ZaT_#@oGoEBmvd!=WlH 
zxBKOY(a&r5Qmewt?OuKG{Mp#wTNP2?{pxen&+CKYmm=HkUVkn4dBbkZ;ZoGo?$xKYZeaHL&s)>Imk$5z9{4-^^EPenWwxOGAn%fCu8`y9n8iJV0>;x#5^0x@ zsoB2~@tSUy?zGsudC5$NwPST=ZqJaa@l2;vTJ?!?`(X{QnJ)Lf>YVzX zVXXr*-K@DadF}QiI)yVmL5?*APkTo6uFv#_rqvXU+K(E&nCXk`t1153GYY?E?y~3B zas(a5Oqa~wOK_|$UEDinX*_#BJ+1bPn!|fruh|EAeYIu!z3&|l%s%AIy;@=A@WHuo z_ECl7)k?SC53bi|A74(pdLhtZ-1EiklZL*lRr`9!eSXb8<<70U9PjYacge5*4#&Fc z+}@7?#=oB3ORK9bclZ?I_3L?mUtL{&@28yye!Uo+Ti?*`@HwpT*UK@-`o^cdpChjS zdNr9=e`C~PBI?Dj*VBFVw|@3c9RBrdfHtpzE9f{Gv*h=n(7J}^#eJc7WaiRTe}CkW zlN9=|$~uW)Ww6C64wjQr7klx&NR>B(qqHQ}JvBpymoXl4T>;N~`EV9~L(R?Q zxYZ^bbeh!hx|c2{&HD*jgA#C|>KG^uEW^)5;ppGP25Hwph&X15ziq!$Y)L!(K4lN> zLM70UErRouKElqL$IxBHgXd1|!EmuFa9tx0B(482w%2UJ<`^Fo)4T_rhhixG3MmYc z*Td6U7WkoL168+k0EGU~(7H(;_rBSS!E?j0VIbYeesLgjA}rD3XC8QN7e(3uQ54NJ z!mvL~T<$7@K7Xr?zZ{%^xxBS-mXzVdojHK=!!~&3fFVZDkHE|8HlXN^OqdgN8WL|9 zAPD}0ewQ`qdqNJ)4{>1B%8L54XbzTcw1#L?7N+p-#if7MaAr&m9h?r}`#L_X5DG^R zdLc|J*kI?UT4?UirAF6MSloUHT_!(3y2LG*4wrzgeicmkwhM20mr@^cH)8<5EOfJq zAkC@`IIq-EhaC+-mQa3hW1x462MZQyg6X%#D6(7|0)Ja#W-AT%=kCL8?h{m@Tss8W z3>jDbiNse|EU`s#F(x%qxMGtJs(Lu#oQuoI^eh_!dk=$wi7B=n-iPa|+jbmEd;^cgR^i@50+9LO9V}<= zfKjq6?sBCKN@OjtinASy+QKpG3JY|?MQ|S}GqjOs;RcdEb)9H}Hx8eu4nYcEF8&1H zZ#y9F=owxPlY z25R$7Q>oWC!hOkB2o85bzs@Jrc~Sya5~zsMD{?@Ke-pTmlhU*^x(r3lYjDI(9#2I~ zL%jPe^xLe&C($*q{n{q%p5+HkOM4I;l}1zL`KUEygl&<0Jqoo(Td;KI0~KYF1V&zKF(N+@UyI#hyp7w6e~-+=ZR=^U-}5d= z?2Lo^eXoGcw;!#f{BR&Uk1|?wkQzN9k1bE*C>kmKFt}bqo!^&Esp|3Iz$XKkI`Il_ zuE_vl6;FJle~lV^JZwSrjCm0-q_D~1>W9uz?_Yf@MMh)N)86%R@03bs?3X} zZ)ITqPa!NEj|HjsYcX(IjG^lj4Tbav)YrCe49=(*`u+{V+Kf`_Ql$WdtZAj{{LIin z)D7o;I)Zu9OVBBslsnySG%~HOqb>!Q;_Gj6XwlIM%l9h6E{9dPfBqp{=ck5wx25sT zg%+@59l(aP-Kb})gD2uwpaY0wv33${(3}e)c5@jM!((vm;|#U%nJg?Dv_}KEwb*{j z0;Nn1@r;%hOn8hy#-{@6=8@y@bPHKZwq#(b#$}lB6a^>!kMJPP9*2(Tp};blV+q_@Pv@b6&M_3?s`1)t$s zjxoMJEs3=rk<>S13U`%PP@W}qFdTOaA{VORM`?G|I4^@V;D8jkLB@ig;Bt8xcHtKY z{IwhJ3)+FabT|Z;>_zXVtB}sO967epkn~NGlt+7^R(l}YFFglzDw?r-zy`M*7lo{# zZ0P@$2>UNSf@4pjDE9bXw6&0cU!^`E70*KzSZzg>^JNT;3^9~3j3-a!6~UDDCuC1{7#nGEVLS{ zqIGa*;tE)hc@VptG9gKD7mDht;mV8GsjTJy;J%*&++gm*R?Q*UJ@gGG>`CcV-7Bia z#2ES-&O=W9b!e5jP9;~bz|)tOFwT*s^!Fz;P(@W0PTk4|Q#(4oOYvZ6UXO#)ff?%T zvn}YU8GtK&Eb%4jGsA{!sF8jPN&`h?(%)?W~)A%u5h zAA+_m4J9%qF=Uq@%>7ym`Nav0Swl8e@N_Xqw64G>`BhN4qZp>ns$jj020l2c1biOF zRBR9V-{G(^YP;DwQm)|y5z{BBk$>gzxFr}5tskMbJ38Zd{C{ijyQ88=`e*@BvY-Nj zf&u~}B9aB9>*tVj4g(l~A!o_KKoS8_5pxDq6wH|88ZlwOoFn3@xMEnd>$`*QuA9E| z{&?Sa&O2vE@7((fRb5@()7{fkMc1=lKOJIQex5)rLzXeeRmt?e{wO!!C!g~9qv*W zl)mOLTjuJ*oj%o_yrkW!vivZ+p)WzE<*V6dCnK7B;w3Y}aKF$I{&eE50rl+bMTz3& z%;2>(?K&`xNt>v#<-XzU?#Xj(W0pTz_nFLku5hFa<3s3|Tm^dT5zUI0N>R}ZN$U0t zKUba_N!xIIUggVYM=ZSP|;;fuWiXIwS+U6Ii7kvV&sJLOU8W( z$~~t?_Ahg3-IaA7?w*3BZ%(i!X*fjOn|PqO15 z!>Pmk77J6I%~FgoEN9~^F7ikcE%fKHf$b5rZmuECvxuVSTcl|84GjwS)nL!B#ZlYq zL6k3&Pmyyb(&F<^x%yU9+L*tOJs&@iqOHeJ7XLN-qir1CyLp|P7|7WT#M$|>QjoGD-AMV!YmwTF#AO% ztO!G3a<|s7`p*G0Z=*b!W@Xan!$gNnGwE_|3D-K@i)zDs*e&~kwD;-{EKqbS+r7wx zN_X0`jLE`b6b{rmM2^l}aH5$SMA@3N@iSW^_d0$m$A!J+jAz7A-xW7;jtZmnPs=#z zj9Qk}IG$C>a$qaT3L$#x4 z=+M@22ifN!0k_@r0^9lNN48{-C2e|-p`ogYq@pcF83WI<5oW$rw)HWaIqD75R2HZC zWyiQR&Z6`Y*K-N)7lnEg-PA$L+KdS>JY@#c<`9lWNv|S0rUU|~lduq&l?{IeU zS{993lS?IImvf7@cW~{hBdOq4BwKR7lEsW3M+=v z^)j~Xbv|opc*n)eJ;H_lLQMImOuF;(5Vzi2noeENr_Pi2*u&k|SYrA_+GgI1Odi^j z_?!_G>le%_J$1QbUY4|OuL3>T>O~=a`q0EOdD64VjeJ~BF`0xIx{jZ>Ce<3#v1$G+?6o{oeO%1>?UCl}wk~DT zH+f_;RgLm3uP`0;RosJT`MGS+Mr*|2T*cVk%`fO)Jb@~0t=baU0FSVp0!^hBq z9w{UrxsA=8>cMvUkEB7_EjD9^r*TJ>GpO#xUT&xJII0({WOYL;SoC%yCSDLgI%a%Y zrkP7mS16DnQzO%+Ph4i1BU_NJMS-fL>5S(!FQx=Af;#?Pm02B&J#{vQspLXGa^d&Y*Q 
zPgG-OZ*?(UCWkWQeqdwt4cWG!XUx_?KsT0d<#Zb|Xy7Qu_Ey-EXy;g(>}E{88e15* z@Gv*HUj#LG_>jWtaN_p}rZNuG&Lm@~`*dygcC8^xxgJTsyjaIn>%}R*vnL(2!|=rC zb)4mnN|rVzhonNDvc7SS?A3J%W^vApM1l>dcf1T;m=(k_rnIoRAE$DgtVF5VrIy*n zyk|d*if0>t{5;ZG#?9uPl?F9VNMvtTO(eHhVN_pJm83b2K%mBvaY@!Ss2QHf#IP!fK*<)D)4yNvvvQ=?;lBYvpVJD373fO=T>7SP1QZCQq|x3n=;OHKyaFk0GBSuK-Edj6cjrL3L9 z^w(Ol6Xm6B#I6r))=epjzgWX&r~kxF7}Jd!Lk@9oJRMk*{bp9Z<{~>0GL60Q%b=k1 zwxn*uvExs=G0GoLfrT=}&Lz>dnImbt!5*9YQ;4+;3L~{MWlYb;h;BXzqg@4)>72q( zY)kV~Hlioy;5;3`=I$|~+!#mpA>4u#=D3lilQ<2x#gNV={;a+-lR9=fa&9+9vjv?q zS>MNV*xkL&tbRobv)gS%74vV{tnYT6MVNPT$$i5)rXk6e_D-S?=lZgj!TKyrdob0O z4k49SVx$tXdt_PoQx@WaVQ>lZl-j>}9?D;06Q0~)u|D>!{HOa&uV)tVo~>nT+=tO(40|R) zEqC9nn&~%PWnme^*zM~x**J9!0k1#JmapSzz~IYlkiQ5;EOH`82W6^k96;Mg#IQ)K z=S(wjHWyeLM@!4cQNp2Y%G2~=%=iW?U(Qk4;lnK6aT3ik7o~Mao!J+SckF>+By$|R zoNeMIlfSMpwY*j(9pj_i@Dr+3+h-KbAHJXU+9FGn(igD9I6s_;t>csrUt?i5i&#y- z2%6NE%vnl2Vb@{;X~x)#%)&c?YLbVuwQtO5xYkhGx-S${8k)EZ`A67ir5Ws#UNPI{ zr9{!VU)O(95!=wh=wjP)rrIb$ecLhpYV>?IZVpEykJi|n%@X1I#zj!T!x>ECGtu+h z7hJPWI?ZbeCeyT06nA1PJC?eI*;+=j)P*lu`_D#9a>Fy$f1n~0Yt5mQ-6A%po9bBr z?vakH70`^Up|s&Kk3ukoVc7K-Oq8EP4Ncx8b-I+j$*yEuw%lWBn39y%QqOgNI*%>+ zEWtvo9O$HD3wK)4hej-Z&r;fk(v=h~_M~bp)02zgRy9XZFY%#V)r~{!T6`{ zu^IF*a*FRmqi)}3+8I~4!3#Vo8uwHWj#DSSOHuUU_+{4Z@K8FLbCd~go@KX-N3&z9 z%5($Y=Szo&&|%dE7OioBz3Dp_-#ykdraGQpNO+NoCXf7m1yqB3t+7Qt=!wo6<{scp z&V##=p>zx@nv_H{mtSJ@;{s_N7f0{vk|@Jhh51}GAwva6s#?&>w%(pdk(Lvv^cAiP z=S-re6iJdMDJnemfMq@JU_btHosF37OXdf+ah=*cGFaG)I$ih_us)VlB;~p2e4?vG zspS5-FDst8miwY{nS~$fWSRkCq#Lt?C1-WAmGhmbBof~PKc&!_CQNNU`-=S-c#h@Y zwV*($QdVZJN9#6*kO9hkG0>-18>6f~Ox0pzLuPQE66@KVmFo21E+a`jC$3TP7_;eR zO;#G#^!fQPR@Tdl246O&F>UFXX1&tpGtFcAiDRj1iV+!&c*bryc+#wL8R}@B%bfcM zvls=UDW1o<#ythx`ZjxJ?7f&vAbUD1Qou@0Ff>^zn#SLJ!9HCypm!LO*PP-wiF@Cmf^y$6EIBeruc4d+QJ@BYtV@@TKJB|~_{8eZ| zZ%owUh3eN~c(VU~);!aY*<6sI4|Yl1Mc)%_z|HqI zGgL9PRPhBDz7j(c9j>#B7^1i8cm@@D*k{yXNrsG2$6Ri*m9(zK)MC_6GAQw&zBQpvSJq<3@>?UGXGXi7Nu>iQ8T zNGWGe?vEv>UQ4+f+S8drx;V9|`*L5_D3bU47&3n@O-I^$lWMvYlb))^TEoY)8(LLt zb)G&KaWI+5P8v@ajs#KQGA*jbIrz&GRhl$?6n&Xjh3|@kDDb`uh4uJ50$u&B|F{3d9uL8->G3^YK&e$VffPs8b;0c0(#1VM9wlRI~WU zxnw1(%e~!aLt--p7y+f2`=o`}R>j}({KbA%f#r>#*-KpT3 zHl0e5rv*JH(%?J2>E>J?_M|=w_vBBoo%qh8bX}2sxDdd`Igh3dMY7zsOB+~FtUjGu zrB8jTb!qP@ak`Rqf(@1#!^Zw(&+fNvW4#81Q|)Y5sw)4$ip&R4xADD6t=f_K$yc*H zi`Q)4S&o*fn9|tK%h)DsNqRf<9WzacW}@%LbGP={Ql3FLr+7b+>gM>8y!Rz`beJnG z+M>%Ihks% zVm0jOo#(8_Vi|hm)Qz6LaVE7nBk7by87uIVqS<-T^#1fxrc{|nXIE*kgX_hpesN#A zCfLCq&sX5Qb3AE*;2_(v`Z%`(+qwUP4)*-Rdz<8&!`O)hz1Zr`PF6mR=(Z%LuP6^? 
zcBL7_e!jzvJeNuL)jqR11KK&o2_n=sB4gyGL+7oJSMg}-^m^bN^ct`a%Gg1#0LBByQ6?L{8Ne>+Hu!EpOM-WRucFLDXo6k=IYAP2e$xB#7r z_ys@>`G&Cl!INQ61fM`oEie+WLQER|0ALO5If(PYL3<9SCGl26*@&IQ7 zKSDbLACYqtm7IfbDfm3>QSeIvQ{ejq;LjI|=)=|q#vrE#5C?7|77LUiE(_K{ydU@w zSR5!u&RW=$5Lbk@1zMm9K6mh0hz|rW2ll}~2H1$aUtk{u*MWD#w;b3B zbOWqWW-4MC;CNsNumt`F_@+Q#0EWXJ22=oipf_U4@b?7zK{o+#i#&jR90&&P!|?&W z=YRw3ZGa6>3b?`76J@plGhvH^hXH+2#t=9Fy&XAn;Ngh(fbS}JHBbj%Gq4KrNT3M1 zKm7H88SKxn&qKQdda$R!ZWZREj!?26YL*gF9iDlmcTNUUjW<##1RwjsQW=q$k#^vF6j@qYp9=Ug zcraoG@K*xr@STO-1H2W;2Q~;beAB^E;CbK@lzRs43itx}FVY7n0+zz~0d)-q)&Xs> z>ySScu?N5|=+)p9U^QZkVV41|KsNG~fJulA2ChNx0u~{rk6agMb?`CR2cR#3Pryz9 zUq`+GaD@Ldz(LOk7lNN7#~s`XOoOiv@+ZQ#5B6NJsZax>U}qxki8uo?;oAw{LFg*r zFk&y^I|pqDe<&cFVmKPu3BLu{7``3Q<=_F}1IT>|yo1&RKL(FN-V^8|a1d;H#AAy^J>+r{co!|>Vt|fFj^Z{@ckPrO|Xo7wMzK9$Fg~AF)VqfB5!- zWnqVd9T49F?hC$)m>hH^v=8Dvz`?*i=%I-DK}FMYP8f%Cy%z;nQQh%bab1LgxG zp<_|sS;Uq?Ux9A}bULsZ{uZzdxEuT);FI8m@XbYTBzP4N1K$#`9AJeUf7lENggyzrO@Ny*8)!hF2eT$NP@OPnLOx^&<~JP3mysH0AvVrfjfx1g42*24m|_h z45T1uEBuM@9S6q)gWwkf761=nmxJ5Dvyh_*-UGW3uz8_XH0CKL&_m;*Zk@srSp zkaG`u0oWd_4}J~*0B8x|H0)~RF9wc7{|w%Qcp6X-eHT~<|0CE7pmm@%0D@i)7@`hq z*yEtpp^re%L#`^&3w8qRDCjE01BH2j1rUOm1pI}7EMSg!D|`azX^2aLXM*M7YX?Mu ziSVrx#(@EdX`;+A_+wyefPaL&0{%nb3fRZM89;aVWWgJdhx_OU1}U8!^oJ?lkC3d= zb!HQOZTj6pxx2ky-;qRE_y?|~CyR({91sy#3_BnysaXE~cl>CEM)N$G~}tSZ-CoO-@|aPb5&SG~GQTUYf} z3(}Du|e6|D_Tri6wXzeo*mWaw9ffRBRTU+Heu7{Lq->SNL-8EBBxZFdSUvA zhMX62md)Oq>@DwC+Id>FFDdr4?(JN6#=2wc15ZQcz1_}gy*U2rn9b{pf|+V!H;cVk z%a2=TDt>zXpq##Bh$x7ON|`C}y1x(e)|AkkqaY$Z?1;DBTFci}JOu~sR{MxpKQV_3 z{wrrV^j%o&}P(pu9min{1%hPBG*E-mqO)n9+a*X^f{2j|_4 zRw?vRG&#Mpw}<()t%p4>7l!3 zRsHq&b=$H^5;i_dQ%u}^!Xi{|>!s)$Ni*)vTbx|p(Hxqx|8tMgsfP}xHmAxdIJu8+ zR-ZaLZQPI@YU#_&9^FhoGiv_m3@7L8bE6_XoW^J`4tOLwx{x1vE3+hgzPiTsr1|p( zoF3oTHv9I(AScPjS>m@1?v;+_uC$6|Nsrr^kugRa^za3bxV_&O(fml!HyUE?R zhuo}%fl&iC|q^Q@`QCEv(^{>_x%SVW5D)l;KUQ`m0WLldt*88d_r_?OCmv!4E zL$g#ub$)GWiPaX(0=4$K(_!j)?`qXG&MVcG4LZMFzFenYWZjg(%gQ2T9xa$%SFti` zQ(e44nn>t0CA9}y-SrL}UYazo<3(NZyN2`r;zrGS4=P0(?N*qXN5npuUVnGNpbT!; z$_F!S-u#G(DPoW9tM+xamsWYoHar+;ul#<+ET`VZD?MGB?+=jk(%aQG)7?VqN|C4Q zn>O*q2HjW5=9Kq&STf2r&flKr-XJ?S+C?RTuhPRo()yJDkf^yu{NVd@`C}CV=Y^Ek z*;8oa&LD}an6VwOx!J7aZB>{Dfq2&d>)zh*XxXvOT?Y~-E_+eGeDa%UjhaGfSIw%)a{cb;tlis&-E3Em|*h1r?yvQ{x zYNSML`aSkNkusvf^>(G{j78ll8z!vkJ@oL^Qe9CizHNL%y{?2{*6N6Na*Ykj*^QW) z@V3gvIa&KgqtE8+Ppntxp1WZe6IDIPa9_#QqP1>2-|4R_`(WYiv|0Db6u06NuD91Y zpYL~0aod?iQ`hg%Z;~k3S=#BjL9RzucYeT=-9y(Mo70>i;D3nHO&$8ib+zO*jp!8{ z_pcWZG^nZI-QKuSq`!fC`R!d5o1A;P87Kyx8&Vy1N-=*-`ef?}gY@LE^bRKXBu1u5 zL04vW$w9rf70##XD>SF~GRjx&K6Ufq%HgN2KN@V^mG{WG>T^eJgYTk?RdHOMVU@PM zn&ZTi4S;}iA4kF{q-Zo7P0>QdO1h{eOy7G`YK_pf%fcXPNN(ll(xsY7oo z6mH#q?Y{hWpy|CG1%-=~c1AnvCfB!2Pf+4_s~S2ka?~ci;g)@lpE%+$C^ig|- zV4!EsM^%r<%3Io*MO6Vm{z-^_ncBWZ0z>J~L+XY_zU=V6HhMs0$kW{&+n*YCe_k4> zy0>gruZ??Og~ZjmX578rXW!44_c)HZ+!T|Nx=eKCt$h)n>Qwh9XI$+$x^>O$jq3Uq zHFZ_t@9I+ef6RL5C8D{@txUsh`->YcKfHSPbh^wJu?eE%^LiL`Mn3A%qxC?4!vpdM zKLu*f_c7;{uUk4#(IncyFxcO7%6{cu3~ay#s6KxbUXO z$S%LK>2Sp@tDg=lnD*LyWVXiy7s2gk_QoYsbXT1&Qpy%dO;TQLv2ypqF^P3^d3y6} zES{=4C^eUNR}O0SUQqXA za%j;+mkAH^tt<*xp31X5c;=MRsm6O{{X>`7w$yE3xwIww<)%ktC!KL`R-WFpXJt3D z!4=(>D`y>ion*hzVZUT?!~>`blc0i;a2nSS4n@mX8PM(%?e72 z?n+`^_n9TtVrQ?KA*RwpcrB)^)bIN%G09&;D!(bz{+s^YiwiOt;U%*0vP^WDnuu;u z8`j|0hpR{k(xNbb``d#?_KPYvi|TgV@20!=K3e`awf(~~{%%cFCzy+<^hg)PMkR$S z$>KpPI%0o$mP=w~FEtyqP52xZgnxhViHMS<(Ee?L`B(e5yZU&9i?Fsob^q5&eqALU zMfb|yYQk^1`2N03O;t@b@qhSSng7iLUH*IX@pb=eKd1bcKSKYb{ad;1MmOwLx{}IG zH@o5XimHK1kJRww$nbQPJ(BraamkTUIpYK=iJ3{s=~~trq4`<~@Ih&%g(t^EX&Go} zC5IsKm%| 
zfAoEA*|?0{RN)e}GLz#3DUn|rk(rc=qt~}UN<@5=AVb)=q^OMW$ncDCq~~izB_lm9 zIYwAEz~6D0rPjpn8uQ0iX#MGNO~T6l(4@ah_*46^;3(nqo-*>f`soi%{557_7Mh$s z<}XbZ7RyUX?pn*hP04wmVH?)3xu+m>!^@q@(+7p+YVPW>yru^pquCs(&^AC&s z?GX0YMgAf5r$t65ri5phn*4R4-v$2DVt@Vp^RG+&KJx#1p?^Fb{>`%gn((J4pilp9 zt$%a>r*-~zF8k|ZzYF|nalh_Of9>aABiPR}|FWNTZRM_yqww^ZlGgPpNsNm~3s1`x zen?Vt!_(4IvW2zcL0;)`Dak^wvALmZT}F@7CIe%i#rQ-7rte6#a(#3 zY#z#ig_YR4Knrai&psXbwj*4U5;H@b0?qg)o>>vjff*6W9&uiAwh>Xeu4&;x<_RwG zDKRd|uCWnGk%=yG*)c9j=CKh$0m7Wj$e^5bcROC3z|=QZkn95=FZ-LFC9rd`aItqV z^RO4ByV$wrMLGp$3+wXW8%GLrf=#T_BZG`H`6geNAHx4_xtYO1#)-n*h~&WZ2s`02 zLsCP6auWC^f3s{SN3`EDk8kR0E^rdIix(FeWS)xp6GHr4gv&-djkBWyt&F1hwq5nQ zBw59UCk4hw+I7V0%O^rwrbk!Fa=z z6X~21C+tJ?PlmubF*?#YF&pb@fptv?=KCggE!#iXD9FS-kN>p~{<4l=+n1UZk@U6C zu#WaF$;MXhcEWw)yY+E)O^r+n%*A#}_{a75bI#wkKQS}XKG^8nK4cuhw>AHMn_*uw z@{ITSx?Vwn=r`f|I-vi*?q|_{*)iBBf7>@)lCZxfheQjK9F4wh=cMqU!1NGDtN)af z9gKZCnD6<0pCv^&S;a=$d*uF8+d`6ZvV!?Oc^+|pv%ff*8%H?h{H5Q5gFNH;rmk3r zfUb24NlHwNaPp0gNOH{hwyk1=O+3GE|NpLS{E%e6iEnIpP>ygLpz}T8cssOd>5XZH>B|N;p5VEKElaEXmq^`-|U4kCr6VIW2=PVpu}|Hu~m?im@UY* zO+m@wU5S3eciWh6wkDj37F?>cV^ zg!O%U;dG1_`ZGXyKE-xGuL~QB%6@CqxBmWm((dYdVFHgQ>|5JJ>KfTY4WB>pF literal 0 HcmV?d00001 diff --git a/examples/remote-offline-store/offline_server/feature_repo/data/online_store.db b/examples/remote-offline-store/offline_server/feature_repo/data/online_store.db new file mode 100644 index 0000000000000000000000000000000000000000..d230f45b934c6f347589b35d87f58efa2ba4aa6d GIT binary patch literal 28672 zcmeI&!EVzq7{Kvl?HUA>?SgXkkxCT^Ar9PVA%}^wky2^fp|V2jXwlMiOWZ;a9MS_1 z!6Whn97&p0Vjz)J9FW4_l9Q%!Y=1w0ie$C#-9ez0@aOT&)8d(NXqcw)QV7E^npw57 zx-I#2Ubn^Ee)z+=X*~A7wGX}z^+5I_I{1Q0;r{}4!D zv@HA7HPh1|nyOE8-w%UG$wb9V70YQH^JNWaQ117&&L7u2_2ELfza}%Paz^6QP5d%0y>ZT``HXXVmnUZ^gUe(c9keT)c74 ze+v8d2%XMkzh!@PO=Giu$#t0Rprp6Z^Ubv^4_Q7fZ_`TMhGkF2W?FPx{)F^nm0VOx zmZE8M*Ob`~P%UwaL2jqdL({UyV{`qvV)_()RE>Ra$5i@#hW{v`uJ$ZDZH)gSql%@e zkXJ>_jqFu>Cx6|x^TT0&pdf$%0tg_000IagfB*srAb>z!1@7j7vH!2@@)8#U2q1s} z0tg_000IagfB*s;0rvkS0s;sifB*srAb pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + return df + + +# This groups features into a model version +driver_activity_v1 = FeatureService( + name="driver_activity_v1", + features=[ + driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view + transformed_conv_rate, # Selects all features from the feature view + ], +) +driver_activity_v2 = FeatureService( + name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate] +) + +# Defines a way to push data (to be available offline, online or both) into Feast. +driver_stats_push_source = PushSource( + name="driver_stats_push_source", + batch_source=driver_stats_source, +) + +# Defines a slightly modified version of the feature view from above, where the source +# has been changed to the push source. This allows fresh features to be directly pushed +# to the online store for this feature view. 
+driver_stats_fresh_fv = FeatureView( + name="driver_hourly_stats_fresh", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_push_source, # Changed from above + tags={"team": "driver_performance"}, +) + + +# Define an on demand feature view which can generate new features based on +# existing feature views and RequestSource features +@on_demand_feature_view( + sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), + ], +) +def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + return df + + +driver_activity_v3 = FeatureService( + name="driver_activity_v3", + features=[driver_stats_fresh_fv, transformed_conv_rate_fresh], +) diff --git a/examples/remote-offline-store/offline_server/feature_repo/feature_store.yaml b/examples/remote-offline-store/offline_server/feature_repo/feature_store.yaml new file mode 100644 index 0000000000..a751706d07 --- /dev/null +++ b/examples/remote-offline-store/offline_server/feature_repo/feature_store.yaml @@ -0,0 +1,9 @@ +project: offline_server +# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry) +registry: data/registry.db +# The provider primarily specifies default offline / online stores & storing the registry in a given cloud +provider: local +online_store: + type: sqlite + path: data/online_store.db +entity_key_serialization_version: 2 diff --git a/infra/charts/feast-feature-server/README.md b/infra/charts/feast-feature-server/README.md index 457aeff245..9ff5652485 100644 --- a/infra/charts/feast-feature-server/README.md +++ b/infra/charts/feast-feature-server/README.md @@ -13,9 +13,18 @@ helm repo update Install Feast Feature Server on Kubernetes -A base64 encoded version of the `feature_store.yaml` file is needed. Helm install example: +- Feast Deployment Mode: The Feast Feature Server supports multiple deployment modes using the `feast_mode` property. Supported modes are `online` (default), `offline`, `ui`, and `registry`. +Users can set the `feast_mode` based on their deployment choice. The `online` mode is the default and maintains backward compatibility with previous Feast Feature Server implementations. + +- Feature Store File: A base64 encoded version of the `feature_store.yaml` file is needed. 
+
+Helm install examples:
 ```
-helm install feast-feature-server feast-charts/feast-feature-server --set feature_store_yaml_base64=$(base64 feature_store.yaml)
+helm install feast-feature-server feast-charts/feast-feature-server --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+helm install feast-offline-server feast-charts/feast-feature-server --set feast_mode=offline --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+helm install feast-ui-server feast-charts/feast-feature-server --set feast_mode=ui --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+helm install feast-registry-server feast-charts/feast-feature-server --set feast_mode=registry --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+
 ```

## Tutorial
@@ -26,6 +35,7 @@ See [here](https://github.com/feast-dev/feast/tree/master/examples/python-helm-d
 | Key | Type | Default | Description |
 |-----|------|---------|-------------|
 | affinity | object | `{}` | |
+| feast_mode | string | `"online"` | Feast supported deployment modes - online (default), offline, ui and registry |
 | feature_store_yaml_base64 | string | `""` | [required] a base64 encoded version of feature_store.yaml |
 | fullnameOverride | string | `""` | |
 | image.pullPolicy | string | `"IfNotPresent"` | |
diff --git a/infra/charts/feast-feature-server/README.md.gotmpl b/infra/charts/feast-feature-server/README.md.gotmpl
index fb877208e0..be2fdae248 100644
--- a/infra/charts/feast-feature-server/README.md.gotmpl
+++ b/infra/charts/feast-feature-server/README.md.gotmpl
@@ -13,9 +13,18 @@ helm repo update
 Install Feast Feature Server on Kubernetes
 
-A base64 encoded version of the `feature_store.yaml` file is needed. Helm install example:
+- Feast Deployment Mode: The Feast Feature Server supports multiple deployment modes using the `feast_mode` property. Supported modes are `online` (default), `offline`, `ui`, and `registry`.
+Users can set the `feast_mode` based on their deployment choice. The `online` mode is the default and maintains backward compatibility with previous Feast Feature Server implementations.
+
+- Feature Store File: A base64 encoded version of the `feature_store.yaml` file is needed.
+
+Helm install examples:
 ```
-helm install feast-feature-server feast-charts/feast-feature-server --set feature_store_yaml_base64=$(base64 feature_store.yaml)
+helm install feast-feature-server feast-charts/feast-feature-server --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+helm install feast-offline-server feast-charts/feast-feature-server --set feast_mode=offline --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+helm install feast-ui-server feast-charts/feast-feature-server --set feast_mode=ui --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+helm install feast-registry-server feast-charts/feast-feature-server --set feast_mode=registry --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+
 ```

## Tutorial
diff --git a/infra/charts/feast-feature-server/templates/deployment.yaml b/infra/charts/feast-feature-server/templates/deployment.yaml
index 94c56de9dd..85b323610d 100644
--- a/infra/charts/feast-feature-server/templates/deployment.yaml
+++ b/infra/charts/feast-feature-server/templates/deployment.yaml
@@ -33,19 +33,46 @@ spec:
           env:
             - name: FEATURE_STORE_YAML_BASE64
               value: {{ .Values.feature_store_yaml_base64 }}
-          command: ["feast", "serve", "-h", "0.0.0.0"]
+          command:
+            {{- if eq .Values.feast_mode "offline" }}
+            - "feast"
+            - "serve_offline"
+            - "-h"
+            - "0.0.0.0"
+            {{- else if eq .Values.feast_mode "ui" }}
+            - "feast"
+            - "ui"
+            - "-h"
+            - "0.0.0.0"
+            {{- else if eq .Values.feast_mode "registry" }}
+            - "feast"
+            - "serve_registry"
+            {{- else }}
+            - "feast"
+            - "serve"
+            - "-h"
+            - "0.0.0.0"
+            {{- end }}
           ports:
-            - name: http
+            - name: {{ .Values.feast_mode }}
+              {{- if eq .Values.feast_mode "offline" }}
+              containerPort: 8815
+              {{- else if eq .Values.feast_mode "ui" }}
+              containerPort: 8888
+              {{- else if eq .Values.feast_mode "registry" }}
+              containerPort: 6570
+              {{- else }}
               containerPort: 6566
+              {{- end }}
               protocol: TCP
           livenessProbe:
             tcpSocket:
-              port: http
+              port: {{ .Values.feast_mode }}
             initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds }}
             periodSeconds: {{ .Values.livenessProbe.periodSeconds }}
           readinessProbe:
             tcpSocket:
-              port: http
+              port: {{ .Values.feast_mode }}
             initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds }}
             periodSeconds: {{ .Values.readinessProbe.periodSeconds }}
           resources:
diff --git a/infra/charts/feast-feature-server/templates/service.yaml b/infra/charts/feast-feature-server/templates/service.yaml
index db0ac8b10b..68f096264e 100644
--- a/infra/charts/feast-feature-server/templates/service.yaml
+++ b/infra/charts/feast-feature-server/templates/service.yaml
@@ -8,7 +8,7 @@ spec:
   type: {{ .Values.service.type }}
   ports:
     - port: {{ .Values.service.port }}
-      targetPort: http
+      targetPort: {{ .Values.feast_mode }}
       protocol: TCP
       name: http
   selector:
diff --git a/infra/charts/feast-feature-server/values.yaml b/infra/charts/feast-feature-server/values.yaml
index 168164ffe9..a6dd2d0f94 100644
--- a/infra/charts/feast-feature-server/values.yaml
+++ b/infra/charts/feast-feature-server/values.yaml
@@ -18,6 +18,9 @@ fullnameOverride: ""
 # feature_store_yaml_base64 -- [required] a base64 encoded version of feature_store.yaml
 feature_store_yaml_base64: ""
 
+# feast_mode -- Feast supported deployment modes - online (default), offline, ui and registry
+feast_mode: "online"
+
 podAnnotations: {}
 
 podSecurityContext: {}
diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py
index d0766b0f4a..eeffc29fab 100644
--- a/sdk/python/feast/cli.py
+++ b/sdk/python/feast/cli.py
@@ -27,6 +27,7 @@
 from feast import utils
 from feast.constants import (
     DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT,
+    DEFAULT_OFFLINE_SERVER_PORT,
     DEFAULT_REGISTRY_SERVER_PORT,
 )
 from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
@@ -765,6 +766,34 @@ def serve_registry_command(ctx: click.Context, port: int):
     store.serve_registry(port)
 
 
+@cli.command("serve_offline")
+@click.option(
+    "--host",
+    "-h",
+    type=click.STRING,
+    default="127.0.0.1",
+    show_default=True,
+    help="Specify a host for the server",
+)
+@click.option(
+    "--port",
+    "-p",
+    type=click.INT,
+    default=DEFAULT_OFFLINE_SERVER_PORT,
+    help="Specify a port for the server",
+)
+@click.pass_context
+def serve_offline_command(
+    ctx: click.Context,
+    host: str,
+    port: int,
+):
+    """Start an offline feature server locally on the given host and port."""
+    store = create_feature_store(ctx)
+
+    store.serve_offline(host, port)
+
+
 @cli.command("validate")
 @click.option(
     "--feature-service",
diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py
index 6aad3e60bb..fa8674d91d 100644
--- a/sdk/python/feast/constants.py
+++ b/sdk/python/feast/constants.py
@@ -41,6 +41,9 @@
 # Default registry server port
 DEFAULT_REGISTRY_SERVER_PORT = 6570
 
+# Default offline server port
+DEFAULT_OFFLINE_SERVER_PORT = 8815
+
 # Environment variable for feature server docker image tag
 DOCKER_IMAGE_TAG_ENV_NAME: str = "FEAST_SERVER_DOCKER_IMAGE_TAG"
diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index 577bd3fe52..716e706ebe 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -2569,6 +2569,12 @@ def serve_registry(self, port: int) -> None:
         registry_server.start_server(self, port)
 
+    def serve_offline(self, host: str, port: int) -> None:
+        """Start the offline server locally on a given host and port."""
+        from feast import offline_server
+
+        offline_server.start_server(self, host, port)
+
     def serve_transformations(self, port: int) -> None:
         """Start the feature transformation server locally on a given port."""
         warnings.warn(
diff --git a/sdk/python/feast/infra/offline_stores/remote.py b/sdk/python/feast/infra/offline_stores/remote.py
new file mode 100644
index 0000000000..dc657017d9
--- /dev/null
+++ b/sdk/python/feast/infra/offline_stores/remote.py
@@ -0,0 +1,407 @@
+import json
+import logging
+import uuid
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union
+
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+import pyarrow.flight as fl
+import pyarrow.parquet
+from pydantic import StrictInt, StrictStr
+
+from feast import OnDemandFeatureView
+from feast.data_source import DataSource
+from feast.feature_logging import (
+    FeatureServiceLoggingSource,
+    LoggingConfig,
+    LoggingSource,
+)
+from feast.feature_view import FeatureView
+from feast.infra.offline_stores import offline_utils
+from feast.infra.offline_stores.offline_store import (
+    OfflineStore,
+    RetrievalJob,
+    RetrievalMetadata,
+)
+from feast.infra.registry.base_registry import BaseRegistry
+from feast.repo_config import FeastConfigBaseModel, RepoConfig
+from feast.saved_dataset import SavedDatasetStorage
+
+logger = logging.getLogger(__name__)
+
+
+class RemoteOfflineStoreConfig(FeastConfigBaseModel):
+    type: Literal["remote"] = "remote"
+    host: StrictStr
""" + + port: Optional[StrictInt] = None + """ str: remote offline store server port.""" + + +class RemoteRetrievalJob(RetrievalJob): + def __init__( + self, + client: fl.FlightClient, + api: str, + api_parameters: Dict[str, Any], + entity_df: Union[pd.DataFrame, str] = None, + table: pa.Table = None, + metadata: Optional[RetrievalMetadata] = None, + ): + # Initialize the client connection + self.client = client + self.api = api + self.api_parameters = api_parameters + self.entity_df = entity_df + self.table = table + self._metadata = metadata + + # Invoked to realize the Pandas DataFrame + def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: + # We use arrow format because it gives better control of the table schema + return self._to_arrow_internal().to_pandas() + + # Invoked to synchronously execute the underlying query and return the result as an arrow table + # This is where do_get service is invoked + def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: + return _send_retrieve_remote( + self.api, self.api_parameters, self.entity_df, self.table, self.client + ) + + @property + def on_demand_feature_views(self) -> List[OnDemandFeatureView]: + return [] + + @property + def metadata(self) -> Optional[RetrievalMetadata]: + return self._metadata + + @property + def full_feature_names(self) -> bool: + return self.api_parameters["full_feature_names"] + + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: Optional[int] = None, + ): + """ + Arrow flight action is being used to perform the persist action remotely + """ + + api_parameters = { + "data_source_name": storage.to_data_source().name, + "allow_overwrite": allow_overwrite, + "timeout": timeout, + } + + # Add api parameters to command + for key, value in self.api_parameters.items(): + api_parameters[key] = value + + api_parameters["retrieve_func"] = self.api + + _call_put( + api=RemoteRetrievalJob.persist.__name__, + api_parameters=api_parameters, + client=self.client, + table=self.table, + entity_df=self.entity_df, + ) + + +class RemoteOfflineStore(OfflineStore): + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> RemoteRetrievalJob: + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + feature_view_names = [fv.name for fv in feature_views] + name_aliases = [fv.projection.name_alias for fv in feature_views] + + api_parameters = { + "feature_view_names": feature_view_names, + "feature_refs": feature_refs, + "project": project, + "full_feature_names": full_feature_names, + "name_aliases": name_aliases, + } + + return RemoteRetrievalJob( + client=client, + api=OfflineStore.get_historical_features.__name__, + api_parameters=api_parameters, + entity_df=entity_df, + metadata=_create_retrieval_metadata(feature_refs, entity_df), + ) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + 
api_parameters = { + "data_source_name": data_source.name, + "join_key_columns": join_key_columns, + "feature_name_columns": feature_name_columns, + "timestamp_field": timestamp_field, + "start_date": start_date.isoformat(), + "end_date": end_date.isoformat(), + } + + return RemoteRetrievalJob( + client=client, + api=OfflineStore.pull_all_from_table_or_query.__name__, + api_parameters=api_parameters, + ) + + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + api_parameters = { + "data_source_name": data_source.name, + "join_key_columns": join_key_columns, + "feature_name_columns": feature_name_columns, + "timestamp_field": timestamp_field, + "created_timestamp_column": created_timestamp_column, + "start_date": start_date.isoformat(), + "end_date": end_date.isoformat(), + } + + return RemoteRetrievalJob( + client=client, + api=OfflineStore.pull_latest_from_table_or_query.__name__, + api_parameters=api_parameters, + ) + + @staticmethod + def write_logged_features( + config: RepoConfig, + data: Union[pyarrow.Table, Path], + source: LoggingSource, + logging_config: LoggingConfig, + registry: BaseRegistry, + ): + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + assert isinstance(source, FeatureServiceLoggingSource) + + if isinstance(data, Path): + data = pyarrow.parquet.read_table(data, use_threads=False, pre_buffer=False) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + api_parameters = { + "feature_service_name": source._feature_service.name, + } + + _call_put( + api=OfflineStore.write_logged_features.__name__, + api_parameters=api_parameters, + client=client, + table=data, + entity_df=None, + ) + + @staticmethod + def offline_write_batch( + config: RepoConfig, + feature_view: FeatureView, + table: pyarrow.Table, + progress: Optional[Callable[[int], Any]], + ): + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + feature_view_names = [feature_view.name] + name_aliases = [feature_view.projection.name_alias] + + api_parameters = { + "feature_view_names": feature_view_names, + "progress": progress, + "name_aliases": name_aliases, + } + + _call_put( + api=OfflineStore.offline_write_batch.__name__, + api_parameters=api_parameters, + client=client, + table=table, + entity_df=None, + ) + + @staticmethod + def init_client(config): + location = f"grpc://{config.offline_store.host}:{config.offline_store.port}" + client = fl.connect(location=location) + logger.info(f"Connecting FlightClient at {location}") + return client + + +def _create_retrieval_metadata(feature_refs: List[str], entity_df: pd.DataFrame): + entity_schema = _get_entity_schema( + entity_df=entity_df, + ) + + event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df( + entity_schema=entity_schema, + ) + + timestamp_range = _get_entity_df_event_timestamp_range( + entity_df, event_timestamp_col + ) + + return RetrievalMetadata( + features=feature_refs, + keys=list(set(entity_df.columns) - {event_timestamp_col}), + min_event_timestamp=timestamp_range[0], 
+        max_event_timestamp=timestamp_range[1],
+    )
+
+
+def _get_entity_schema(entity_df: pd.DataFrame) -> Dict[str, np.dtype]:
+    return dict(zip(entity_df.columns, entity_df.dtypes))
+
+
+def _get_entity_df_event_timestamp_range(
+    entity_df: Union[pd.DataFrame, str],
+    entity_df_event_timestamp_col: str,
+) -> Tuple[datetime, datetime]:
+    if not isinstance(entity_df, pd.DataFrame):
+        raise ValueError(
+            f"Please provide an entity_df of type {pd.DataFrame} instead of type {type(entity_df)}"
+        )
+
+    entity_df_event_timestamp = entity_df.loc[
+        :, entity_df_event_timestamp_col
+    ].infer_objects()
+    if pd.api.types.is_string_dtype(entity_df_event_timestamp):
+        entity_df_event_timestamp = pd.to_datetime(entity_df_event_timestamp, utc=True)
+
+    return (
+        entity_df_event_timestamp.min().to_pydatetime(),
+        entity_df_event_timestamp.max().to_pydatetime(),
+    )
+
+
+def _send_retrieve_remote(
+    api: str,
+    api_parameters: Dict[str, Any],
+    entity_df: Union[pd.DataFrame, str],
+    table: pa.Table,
+    client: fl.FlightClient,
+):
+    # Register the API call via do_put, then fetch the result via do_get
+    command_descriptor = _call_put(api, api_parameters, client, entity_df, table)
+    return _call_get(client, command_descriptor)
+
+
+def _call_get(client: fl.FlightClient, command_descriptor: fl.FlightDescriptor):
+    flight = client.get_flight_info(command_descriptor)
+    ticket = flight.endpoints[0].ticket
+    reader = client.do_get(ticket)
+    return reader.read_all()
+
+
+def _call_put(
+    api: str,
+    api_parameters: Dict[str, Any],
+    client: fl.FlightClient,
+    entity_df: Union[pd.DataFrame, str],
+    table: pa.Table,
+):
+    # Generate a unique command identifier
+    command_id = str(uuid.uuid4())
+    command = {
+        "command_id": command_id,
+        "api": api,
+    }
+    # Add the api parameters to the command
+    for key, value in api_parameters.items():
+        command[key] = value
+
+    command_descriptor = fl.FlightDescriptor.for_command(
+        json.dumps(
+            command,
+        )
+    )
+
+    _put_parameters(command_descriptor, entity_df, table, client)
+    return command_descriptor
+
+
+def _put_parameters(
+    command_descriptor: fl.FlightDescriptor,
+    entity_df: Union[pd.DataFrame, str],
+    table: pa.Table,
+    client: fl.FlightClient,
+):
+    updated_table: pa.Table
+
+    if entity_df is not None:
+        updated_table = pa.Table.from_pandas(entity_df)
+    elif table is not None:
+        updated_table = table
+    else:
+        updated_table = _create_empty_table()
+
+    writer, _ = client.do_put(
+        command_descriptor,
+        updated_table.schema,
+    )
+
+    writer.write_table(updated_table)
+    writer.close()
+
+
+def _create_empty_table():
+    # Placeholder single-column table, sent when an API call carries no input data
+    schema = pa.schema(
+        {
+            "key": pa.string(),
+        }
+    )
+
+    keys = ["mock_key"]
+
+    table = pa.Table.from_pydict(dict(zip(schema.names, keys)), schema=schema)
+
+    return table
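Note: once `feature_store.yaml` declares an offline store of type `remote`, the client side above is driven entirely through the standard `FeatureStore` API. A minimal sketch of that wiring (the project name, registry path, host/port, and feature references below are illustrative assumptions, mirroring the unit tests later in this patch):

```python
import pandas as pd

from feast import FeatureStore, RepoConfig
from feast.infra.offline_stores.remote import RemoteOfflineStoreConfig

# Hypothetical connection details; any reachable offline server works.
offline_config = RemoteOfflineStoreConfig(type="remote", host="localhost", port=8815)

store = FeatureStore(
    config=RepoConfig(
        project="my_project",  # assumed project name
        registry="/path/to/registry.db",  # assumed registry path
        provider="local",
        offline_store=offline_config,
        entity_key_serialization_version=2,
    )
)

entity_df = pd.DataFrame(
    {
        "driver_id": [1001],
        "event_timestamp": [pd.Timestamp("2021-04-12 10:59:42")],
    }
)

# Builds a RemoteRetrievalJob; the Arrow Flight exchange (do_put of the
# entity dataframe, then do_get of the result) is deferred until the job
# is materialized, e.g. via to_df().
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],
).to_df()
```

Under the hood, each call becomes a Flight command descriptor whose JSON payload carries a unique `command_id`, the target `api` name, and the per-API parameters assembled above.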
diff --git a/sdk/python/feast/offline_server.py b/sdk/python/feast/offline_server.py
new file mode 100644
index 0000000000..718da1b109
--- /dev/null
+++ b/sdk/python/feast/offline_server.py
@@ -0,0 +1,332 @@
+import ast
+import json
+import logging
+import traceback
+from datetime import datetime
+from typing import Any, Dict, List
+
+import pyarrow as pa
+import pyarrow.flight as fl
+
+from feast import FeatureStore, FeatureView, utils
+from feast.feature_logging import FeatureServiceLoggingSource
+from feast.feature_view import DUMMY_ENTITY_NAME
+from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
+from feast.saved_dataset import SavedDatasetStorage
+
+logger = logging.getLogger(__name__)
+
+
+class OfflineServer(fl.FlightServerBase):
+    def __init__(self, store: FeatureStore, location: str, **kwargs):
+        super().__init__(location, **kwargs)
+        self._location = location
+        # A dictionary of configured flights, i.e. API calls received and not yet served
+        self.flights: Dict[str, Any] = {}
+        self.store = store
+        self.offline_store = get_offline_store_from_config(store.config.offline_store)
+
+    @classmethod
+    def descriptor_to_key(cls, descriptor: fl.FlightDescriptor):
+        return (
+            descriptor.descriptor_type.value,
+            descriptor.command,
+            tuple(descriptor.path or tuple()),
+        )
+
+    def _make_flight_info(self, key: Any, descriptor: fl.FlightDescriptor):
+        endpoints = [fl.FlightEndpoint(repr(key), [self._location])]
+        # TODO calculate actual schema from the given features
+        schema = pa.schema([])
+
+        return fl.FlightInfo(schema, descriptor, endpoints, -1, -1)
+
+    def get_flight_info(
+        self, context: fl.ServerCallContext, descriptor: fl.FlightDescriptor
+    ):
+        key = OfflineServer.descriptor_to_key(descriptor)
+        if key in self.flights:
+            return self._make_flight_info(key, descriptor)
+        raise KeyError("Flight not found.")
+
+    def list_flights(self, context: fl.ServerCallContext, criteria: bytes):
+        for key, table in self.flights.items():
+            if key[1] is not None:
+                descriptor = fl.FlightDescriptor.for_command(key[1])
+            else:
+                descriptor = fl.FlightDescriptor.for_path(*key[2])
+
+            yield self._make_flight_info(key, descriptor)
+
+    # Expects to receive request parameters and stores them in the flights dictionary,
+    # indexed by the unique command
+    def do_put(
+        self,
+        context: fl.ServerCallContext,
+        descriptor: fl.FlightDescriptor,
+        reader: fl.MetadataRecordBatchReader,
+        writer: fl.FlightMetadataWriter,
+    ):
+        key = OfflineServer.descriptor_to_key(descriptor)
+        command = json.loads(key[1])
+        if "api" in command:
+            data = reader.read_all()
+            logger.debug(f"do_put: command is {command}, data is {data}")
+            self.flights[key] = data
+
+            self._call_api(command, key)
+        else:
+            logger.warning(f"No 'api' field in command: {command}")
+
+    def _call_api(self, command: dict, key: str):
+        remove_data = False
+        try:
+            api = command["api"]
+            if api == OfflineServer.offline_write_batch.__name__:
+                self.offline_write_batch(command, key)
+                remove_data = True
+            elif api == OfflineServer.write_logged_features.__name__:
+                self.write_logged_features(command, key)
+                remove_data = True
+            elif api == OfflineServer.persist.__name__:
+                self.persist(command["retrieve_func"], command, key)
+                remove_data = True
+        except Exception as e:
+            remove_data = True
+            logger.exception(e)
+            traceback.print_exc()
+            raise e
+        finally:
+            if remove_data:
+                # The command has been fully handled, so clear the corresponding flight and data
+                del self.flights[key]
+
+    def get_feature_view_by_name(
+        self, fv_name: str, name_alias: str, project: str
+    ) -> FeatureView:
+        """
+        Retrieves a feature view by name, including all subclasses of FeatureView.
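+        If a name alias is given, the projection of the feature service defining
+        that alias is applied to the returned view.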
+ + Args: + fv_name: Name of feature view + name_alias: Alias to be applied to the projection of the registered view + project: Feast project that this feature view belongs to + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + try: + fv = self.store.registry.get_feature_view(name=fv_name, project=project) + if name_alias is not None: + for fs in self.store.registry.list_feature_services(project=project): + for p in fs.feature_view_projections: + if p.name_alias == name_alias: + logger.debug( + f"Found matching FeatureService {fs.name} with projection {p}" + ) + fv = fv.with_projection(p) + return fv + except Exception: + try: + return self.store.registry.get_stream_feature_view( + name=fv_name, project=project + ) + except Exception as e: + logger.error( + f"Cannot find any FeatureView by name {fv_name} in project {project}" + ) + raise e + + def list_feature_views_by_name( + self, feature_view_names: List[str], name_aliases: List[str], project: str + ) -> List[FeatureView]: + return [ + remove_dummies( + self.get_feature_view_by_name( + fv_name=fv_name, name_alias=name_aliases[index], project=project + ) + ) + for index, fv_name in enumerate(feature_view_names) + ] + + # Extracts the API parameters from the flights dictionary, delegates the execution to the FeatureStore instance + # and returns the stream of data + def do_get(self, context: fl.ServerCallContext, ticket: fl.Ticket): + key = ast.literal_eval(ticket.ticket.decode()) + if key not in self.flights: + logger.error(f"Unknown key {key}") + return None + + command = json.loads(key[1]) + api = command["api"] + logger.debug(f"get command is {command}") + logger.debug(f"requested api is {api}") + try: + if api == OfflineServer.get_historical_features.__name__: + table = self.get_historical_features(command, key).to_arrow() + elif api == OfflineServer.pull_all_from_table_or_query.__name__: + table = self.pull_all_from_table_or_query(command).to_arrow() + elif api == OfflineServer.pull_latest_from_table_or_query.__name__: + table = self.pull_latest_from_table_or_query(command).to_arrow() + else: + raise NotImplementedError + except Exception as e: + logger.exception(e) + traceback.print_exc() + raise e + + # Get service is consumed, so we clear the corresponding flight and data + del self.flights[key] + return fl.RecordBatchStream(table) + + def offline_write_batch(self, command: dict, key: str): + feature_view_names = command["feature_view_names"] + assert ( + len(feature_view_names) == 1 + ), "feature_view_names list should only have one item" + name_aliases = command["name_aliases"] + assert len(name_aliases) == 1, "name_aliases list should only have one item" + project = self.store.config.project + feature_views = self.list_feature_views_by_name( + feature_view_names=feature_view_names, + name_aliases=name_aliases, + project=project, + ) + + assert len(feature_views) == 1 + table = self.flights[key] + self.offline_store.offline_write_batch( + self.store.config, feature_views[0], table, command["progress"] + ) + + def write_logged_features(self, command: dict, key: str): + table = self.flights[key] + feature_service = self.store.get_feature_service( + command["feature_service_name"] + ) + + assert feature_service.logging_config is not None + + self.offline_store.write_logged_features( + config=self.store.config, + data=table, + source=FeatureServiceLoggingSource( + feature_service, self.store.config.project + ), + logging_config=feature_service.logging_config, + 
registry=self.store.registry,
+        )
+
+    def pull_all_from_table_or_query(self, command: dict):
+        return self.offline_store.pull_all_from_table_or_query(
+            self.store.config,
+            self.store.get_data_source(command["data_source_name"]),
+            command["join_key_columns"],
+            command["feature_name_columns"],
+            command["timestamp_field"],
+            utils.make_tzaware(datetime.fromisoformat(command["start_date"])),
+            utils.make_tzaware(datetime.fromisoformat(command["end_date"])),
+        )
+
+    def pull_latest_from_table_or_query(self, command: dict):
+        return self.offline_store.pull_latest_from_table_or_query(
+            self.store.config,
+            self.store.get_data_source(command["data_source_name"]),
+            command["join_key_columns"],
+            command["feature_name_columns"],
+            command["timestamp_field"],
+            command["created_timestamp_column"],
+            utils.make_tzaware(datetime.fromisoformat(command["start_date"])),
+            utils.make_tzaware(datetime.fromisoformat(command["end_date"])),
+        )
+
+    def list_actions(self, context):
+        return [
+            (
+                OfflineServer.offline_write_batch.__name__,
+                "Writes the specified arrow table to the data source underlying the specified feature view.",
+            ),
+            (
+                OfflineServer.write_logged_features.__name__,
+                "Writes logged features to a specified destination in the offline store.",
+            ),
+            (
+                OfflineServer.persist.__name__,
+                "Synchronously executes the underlying query and persists the result in the same offline store at the "
+                "specified destination.",
+            ),
+        ]
+
+    def get_historical_features(self, command: dict, key: str):
+        # Extract parameters from the internal flights dictionary
+        entity_df_value = self.flights[key]
+        entity_df = pa.Table.to_pandas(entity_df_value)
+        feature_view_names = command["feature_view_names"]
+        name_aliases = command["name_aliases"]
+        feature_refs = command["feature_refs"]
+        project = command["project"]
+        full_feature_names = command["full_feature_names"]
+        feature_views = self.list_feature_views_by_name(
+            feature_view_names=feature_view_names,
+            name_aliases=name_aliases,
+            project=project,
+        )
+        ret_job = self.offline_store.get_historical_features(
+            config=self.store.config,
+            feature_views=feature_views,
+            feature_refs=feature_refs,
+            entity_df=entity_df,
+            registry=self.store.registry,
+            project=project,
+            full_feature_names=full_feature_names,
+        )
+        return ret_job
+
+    def persist(self, retrieve_func: str, command: dict, key: str):
+        try:
+            if retrieve_func == OfflineServer.get_historical_features.__name__:
+                ret_job = self.get_historical_features(command, key)
+            elif (
+                retrieve_func == OfflineServer.pull_latest_from_table_or_query.__name__
+            ):
+                ret_job = self.pull_latest_from_table_or_query(command)
+            elif retrieve_func == OfflineServer.pull_all_from_table_or_query.__name__:
+                ret_job = self.pull_all_from_table_or_query(command)
+            else:
+                raise NotImplementedError
+
+            data_source = self.store.get_data_source(command["data_source_name"])
+            storage = SavedDatasetStorage.from_data_source(data_source)
+            ret_job.persist(storage, command["allow_overwrite"], command["timeout"])
+        except Exception as e:
+            logger.exception(e)
+            traceback.print_exc()
+            raise e
+
+    def do_action(self, context: fl.ServerCallContext, action: fl.Action):
+        pass
+
+    def do_drop_dataset(self, dataset):
+        pass
+
+
+def remove_dummies(fv: FeatureView) -> FeatureView:
+    """
+    Removes dummy IDs from FeatureView instances created with FeatureView.from_proto.
+    """
+    if DUMMY_ENTITY_NAME in fv.entities:
+        fv.entities = []
+        fv.entity_columns = []
+    return fv
+
+
+def start_server(
+    store: FeatureStore,
+    host: str,
+ port: int, +): + location = "grpc+tcp://{}:{}".format(host, port) + server = OfflineServer(store, location) + logger.info(f"Offline store server serving on {location}") + server.serve() diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 6ef81794bf..b7c7b0a9d0 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -77,6 +77,7 @@ "athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore", "mssql": "feast.infra.offline_stores.contrib.mssql_offline_store.mssql.MsSqlServerOfflineStore", "duckdb": "feast.infra.offline_stores.duckdb.DuckDBOfflineStore", + "remote": "feast.infra.offline_stores.remote.RemoteOfflineStore", } FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = { diff --git a/sdk/python/feast/templates/local/bootstrap.py b/sdk/python/feast/templates/local/bootstrap.py index 125eb7c2e7..ee2847c19c 100644 --- a/sdk/python/feast/templates/local/bootstrap.py +++ b/sdk/python/feast/templates/local/bootstrap.py @@ -24,6 +24,7 @@ def bootstrap(): example_py_file = repo_path / "example_repo.py" replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path)) + replace_str_in_file(example_py_file, "%LOGGING_PATH%", str(data_path)) if __name__ == "__main__": diff --git a/sdk/python/feast/templates/local/feature_repo/example_repo.py b/sdk/python/feast/templates/local/feature_repo/example_repo.py index 5aed3371b1..debe9d45e9 100644 --- a/sdk/python/feast/templates/local/feature_repo/example_repo.py +++ b/sdk/python/feast/templates/local/feature_repo/example_repo.py @@ -13,6 +13,8 @@ PushSource, RequestSource, ) +from feast.feature_logging import LoggingConfig +from feast.infra.offline_stores.file_source import FileLoggingDestination from feast.on_demand_feature_view import on_demand_feature_view from feast.types import Float32, Float64, Int64 @@ -88,6 +90,9 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view transformed_conv_rate, # Selects all features from the feature view ], + logging_config=LoggingConfig( + destination=FileLoggingDestination(path="%LOGGING_PATH%") + ), ) driver_activity_v2 = FeatureService( name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate] diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 7c875fc9bd..775db8c388 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -305,10 +305,7 @@ def pytest_generate_tests(metafunc: pytest.Metafunc): @pytest.fixture def feature_server_endpoint(environment): - if ( - not environment.python_feature_server - or environment.test_repo_config.provider != "local" - ): + if not environment.python_feature_server or environment.provider != "local": yield environment.feature_store.get_feature_server_endpoint() return diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 4d1c63127c..be01a1e1ac 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -35,6 +35,7 @@ DuckDBDataSourceCreator, DuckDBDeltaDataSourceCreator, FileDataSourceCreator, + RemoteOfflineStoreDataSourceCreator, ) from tests.integration.feature_repos.universal.data_sources.redshift import ( RedshiftDataSourceCreator, @@ -121,6 +122,7 @@ ("local", FileDataSourceCreator), ("local", DuckDBDataSourceCreator), ("local", 
DuckDBDeltaDataSourceCreator), + ("local", RemoteOfflineStoreDataSourceCreator), ] if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True": diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 18094b723f..f7ab55d868 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -1,18 +1,22 @@ +import logging import os.path import shutil +import subprocess import tempfile import uuid +from pathlib import Path from typing import Any, Dict, List, Optional import pandas as pd import pyarrow as pa import pyarrow.parquet as pq +import yaml from minio import Minio from testcontainers.core.generic import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.minio import MinioContainer -from feast import FileSource +from feast import FileSource, RepoConfig from feast.data_format import DeltaFormat, ParquetFormat from feast.data_source import DataSource from feast.feature_logging import LoggingDestination @@ -22,10 +26,15 @@ FileLoggingDestination, SavedDatasetFileStorage, ) -from feast.repo_config import FeastConfigBaseModel +from feast.infra.offline_stores.remote import RemoteOfflineStoreConfig +from feast.repo_config import FeastConfigBaseModel, RegistryConfig +from feast.wait import wait_retry_backoff # noqa: E402 from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) +from tests.utils.http_server import check_port_open, free_port # noqa: E402 + +logger = logging.getLogger(__name__) class FileDataSourceCreator(DataSourceCreator): @@ -352,3 +361,69 @@ def create_offline_store_config(self): staging_location_endpoint_override=self.endpoint_url, ) return self.duckdb_offline_store_config + + +class RemoteOfflineStoreDataSourceCreator(FileDataSourceCreator): + def __init__(self, project_name: str, *args, **kwargs): + super().__init__(project_name) + self.server_port: int = 0 + self.proc = None + + def setup(self, registry: RegistryConfig): + parent_offline_config = super().create_offline_store_config() + config = RepoConfig( + project=self.project_name, + provider="local", + offline_store=parent_offline_config, + registry=registry.path, + entity_key_serialization_version=2, + ) + + repo_path = Path(tempfile.mkdtemp()) + with open(repo_path / "feature_store.yaml", "w") as outfile: + yaml.dump(config.dict(by_alias=True), outfile) + repo_path = str(repo_path.resolve()) + + self.server_port = free_port() + host = "0.0.0.0" + cmd = [ + "feast", + "-c" + repo_path, + "serve_offline", + "--host", + host, + "--port", + str(self.server_port), + ] + self.proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL + ) + + _time_out_sec: int = 60 + # Wait for server to start + wait_retry_backoff( + lambda: (None, check_port_open(host, self.server_port)), + timeout_secs=_time_out_sec, + timeout_msg=f"Unable to start the feast remote offline server in {_time_out_sec} seconds at port={self.server_port}", + ) + return "grpc+tcp://{}:{}".format(host, self.server_port) + + def create_offline_store_config(self) -> FeastConfigBaseModel: + self.remote_offline_store_config = RemoteOfflineStoreConfig( + type="remote", host="0.0.0.0", port=self.server_port + ) + return self.remote_offline_store_config + + def teardown(self): + super().teardown() + if self.proc is not None: + self.proc.kill() + + # wait 
server to free the port + wait_retry_backoff( + lambda: ( + None, + not check_port_open("localhost", self.server_port), + ), + timeout_secs=30, + ) diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index eba994544d..32f506f90b 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -34,8 +34,6 @@ def test_feature_service_logging(environment, universal_data_sources, pass_as_pa (_, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - store.apply([customer(), driver(), location(), *feature_views.values()]) - feature_service = FeatureService( name="test_service", features=[ @@ -49,6 +47,17 @@ def test_feature_service_logging(environment, universal_data_sources, pass_as_pa ), ) + store.apply( + [customer(), driver(), location(), *feature_views.values()], feature_service + ) + + # Added to handle the case that the offline store is remote + store.registry.apply_feature_service(feature_service, store.config.project) + store.registry.apply_data_source( + feature_service.logging_config.destination.to_data_source(), + store.config.project, + ) + driver_df = datasets.driver_df driver_df["val_to_add"] = 50 driver_df = driver_df.join(conv_rate_plus_100(driver_df)) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index a6db7f2535..bfb8a56200 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -19,6 +19,9 @@ construct_universal_feature_views, table_name_from_data_source, ) +from tests.integration.feature_repos.universal.data_sources.file import ( + RemoteOfflineStoreDataSourceCreator, +) from tests.integration.feature_repos.universal.data_sources.snowflake import ( SnowflakeDataSourceCreator, ) @@ -157,22 +160,25 @@ def test_historical_features_main( timestamp_precision=timedelta(milliseconds=1), ) - assert_feature_service_correctness( - store, - feature_service, - full_feature_names, - entity_df_with_request_data, - expected_df, - event_timestamp, - ) - assert_feature_service_entity_mapping_correctness( - store, - feature_service_entity_mapping, - full_feature_names, - entity_df_with_request_data, - full_expected_df, - event_timestamp, - ) + if not isinstance( + environment.data_source_creator, RemoteOfflineStoreDataSourceCreator + ): + assert_feature_service_correctness( + store, + feature_service, + full_feature_names, + entity_df_with_request_data, + expected_df, + event_timestamp, + ) + assert_feature_service_entity_mapping_correctness( + store, + feature_service_entity_mapping, + full_feature_names, + entity_df_with_request_data, + full_expected_df, + event_timestamp, + ) table_from_df_entities: pd.DataFrame = job_from_df.to_arrow().to_pandas() validate_dataframes( @@ -375,8 +381,13 @@ def test_historical_features_persisting( (entities, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) + storage = environment.data_source_creator.create_saved_dataset_destination() + store.apply([driver(), customer(), location(), *feature_views.values()]) + # Added to handle the case that the offline store is remote + 
store.registry.apply_data_source(storage.to_data_source(), store.config.project) + entity_df = datasets.entity_df.drop( columns=["order_id", "origin_id", "destination_id"] ) @@ -398,7 +409,7 @@ def test_historical_features_persisting( saved_dataset = store.create_saved_dataset( from_=job, name="saved_dataset", - storage=environment.data_source_creator.create_saved_dataset_destination(), + storage=storage, tags={"env": "test"}, allow_overwrite=True, ) diff --git a/sdk/python/tests/integration/offline_store/test_validation.py b/sdk/python/tests/integration/offline_store/test_validation.py index fdf182be57..1731f823c8 100644 --- a/sdk/python/tests/integration/offline_store/test_validation.py +++ b/sdk/python/tests/integration/offline_store/test_validation.py @@ -45,8 +45,13 @@ def test_historical_retrieval_with_validation(environment, universal_data_source store = environment.feature_store (entities, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) + storage = environment.data_source_creator.create_saved_dataset_destination() + store.apply([driver(), customer(), location(), *feature_views.values()]) + # Added to handle the case that the offline store is remote + store.registry.apply_data_source(storage.to_data_source(), store.config.project) + # Create two identical retrieval jobs entity_df = datasets.entity_df.drop( columns=["order_id", "origin_id", "destination_id"] @@ -64,7 +69,7 @@ def test_historical_retrieval_with_validation(environment, universal_data_source store.create_saved_dataset( from_=reference_job, name="my_training_dataset", - storage=environment.data_source_creator.create_saved_dataset_destination(), + storage=storage, allow_overwrite=True, ) saved_dataset = store.get_saved_dataset("my_training_dataset") @@ -80,9 +85,13 @@ def test_historical_retrieval_fails_on_validation(environment, universal_data_so (entities, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) + storage = environment.data_source_creator.create_saved_dataset_destination() store.apply([driver(), customer(), location(), *feature_views.values()]) + # Added to handle the case that the offline store is remote + store.registry.apply_data_source(storage.to_data_source(), store.config.project) + entity_df = datasets.entity_df.drop( columns=["order_id", "origin_id", "destination_id"] ) @@ -95,7 +104,7 @@ def test_historical_retrieval_fails_on_validation(environment, universal_data_so store.create_saved_dataset( from_=reference_job, name="my_other_dataset", - storage=environment.data_source_creator.create_saved_dataset_destination(), + storage=storage, allow_overwrite=True, ) @@ -149,10 +158,19 @@ def test_logged_features_validation(environment, universal_data_sources): ), ) + storage = environment.data_source_creator.create_saved_dataset_destination() + store.apply( [driver(), customer(), location(), feature_service, *feature_views.values()] ) + # Added to handle the case that the offline store is remote + store.registry.apply_data_source( + feature_service.logging_config.destination.to_data_source(), + store.config.project, + ) + store.registry.apply_data_source(storage.to_data_source(), store.config.project) + entity_df = datasets.entity_df.drop( columns=["order_id", "origin_id", "destination_id"] ) @@ -180,7 +198,7 @@ def test_logged_features_validation(environment, universal_data_sources): entity_df=entity_df, features=store_fs, full_feature_names=True ), 
name="reference_for_validating_logged_features", - storage=environment.data_source_creator.create_saved_dataset_destination(), + storage=storage, allow_overwrite=True, ) diff --git a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py index 79a3a27b67..fd50d37632 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py @@ -29,6 +29,10 @@ RedshiftOfflineStoreConfig, RedshiftRetrievalJob, ) +from feast.infra.offline_stores.remote import ( + RemoteOfflineStoreConfig, + RemoteRetrievalJob, +) from feast.infra.offline_stores.snowflake import ( SnowflakeOfflineStoreConfig, SnowflakeRetrievalJob, @@ -104,6 +108,7 @@ def metadata(self) -> Optional[RetrievalMetadata]: PostgreSQLRetrievalJob, SparkRetrievalJob, TrinoRetrievalJob, + RemoteRetrievalJob, ] ) def retrieval_job(request, environment): @@ -203,6 +208,35 @@ def retrieval_job(request, environment): config=environment.config, full_feature_names=False, ) + elif request.param is RemoteRetrievalJob: + offline_store_config = RemoteOfflineStoreConfig( + type="remote", + host="localhost", + port=0, + ) + environment.config._offline_store = offline_store_config + + entity_df = pd.DataFrame.from_dict( + { + "id": [1], + "event_timestamp": ["datetime"], + "val_to_add": [1], + } + ) + + return RemoteRetrievalJob( + client=MagicMock(), + api_parameters={ + "str": "str", + }, + api="api", + table=pyarrow.Table.from_pandas(entity_df), + entity_df=entity_df, + metadata=RetrievalMetadata( + features=["1", "2", "3", "4"], + keys=["1", "2", "3", "4"], + ), + ) else: return request.param() diff --git a/sdk/python/tests/unit/test_offline_server.py b/sdk/python/tests/unit/test_offline_server.py new file mode 100644 index 0000000000..5991e7450d --- /dev/null +++ b/sdk/python/tests/unit/test_offline_server.py @@ -0,0 +1,250 @@ +import os +import tempfile +from datetime import datetime, timedelta + +import assertpy +import pandas as pd +import pyarrow as pa +import pyarrow.flight as flight +import pytest + +from feast import FeatureStore +from feast.feature_logging import FeatureServiceLoggingSource +from feast.infra.offline_stores.remote import ( + RemoteOfflineStore, + RemoteOfflineStoreConfig, +) +from feast.offline_server import OfflineServer +from feast.repo_config import RepoConfig +from tests.utils.cli_repo_creator import CliRunner + +PROJECT_NAME = "test_remote_offline" + + +@pytest.fixture +def empty_offline_server(environment): + store = environment.feature_store + + location = "grpc+tcp://localhost:0" + return OfflineServer(store=store, location=location) + + +@pytest.fixture +def arrow_client(empty_offline_server): + return flight.FlightClient(f"grpc://localhost:{empty_offline_server.port}") + + +def test_offline_server_is_alive(environment, empty_offline_server, arrow_client): + server = empty_offline_server + client = arrow_client + + assertpy.assert_that(server).is_not_none + assertpy.assert_that(server.port).is_not_equal_to(0) + + actions = list(client.list_actions()) + flights = list(client.list_flights()) + + assertpy.assert_that(actions).is_equal_to( + [ + ( + "offline_write_batch", + "Writes the specified arrow table to the data source underlying the specified feature view.", + ), + ( + "write_logged_features", + "Writes logged features to a specified destination in the offline store.", + ), + ( + "persist", + "Synchronously executes the underlying query and persists the 
result in the same offline store at the " + "specified destination.", + ), + ] + ) + assertpy.assert_that(flights).is_empty() + + +def default_store(temp_dir): + runner = CliRunner() + result = runner.run(["init", PROJECT_NAME], cwd=temp_dir) + repo_path = os.path.join(temp_dir, PROJECT_NAME, "feature_repo") + assert result.returncode == 0 + + result = runner.run(["--chdir", repo_path, "apply"], cwd=temp_dir) + assert result.returncode == 0 + + fs = FeatureStore(repo_path=repo_path) + return fs + + +def remote_feature_store(offline_server): + offline_config = RemoteOfflineStoreConfig( + type="remote", host="0.0.0.0", port=offline_server.port + ) + + registry_path = os.path.join( + str(offline_server.store.repo_path), + offline_server.store.config.registry.path, + ) + store = FeatureStore( + config=RepoConfig( + project=PROJECT_NAME, + registry=registry_path, + provider="local", + offline_store=offline_config, + entity_key_serialization_version=2, + ) + ) + return store + + +def test_remote_offline_store_apis(): + with tempfile.TemporaryDirectory() as temp_dir: + store = default_store(str(temp_dir)) + location = "grpc+tcp://localhost:0" + server = OfflineServer(store=store, location=location) + + assertpy.assert_that(server).is_not_none + assertpy.assert_that(server.port).is_not_equal_to(0) + + fs = remote_feature_store(server) + + _test_get_historical_features_returns_data(fs) + _test_get_historical_features_returns_nan(fs) + _test_offline_write_batch(str(temp_dir), fs) + _test_write_logged_features(str(temp_dir), fs) + _test_pull_latest_from_table_or_query(str(temp_dir), fs) + _test_pull_all_from_table_or_query(str(temp_dir), fs) + + +def _test_get_historical_features_returns_data(fs: FeatureStore): + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002, 1003], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + "label_driver_reported_satisfaction": [1, 5, 3], + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + + training_df = fs.get_historical_features(entity_df, features).to_df() + + assertpy.assert_that(training_df).is_not_none() + assertpy.assert_that(len(training_df)).is_equal_to(3) + + for index, driver_id in enumerate(entity_df["driver_id"]): + assertpy.assert_that(training_df["driver_id"][index]).is_equal_to(driver_id) + for feature in features: + column_id = feature.split(":")[1] + value = training_df[column_id][index] + assertpy.assert_that(value).is_not_nan() + + +def _test_get_historical_features_returns_nan(fs: FeatureStore): + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1, 2, 3], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + "label_driver_reported_satisfaction": [1, 5, 3], + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + + training_df = fs.get_historical_features(entity_df, features).to_df() + + assertpy.assert_that(training_df).is_not_none() + 
assertpy.assert_that(len(training_df)).is_equal_to(3) + + for index, driver_id in enumerate(entity_df["driver_id"]): + assertpy.assert_that(training_df["driver_id"][index]).is_equal_to(driver_id) + for feature in features: + column_id = feature.split(":")[1] + value = training_df[column_id][index] + assertpy.assert_that(value).is_nan() + + +def _test_offline_write_batch(temp_dir, fs: FeatureStore): + data_file = os.path.join( + temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" + ) + data_df = pd.read_parquet(data_file) + feature_view = fs.get_feature_view("driver_hourly_stats") + + RemoteOfflineStore.offline_write_batch( + fs.config, feature_view, pa.Table.from_pandas(data_df), progress=None + ) + + +def _test_write_logged_features(temp_dir, fs: FeatureStore): + data_file = os.path.join( + temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" + ) + data_df = pd.read_parquet(data_file) + feature_service = fs.get_feature_service("driver_activity_v1") + + RemoteOfflineStore.write_logged_features( + config=fs.config, + data=pa.Table.from_pandas(data_df), + source=FeatureServiceLoggingSource(feature_service, fs.config.project), + logging_config=feature_service.logging_config, + registry=fs.registry, + ) + + +def _test_pull_latest_from_table_or_query(temp_dir, fs: FeatureStore): + data_source = fs.get_data_source("driver_hourly_stats_source") + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + RemoteOfflineStore.pull_latest_from_table_or_query( + config=fs.config, + data_source=data_source, + join_key_columns=[], + feature_name_columns=[], + timestamp_field="event_timestamp", + created_timestamp_column="created", + start_date=start_date, + end_date=end_date, + ).to_df() + + +def _test_pull_all_from_table_or_query(temp_dir, fs: FeatureStore): + data_source = fs.get_data_source("driver_hourly_stats_source") + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + RemoteOfflineStore.pull_all_from_table_or_query( + config=fs.config, + data_source=data_source, + join_key_columns=[], + feature_name_columns=[], + timestamp_field="event_timestamp", + start_date=start_date, + end_date=end_date, + ).to_df() diff --git a/sdk/python/tests/utils/http_server.py b/sdk/python/tests/utils/http_server.py index 47c6cb8ac1..5bb6255d72 100644 --- a/sdk/python/tests/utils/http_server.py +++ b/sdk/python/tests/utils/http_server.py @@ -3,9 +3,9 @@ def free_port(): - sock = socket.socket() - sock.bind(("", 0)) - return sock.getsockname()[1] + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.bind(("", 0)) + return sock.getsockname()[1] def check_port_open(host, port) -> bool:
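
Note: the protocol that `OfflineServer` implements can also be exercised with a bare `pyarrow.flight` client, with no Feast SDK in the loop. A minimal sketch follows; the server address and the data source name `driver_hourly_stats_source` are assumptions, and any data source registered with the server's store would do:

```python
import json
import uuid

import pyarrow as pa
import pyarrow.flight as fl

client = fl.connect("grpc://localhost:8815")  # assumed server address

# Commands are JSON documents wrapped in a Flight command descriptor.
command = {
    "command_id": str(uuid.uuid4()),
    "api": "pull_all_from_table_or_query",
    "data_source_name": "driver_hourly_stats_source",  # assumed data source
    "join_key_columns": [],
    "feature_name_columns": [],
    "timestamp_field": "event_timestamp",
    "start_date": "2021-04-01T00:00:00",
    "end_date": "2021-04-30T00:00:00",
}
descriptor = fl.FlightDescriptor.for_command(json.dumps(command))

# do_put registers the command in the server's flights dictionary; a
# placeholder single-column table stands in for APIs that carry no data.
writer, _ = client.do_put(descriptor, pa.schema({"key": pa.string()}))
writer.write_table(pa.table({"key": ["mock_key"]}))
writer.close()

# do_get executes the registered API call and streams back the Arrow result.
info = client.get_flight_info(descriptor)
result: pa.Table = client.do_get(info.endpoints[0].ticket).read_all()
print(result.num_rows)
```

The same round trip underlies `RemoteRetrievalJob`: `do_put` registers the JSON command (plus any input table) under its descriptor key, and `do_get` runs the named offline-store API and streams the result back to the client.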