From f663e35301ec81bc32e95b996f85171a75e223c2 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 24 May 2021 10:35:21 -0700 Subject: [PATCH 1/6] Disable indexing Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/gcp.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 94e5639da5..58eb67ea03 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -72,7 +72,9 @@ def update_infra( for table in tables_to_keep: key = client.key("Project", project, "Table", table.name) - entity = datastore.Entity(key=key) + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) entity.update({"created_ts": datetime.utcnow()}) client.put(entity) @@ -247,7 +249,9 @@ def _write_minibatch( key = client.key("Project", project, "Table", table.name, "Row", document_id,) - entity = datastore.Entity(key=key) + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) entity.update( dict( From 319db880739492f88b94650cf2c2dae676563586 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 24 May 2021 11:09:06 -0700 Subject: [PATCH 2/6] Add option to set datastore write concurrency Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/gcp.py | 3 ++- sdk/python/feast/repo_config.py | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 58eb67ea03..8b78267bd9 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -44,6 +44,7 @@ def __init__(self, config: RepoConfig): self._namespace = config.online_store.namespace assert config.offline_store is not None + self._write_concurrency = config.online_store.write_concurrency self.offline_store = get_offline_store_from_config(config.offline_store) def _initialize_client(self): @@ -115,7 +116,7 @@ def online_write_batch( ) -> None: client = self._initialize_client() - pool = ThreadPool(processes=40) + pool = ThreadPool(processes=self._write_concurrency) pool.map( lambda b: _write_minibatch(client, project, table, b, progress), _to_minibatches(data), diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 472e0861df..ec5f3cd93e 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -38,6 +38,9 @@ class DatastoreOnlineStoreConfig(FeastBaseModel): namespace: Optional[StrictStr] = None """ (optional) Datastore namespace """ + write_concurrency: Optional[StrictInt] = 40 + """ (optional) Amount of threads to use when writing into the online store""" + OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig] From 46796fa16a0a783cecb1557dc869974aecce5aa9 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 24 May 2021 14:28:43 -0700 Subject: [PATCH 3/6] Add batch size configuration Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/gcp.py | 3 ++- sdk/python/feast/repo_config.py | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 8b78267bd9..fe691aaee2 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -45,6 +45,7 @@ def __init__(self, config: RepoConfig): assert config.offline_store is not None self._write_concurrency = config.online_store.write_concurrency + self._write_batch_size = config.online_store.write_batch_size self.offline_store = get_offline_store_from_config(config.offline_store) def _initialize_client(self): @@ -119,7 +120,7 @@ def online_write_batch( pool = ThreadPool(processes=self._write_concurrency) pool.map( lambda b: _write_minibatch(client, project, table, b, progress), - _to_minibatches(data), + _to_minibatches(data, batch_size=self._write_batch_size), ) def online_read( diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index ec5f3cd93e..dd93e96826 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -39,7 +39,10 @@ class DatastoreOnlineStoreConfig(FeastBaseModel): """ (optional) Datastore namespace """ write_concurrency: Optional[StrictInt] = 40 - """ (optional) Amount of threads to use when writing into the online store""" + """ (optional) Amount of threads to use when writing batches of feature rows into Datastore""" + + write_batch_size: Optional[StrictInt] = 50 + """ (optional) Amount of feature rows per batch being written into Datastore""" OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig] From 7eb67d1b2de2a2df096ea36b55dc3f5d5e611b2e Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 24 May 2021 14:31:14 -0700 Subject: [PATCH 4/6] Ensure only positive integers Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/gcp.py | 2 +- sdk/python/feast/repo_config.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index fe691aaee2..363a694ef6 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -221,7 +221,7 @@ def get_historical_features( ] -def _to_minibatches(data: ProtoBatch, batch_size=50) -> Iterator[ProtoBatch]: +def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]: """ Split data into minibatches, making sure we stay under GCP datastore transaction size limits. diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index dd93e96826..973f81e4fd 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,7 +1,7 @@ from pathlib import Path import yaml -from pydantic import BaseModel, StrictInt, StrictStr, ValidationError, root_validator +from pydantic import BaseModel, StrictInt, StrictStr, ValidationError, root_validator, PositiveInt from pydantic.error_wrappers import ErrorWrapper from pydantic.typing import Dict, Literal, Optional, Union @@ -38,10 +38,10 @@ class DatastoreOnlineStoreConfig(FeastBaseModel): namespace: Optional[StrictStr] = None """ (optional) Datastore namespace """ - write_concurrency: Optional[StrictInt] = 40 + write_concurrency: Optional[PositiveInt] = 40 """ (optional) Amount of threads to use when writing batches of feature rows into Datastore""" - write_batch_size: Optional[StrictInt] = 50 + write_batch_size: Optional[PositiveInt] = 50 """ (optional) Amount of feature rows per batch being written into Datastore""" From 13dce6ff94b4cfc2afef425f3a585bb23afa3f53 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 24 May 2021 14:31:35 -0700 Subject: [PATCH 5/6] Fix lint Signed-off-by: Willem Pienaar --- sdk/python/feast/repo_config.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 973f81e4fd..89268584f3 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,7 +1,14 @@ from pathlib import Path import yaml -from pydantic import BaseModel, StrictInt, StrictStr, ValidationError, root_validator, PositiveInt +from pydantic import ( + BaseModel, + PositiveInt, + StrictInt, + StrictStr, + ValidationError, + root_validator, +) from pydantic.error_wrappers import ErrorWrapper from pydantic.typing import Dict, Literal, Optional, Union From e6c63e16ac11496cda40685259e5d1a855567c53 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Mon, 24 May 2021 14:49:45 -0700 Subject: [PATCH 6/6] Reorder code Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/gcp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 363a694ef6..cca03d4289 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -42,10 +42,10 @@ def __init__(self, config: RepoConfig): assert isinstance(config.online_store, DatastoreOnlineStoreConfig) self._gcp_project_id = config.online_store.project_id self._namespace = config.online_store.namespace - - assert config.offline_store is not None self._write_concurrency = config.online_store.write_concurrency self._write_batch_size = config.online_store.write_batch_size + + assert config.offline_store is not None self.offline_store = get_offline_store_from_config(config.offline_store) def _initialize_client(self):