From 69a1d16790b5d248b269428c55e655a122770cc0 Mon Sep 17 00:00:00 2001 From: Willem Pienaar <6728866+woop@users.noreply.github.com> Date: Tue, 25 May 2021 15:18:57 -0700 Subject: [PATCH] Fix contention issue (#1582) * Disable indexing Signed-off-by: Willem Pienaar * Add option to set datastore write concurrency Signed-off-by: Willem Pienaar * Add batch size configuration Signed-off-by: Willem Pienaar * Ensure only positive integers Signed-off-by: Willem Pienaar * Fix lint Signed-off-by: Willem Pienaar * Reorder code Signed-off-by: Willem Pienaar --- sdk/python/feast/infra/gcp.py | 16 +++++++++++----- sdk/python/feast/repo_config.py | 15 ++++++++++++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 94e5639da5..cca03d4289 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -42,6 +42,8 @@ def __init__(self, config: RepoConfig): assert isinstance(config.online_store, DatastoreOnlineStoreConfig) self._gcp_project_id = config.online_store.project_id self._namespace = config.online_store.namespace + self._write_concurrency = config.online_store.write_concurrency + self._write_batch_size = config.online_store.write_batch_size assert config.offline_store is not None self.offline_store = get_offline_store_from_config(config.offline_store) @@ -72,7 +74,9 @@ def update_infra( for table in tables_to_keep: key = client.key("Project", project, "Table", table.name) - entity = datastore.Entity(key=key) + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) entity.update({"created_ts": datetime.utcnow()}) client.put(entity) @@ -113,10 +117,10 @@ def online_write_batch( ) -> None: client = self._initialize_client() - pool = ThreadPool(processes=40) + pool = ThreadPool(processes=self._write_concurrency) pool.map( lambda b: _write_minibatch(client, project, table, b, progress), - _to_minibatches(data), + _to_minibatches(data, 
batch_size=self._write_batch_size), ) def online_read( @@ -217,7 +221,7 @@ def get_historical_features( ] -def _to_minibatches(data: ProtoBatch, batch_size=50) -> Iterator[ProtoBatch]: +def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]: """ Split data into minibatches, making sure we stay under GCP datastore transaction size limits. @@ -247,7 +251,9 @@ def _write_minibatch( key = client.key("Project", project, "Table", table.name, "Row", document_id,) - entity = datastore.Entity(key=key) + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) entity.update( dict( diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 472e0861df..89268584f3 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,7 +1,14 @@ from pathlib import Path import yaml -from pydantic import BaseModel, StrictInt, StrictStr, ValidationError, root_validator +from pydantic import ( + BaseModel, + PositiveInt, + StrictInt, + StrictStr, + ValidationError, + root_validator, +) from pydantic.error_wrappers import ErrorWrapper from pydantic.typing import Dict, Literal, Optional, Union @@ -38,6 +45,12 @@ class DatastoreOnlineStoreConfig(FeastBaseModel): namespace: Optional[StrictStr] = None """ (optional) Datastore namespace """ + write_concurrency: Optional[PositiveInt] = 40 + """ (optional) Number of threads to use when writing batches of feature rows into Datastore""" + + write_batch_size: Optional[PositiveInt] = 50 + """ (optional) Number of feature rows per batch being written into Datastore""" + OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig]