Skip to content

Commit

Permalink
adding current progress
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed May 16, 2024
1 parent ed0df05 commit fa18a2c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 41 deletions.
12 changes: 12 additions & 0 deletions docs/reference/alpha-vector-database.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,16 @@ def print_online_features(features):
print(key, " : ", value)

print_online_features(features)
```

### Configuration
We offer two Online Store options for Vector Databases. PGVector and SQLite.

#### Installation with SQLite
If you are using `pyenv` to manage your Python versions, you can install the SQLite extension with the following command:
```bash
PYTHON_CONFIGURE_OPTS="--enable-loadable-sqlite-extensions" \
LDFLAGS="-L/opt/homebrew/opt/sqlite/lib" \
CPPFLAGS="-I/opt/homebrew/opt/sqlite/include" \
pyenv install
```
32 changes: 10 additions & 22 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
# limitations under the License.
import itertools
import os
import json
import sqlite3
import sqlite_vss
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

import sqlite_vss
from pydantic import StrictStr

from feast import Entity
Expand Down Expand Up @@ -110,9 +111,9 @@ def online_write_batch(
created_ts = to_naive_utc(created_ts)

for feature_name, val in values.items():
vector_val = None
if config.online_store.vss_enabled:
vector_val = self._get_list_val_str(val)
print('using vector search')
vector_val = json.dumps(val)
conn.execute(
f"""
UPDATE {_table_id(project, table)}
Expand All @@ -121,7 +122,7 @@ def online_write_batch(
""",
(
# SET
val.SerializeToString(),
str(val),
vector_val,
timestamp,
created_ts,
Expand All @@ -138,13 +139,14 @@ def online_write_batch(
(
entity_key_bin,
feature_name,
val.SerializeToString(),
str(val),
vector_val,
timestamp,
created_ts,
),
)
else:
print('not using vector search')
conn.execute(
f"""
UPDATE {_table_id(project, table)}
Expand Down Expand Up @@ -243,8 +245,8 @@ def update(
project = config.project

for table in tables_to_keep:
conn.execute(
f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
self.conn.execute(
f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
)
conn.execute(
f"CREATE INDEX IF NOT EXISTS {_table_id(project, table)}_ek ON {_table_id(project, table)} (entity_key);"
Expand Down Expand Up @@ -282,20 +284,6 @@ def teardown(
except FileNotFoundError:
pass

def _get_list_val_str(self, val: ValueProto) -> str:
if val.HasField("string_list_val"):
return ",".join(val.string_list_val.val)
elif val.HasField("bytes_list_val"):
return ",".join(map(str, val.bytes_list_val.val))
elif val.HasField("int64_list_val"):
return ",".join(map(str, val.int64_list_val.val))
elif val.HasField("float_list_val"):
return ",".join(map(str, val.float_list_val.val))
elif val.HasField("double_list_val"):
return ",".join(map(str, val.double_list_val.val))
else:
raise ValueError("Unsupported list value type")

def retrieve_online_documents(
self,
config: RepoConfig,
Expand Down Expand Up @@ -434,7 +422,7 @@ def from_proto(sqlite_table_proto: SqliteTableProto) -> Any:

def update(self):
self.conn.execute(
f"CREATE TABLE IF NOT EXISTS {self.name} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
f"CREATE TABLE IF NOT EXISTS {self.name} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
)
self.conn.execute(
f"CREATE INDEX IF NOT EXISTS {self.name}_ek ON {self.name} (entity_key);"
Expand Down
32 changes: 13 additions & 19 deletions sdk/python/tests/unit/online_store/test_online_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ def test_get_online_documents() -> None:
with runner.local_repo(
get_example_repo("example_feature_repo_1.py"), "file"
) as store:
store.config.online_store.vss_enabled = True
# Write some data to two tables
document_embeddings_fv = store.get_feature_view(name="document_embeddings")

Expand All @@ -434,15 +435,11 @@ def test_get_online_documents() -> None:
item_key = EntityKeyProto(
join_keys=["item_id"], entity_values=[ValueProto(int64_val=0)]
)
provider.online_write_batch(
config=store.config,
table=document_embeddings_fv,
data=[
data = [
(
item_key,
{
"Embeddings": [
np.array(
[
0.17517076,
-0.1259909,
Expand All @@ -454,9 +451,7 @@ def test_get_online_documents() -> None:
0.01173803,
-0.0573408,
0.02616226,
]
),
np.array(
],
[
0.18517076,
-0.1259909,
Expand All @@ -468,9 +463,7 @@ def test_get_online_documents() -> None:
0.01173803,
-0.0573408,
0.02616226,
]
),
np.array(
],
[
0.19517076,
-0.1259909,
Expand All @@ -482,9 +475,7 @@ def test_get_online_documents() -> None:
0.01173803,
-0.0573408,
0.02616226,
]
),
np.array(
],
[
0.20517076,
-0.1259909,
Expand All @@ -497,17 +488,20 @@ def test_get_online_documents() -> None:
-0.0573408,
0.02616226,
]
),
]
],
},
datetime.utcnow(),
datetime.utcnow(),
)
],
]
provider.online_write_batch(
config=store.config,
table=document_embeddings_fv,
data=data,
progress=None,
)

query = np.array(
query_embedding = np.array(
[
0.17517076,
-0.1259909,
Expand All @@ -523,7 +517,7 @@ def test_get_online_documents() -> None:
)
# Retrieve two features using two keys, one valid one non-existing
result = store.retrieve_online_documents(
feature="document_embeddings:Embeddings", query=query, top_k=3
feature="document_embeddings:Embeddings", query=query_embedding, top_k=3
).to_dict()

assert "Embeddings" in result
Expand Down

0 comments on commit fa18a2c

Please sign in to comment.