Fix contention issue (#1582)
* Disable indexing

Signed-off-by: Willem Pienaar <git@willem.co>

* Add option to set datastore write concurrency

Signed-off-by: Willem Pienaar <git@willem.co>

* Add batch size configuration

Signed-off-by: Willem Pienaar <git@willem.co>

* Ensure only positive integers

Signed-off-by: Willem Pienaar <git@willem.co>

* Fix lint

Signed-off-by: Willem Pienaar <git@willem.co>

* Reorder code

Signed-off-by: Willem Pienaar <git@willem.co>
woop authored May 25, 2021
1 parent dfb029d commit 69a1d16
Showing 2 changed files with 25 additions and 6 deletions.
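
The heart of the fix is the "Disable indexing" change: Datastore maintains per-property indexes that are updated on every write, and Google's best-practice guidance warns that indexing monotonically increasing values such as timestamps can create write hotspots, which is the kind of contention this commit targets. Below is a minimal sketch of the entity-construction pattern introduced in the diff, assuming the google-cloud-datastore package is installed; the key path and project name are illustrative only.

    from datetime import datetime
    from google.cloud import datastore

    # Keys and entities can be built locally without contacting the service.
    key = datastore.Key(
        "Project", "my_feast_project", "Table", "driver_stats", project="my-gcp-project"
    )

    # exclude_from_indexes tells Datastore not to build per-property indexes
    # for these values, so each put() avoids the extra index writes.
    entity = datastore.Entity(
        key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
    )
    entity["created_ts"] = datetime.utcnow()
    # ... populate "event_ts" and "values", then client.put(entity) as in the diff.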
sdk/python/feast/infra/gcp.py (11 additions & 5 deletions)
@@ -42,6 +42,8 @@ def __init__(self, config: RepoConfig):
         assert isinstance(config.online_store, DatastoreOnlineStoreConfig)
         self._gcp_project_id = config.online_store.project_id
         self._namespace = config.online_store.namespace
+        self._write_concurrency = config.online_store.write_concurrency
+        self._write_batch_size = config.online_store.write_batch_size
 
         assert config.offline_store is not None
         self.offline_store = get_offline_store_from_config(config.offline_store)
@@ -72,7 +74,9 @@ def update_infra(
 
         for table in tables_to_keep:
             key = client.key("Project", project, "Table", table.name)
-            entity = datastore.Entity(key=key)
+            entity = datastore.Entity(
+                key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
+            )
             entity.update({"created_ts": datetime.utcnow()})
             client.put(entity)
 
@@ -113,10 +117,10 @@ def online_write_batch(
     ) -> None:
         client = self._initialize_client()
 
-        pool = ThreadPool(processes=40)
+        pool = ThreadPool(processes=self._write_concurrency)
         pool.map(
             lambda b: _write_minibatch(client, project, table, b, progress),
-            _to_minibatches(data),
+            _to_minibatches(data, batch_size=self._write_batch_size),
         )
 
     def online_read(
@@ -217,7 +221,7 @@ def get_historical_features(
 ]
 
 
-def _to_minibatches(data: ProtoBatch, batch_size=50) -> Iterator[ProtoBatch]:
+def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]:
     """
     Split data into minibatches, making sure we stay under GCP datastore transaction size
     limits.
@@ -247,7 +251,9 @@ def _write_minibatch(
 
     key = client.key("Project", project, "Table", table.name, "Row", document_id,)
 
-    entity = datastore.Entity(key=key)
+    entity = datastore.Entity(
+        key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
+    )
 
     entity.update(
         dict(
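
The online_write_batch change above swaps the hard-coded thread count (40) and batch size (50) for the new configuration values. The sketch below shows the same fan-out pattern in isolation: split the rows into minibatches and write each batch from a thread pool. It is a simplified stand-in, not Feast's implementation; rows, to_minibatches, and write_one_batch are illustrative names.

    from itertools import islice
    from multiprocessing.pool import ThreadPool
    from typing import Dict, Iterator, List

    def to_minibatches(data: List[Dict], batch_size: int) -> Iterator[List[Dict]]:
        # Yield successive chunks of at most batch_size rows, mirroring what
        # _to_minibatches does to stay under Datastore transaction limits.
        it = iter(data)
        while True:
            batch = list(islice(it, batch_size))
            if not batch:
                return
            yield batch

    def write_one_batch(batch: List[Dict]) -> None:
        # Stand-in for _write_minibatch: one Datastore write per batch.
        print(f"writing {len(batch)} rows")

    if __name__ == "__main__":
        rows = [{"id": i} for i in range(130)]
        write_concurrency = 40  # default of the new write_concurrency option
        write_batch_size = 50   # default of the new write_batch_size option

        pool = ThreadPool(processes=write_concurrency)
        pool.map(write_one_batch, to_minibatches(rows, batch_size=write_batch_size))
        pool.close()
        pool.join()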
sdk/python/feast/repo_config.py (14 additions & 1 deletion)
@@ -1,7 +1,14 @@
 from pathlib import Path
 
 import yaml
-from pydantic import BaseModel, StrictInt, StrictStr, ValidationError, root_validator
+from pydantic import (
+    BaseModel,
+    PositiveInt,
+    StrictInt,
+    StrictStr,
+    ValidationError,
+    root_validator,
+)
 from pydantic.error_wrappers import ErrorWrapper
 from pydantic.typing import Dict, Literal, Optional, Union
 
@@ -38,6 +45,12 @@ class DatastoreOnlineStoreConfig(FeastBaseModel):
     namespace: Optional[StrictStr] = None
     """ (optional) Datastore namespace """
 
+    write_concurrency: Optional[PositiveInt] = 40
+    """ (optional) Amount of threads to use when writing batches of feature rows into Datastore"""
+
+    write_batch_size: Optional[PositiveInt] = 50
+    """ (optional) Amount of feature rows per batch being written into Datastore"""
+
 
 OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig]
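
The "Ensure only positive integers" bullet is handled by typing the new fields as pydantic PositiveInt, so a zero or negative value is rejected when the config is parsed. A standalone sketch of that behaviour, assuming pydantic is installed; OnlineStoreWriteSettings is an illustrative stand-in for the real DatastoreOnlineStoreConfig class.

    from typing import Optional

    from pydantic import BaseModel, PositiveInt, ValidationError

    class OnlineStoreWriteSettings(BaseModel):
        # Mirrors the two fields added to DatastoreOnlineStoreConfig, with the same defaults.
        write_concurrency: Optional[PositiveInt] = 40
        write_batch_size: Optional[PositiveInt] = 50

    if __name__ == "__main__":
        print(OnlineStoreWriteSettings())                      # uses defaults 40 / 50
        print(OnlineStoreWriteSettings(write_batch_size=200))  # valid override

        try:
            OnlineStoreWriteSettings(write_concurrency=0)      # 0 is not a positive integer
        except ValidationError as err:
            print(err)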
