Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamodb infra object #2131

Merged
merged 2 commits into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions protos/feast/core/DynamoDBTable.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// * Copyright 2021 The Feast Authors
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * https://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
//

syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "DynamoDBTableProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

// Represents a DynamoDB table
message DynamoDBTable {
// Name of the table
string name = 1;

// Region of the table
string region = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "InfraObjectsProto";
option java_outer_classname = "InfraObjectProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "feast/core/DynamoDBTable.proto";

// Represents a set of infrastructure objects managed by Feast
message Infra {
// List of infrastructure objects managed by Feast
Expand All @@ -34,6 +36,7 @@ message InfraObject {

// The infrastructure object
oneof infra_object {
DynamoDBTable dynamodb_table = 2;
CustomInfra custom_infra = 100;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from typing import Any, List

from feast.importer import get_class_from_type
from feast.protos.feast.core.InfraObjects_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObjects_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto


class InfraObject(ABC):
Expand Down
129 changes: 93 additions & 36 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
from pydantic.typing import Literal

from feast import Entity, FeatureTable, FeatureView, utils
from feast.infra.infra_object import InfraObject
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.DynamoDBTable_pb2 import (
DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand Down Expand Up @@ -65,7 +70,7 @@ def update(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client, dynamodb_resource = self._initialize_dynamodb(online_config)
dynamodb_client, dynamodb_resource = _initialize_dynamodb(online_config.region)

for table_instance in tables_to_keep:
try:
Expand All @@ -89,7 +94,8 @@ def update(
TableName=f"{config.project}.{table_instance.name}"
)

self._delete_tables_idempotent(dynamodb_resource, config, tables_to_delete)
for table_to_delete in tables_to_delete:
_delete_table_idempotent(dynamodb_resource, table_to_delete.name)

def teardown(
self,
Expand All @@ -99,9 +105,10 @@ def teardown(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = self._initialize_dynamodb(online_config)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)

self._delete_tables_idempotent(dynamodb_resource, config, tables)
for table in tables:
_delete_table_idempotent(dynamodb_resource, table.name)

@log_exceptions_and_usage(online_store="dynamodb")
def online_write_batch(
Expand All @@ -115,7 +122,7 @@ def online_write_batch(
) -> None:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = self._initialize_dynamodb(online_config)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)

table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}")
with table_instance.batch_writer() as batch:
Expand Down Expand Up @@ -144,7 +151,7 @@ def online_read(
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = self._initialize_dynamodb(online_config)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
Expand All @@ -165,35 +172,85 @@ def online_read(
result.append((None, None))
return result

def _initialize_dynamodb(self, online_config: DynamoDBOnlineStoreConfig):
return (
boto3.client("dynamodb", region_name=online_config.region),
boto3.resource("dynamodb", region_name=online_config.region),

def _initialize_dynamodb(region: str):
return (
boto3.client("dynamodb", region_name=region),
boto3.resource("dynamodb", region_name=region),
)


def _delete_table_idempotent(
dynamodb_resource, table_name: str,
):
try:
table = dynamodb_resource.Table(table_name)
table.delete()
logger.info(f"Dynamo table {table_name} was deleted")
except ClientError as ce:
# If the table deletion fails with ResourceNotFoundException,
# it means the table has already been deleted.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceNotFoundException":
raise
else:
logger.warning(f"Trying to delete table that doesn't exist: {table_name}")


class DynamoDBTable(InfraObject):
"""
A DynamoDB table managed by Feast.

Attributes:
name: The name of the table.
region: The region of the table.
"""

name: str
region: str

def __init__(self, name: str, region: str):
self.name = name
self.region = region

def to_proto(self) -> InfraObjectProto:
dynamodb_table_proto = DynamoDBTableProto()
dynamodb_table_proto.name = self.name
dynamodb_table_proto.region = self.region

return InfraObjectProto(
infra_object_class_type="feast.infra.online_stores.dynamodb.DynamoDBTable",
dynamodb_table=dynamodb_table_proto,
)

def _delete_tables_idempotent(
self,
dynamodb_resource,
config: RepoConfig,
tables: Sequence[Union[FeatureTable, FeatureView]],
):
for table_instance in tables:
try:
table = dynamodb_resource.Table(
f"{config.project}.{table_instance.name}"
)
table.delete()
logger.info(
f"Dynamo table {config.project}.{table_instance.name} was deleted"
)
except ClientError as ce:
# If the table deletion fails with ResourceNotFoundException,
# it means the table has already been deleted.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceNotFoundException":
raise
else:
logger.warning(
f"Trying to delete table that doesn't exist:"
f" {config.project}.{table_instance.name}"
)
@staticmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
return DynamoDBTable(
name=infra_object_proto.dynamodb_table.name,
region=infra_object_proto.dynamodb_table.region,
)

def update(self):
dynamodb_client, dynamodb_resource = _initialize_dynamodb(self.region)

try:
dynamodb_resource.create_table(
TableName=f"{self.name}",
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "entity_id", "AttributeType": "S"}
],
BillingMode="PAY_PER_REQUEST",
)
except ClientError as ce:
# If the table creation fails with ResourceInUseException,
# it means the table already exists or is being created.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceInUseException":
raise

dynamodb_client.get_waiter("table_exists").wait(TableName=f"{self.name}")

def teardown(self):
_, dynamodb_resource = _initialize_dynamodb(self.region)
_delete_table_idempotent(dynamodb_resource, self.name)