Skip to content

Commit

Permalink
Add SqliteTable as an InfraObject (#2157)
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 authored Dec 17, 2021
1 parent 97fbd3e commit c5caeeb
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
4 changes: 3 additions & 1 deletion protos/feast/core/InfraObject.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ option java_package = "feast.proto.core";
option java_outer_classname = "InfraObjectProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "feast/core/DynamoDBTable.proto";
import "feast/core/DatastoreTable.proto";
import "feast/core/DynamoDBTable.proto";
import "feast/core/SqliteTable.proto";

// Represents a set of infrastructure objects managed by Feast
message Infra {
Expand All @@ -39,6 +40,7 @@ message InfraObject {
oneof infra_object {
DynamoDBTable dynamodb_table = 2;
DatastoreTable datastore_table = 3;
SqliteTable sqlite_table = 4;
CustomInfra custom_infra = 100;
}

Expand Down
31 changes: 31 additions & 0 deletions protos/feast/core/SqliteTable.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// * Copyright 2021 The Feast Authors
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * https://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
//

syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "SqliteTableProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

// Represents a Sqlite table
message SqliteTable {
// Absolute path of the table
string path = 1;

// Name of the table
string name = 2;
}
66 changes: 62 additions & 4 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@

from feast import Entity
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand All @@ -48,6 +51,9 @@ class SqliteOnlineStore(OnlineStore):
"""
OnlineStore is an object used for all interaction between Feast and the service used for offline storage of
features.
Attributes:
_conn: SQLite connection.
"""

_conn: Optional[sqlite3.Connection] = None
Expand All @@ -68,10 +74,7 @@ def _get_db_path(config: RepoConfig) -> str:
def _get_conn(self, config: RepoConfig):
if not self._conn:
db_path = self._get_db_path(config)
Path(db_path).parent.mkdir(exist_ok=True)
self._conn = sqlite3.connect(
db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
)
self._conn = _initialize_conn(db_path)
return self._conn

@log_exceptions_and_usage(online_store="sqlite")
Expand Down Expand Up @@ -208,6 +211,13 @@ def teardown(
pass


def _initialize_conn(db_path: str):
Path(db_path).parent.mkdir(exist_ok=True)
return sqlite3.connect(
db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
)


def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"

Expand All @@ -217,3 +227,51 @@ def _to_naive_utc(ts: datetime):
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


class SqliteTable(InfraObject):
"""
A Sqlite table managed by Feast.
Attributes:
path: The absolute path of the Sqlite file.
name: The name of the table.
conn: SQLite connection.
"""

path: str
name: str
conn: sqlite3.Connection

def __init__(self, path: str, name: str):
self.path = path
self.name = name
self.conn = _initialize_conn(path)

def to_proto(self) -> InfraObjectProto:
sqlite_table_proto = SqliteTableProto()
sqlite_table_proto.path = self.path
sqlite_table_proto.name = self.name

return InfraObjectProto(
infra_object_class_type="feast.infra.online_store.sqlite.SqliteTable",
sqlite_table=sqlite_table_proto,
)

@staticmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
return SqliteTable(
path=infra_object_proto.sqlite_table.path,
name=infra_object_proto.sqlite_table.name,
)

def update(self):
self.conn.execute(
f"CREATE TABLE IF NOT EXISTS {self.name} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
)
self.conn.execute(
f"CREATE INDEX IF NOT EXISTS {self.name}_ek ON {self.name} (entity_key);"
)

def teardown(self):
self.conn.execute(f"DROP TABLE IF EXISTS {self.name}")

0 comments on commit c5caeeb

Please sign in to comment.