From e3b34fa1e8f89e1de5d4395e50d0b362bf8946cd Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Tue, 19 Apr 2022 14:25:00 -0400 Subject: [PATCH] remove sort_keys from dynamo since they must be sorted. Add better test for different unknowns Signed-off-by: Danny Chiao --- .../feast/infra/online_stores/dynamodb.py | 20 +++++----- .../feast/infra/online_stores/online_store.py | 6 +-- .../test_dynamodb_online_store.py | 40 ++++++++++--------- 3 files changed, 34 insertions(+), 32 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index cb72b43acd..406bee525f 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -59,9 +59,6 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): region: StrictStr """AWS Region Name""" - sort_response: bool = True - """Whether or not to sort BatchGetItem response.""" - table_name_template: StrictStr = "{project}.{table_name}" """DynamoDB table name template""" @@ -204,9 +201,6 @@ def online_read( """ Retrieve feature values from the online DynamoDB store. - Note: This method is currently not optimized to retrieve a lot of data at a time - as it does sequential gets from the DynamoDB table. - Args: config: The RepoConfig for the current FeatureStore. table: Feast FeatureView. @@ -224,7 +218,6 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] entity_ids = [compute_entity_id(entity_key) for entity_key in entity_keys] batch_size = online_config.batch_size - sort_response = online_config.sort_response entity_ids_iter = iter(entity_ids) while True: batch = list(itertools.islice(entity_ids_iter, batch_size)) @@ -243,17 +236,22 @@ def online_read( response = response.get("Responses") table_responses = response.get(table_instance.name) if table_responses: - if sort_response: - table_responses = self._sort_dynamodb_response( - table_responses, entity_ids - ) + table_responses = self._sort_dynamodb_response( + table_responses, entity_ids + ) + entity_idx = 0 for tbl_res in table_responses: + entity_id = tbl_res["entity_id"] + while entity_id != batch[entity_idx]: + result.append((None, None)) + entity_idx += 1 res = {} for feature_name, value_bin in tbl_res["values"].items(): val = ValueProto() val.ParseFromString(value_bin.value) res[feature_name] = val result.append((datetime.fromisoformat(tbl_res["event_ts"]), res)) + entity_idx += 1 # Not all entities in a batch may have responses # Pad with remaining values in batch that were not found diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 1f177996de..04c6a065fb 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -76,9 +76,9 @@ def online_read( entity_keys: a list of entity keys that should be read from the FeatureStore. requested_features: (Optional) A subset of the features that should be read from the FeatureStore. Returns: - Data is returned as a list, one item per entity key. Each item in the list is a tuple - of event_ts for the row, and the feature data as a dict from feature names to values. - Values are returned as Value proto message. + Data is returned as a list, one item per entity key in the original order as the entity_keys argument. + Each item in the list is a tuple of event_ts for the row, and the feature data as a dict from feature names + to values. Values are returned as Value proto message. """ ... diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index d558096b63..e1be890e57 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -51,7 +51,6 @@ def test_online_store_config_default(): assert dynamodb_store_config.batch_size == 40 assert dynamodb_store_config.endpoint_url is None assert dynamodb_store_config.region == aws_region - assert dynamodb_store_config.sort_response is True assert dynamodb_store_config.table_name_template == "{project}.{table_name}" @@ -72,20 +71,17 @@ def test_online_store_config_custom_params(): aws_region = "us-west-2" batch_size = 20 endpoint_url = "http://localhost:8000" - sort_response = False table_name_template = "feast_test.dynamodb_table" dynamodb_store_config = DynamoDBOnlineStoreConfig( region=aws_region, batch_size=batch_size, endpoint_url=endpoint_url, - sort_response=sort_response, table_name_template=table_name_template, ) assert dynamodb_store_config.type == "dynamodb" assert dynamodb_store_config.batch_size == batch_size assert dynamodb_store_config.endpoint_url == endpoint_url assert dynamodb_store_config.region == aws_region - assert dynamodb_store_config.sort_response == sort_response assert dynamodb_store_config.table_name_template == table_name_template @@ -186,23 +182,31 @@ def test_online_read_unknown_entity(repo_config): _insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) entity_keys, features, *rest = zip(*data) - # Append a nonsensical value + # Append a nonsensical entity to search for entity_keys = list(entity_keys) - entity_keys.append( - EntityKeyProto( - join_keys=["customer"], entity_values=[ValueProto(string_val="12359")] - ) - ) features = list(features) - features.append(None) dynamodb_store = DynamoDBOnlineStore() - returned_items = dynamodb_store.online_read( - config=repo_config, - table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"), - entity_keys=entity_keys, - ) - assert len(returned_items) == len(entity_keys) - assert [item[1] for item in returned_items] == list(features) + + # Have the unknown entity be in the beginning, middle, and end of the list of entities. + for pos in range(len(entity_keys)): + entity_keys_with_unknown = deepcopy(entity_keys) + entity_keys_with_unknown.insert( + pos, + EntityKeyProto( + join_keys=["customer"], entity_values=[ValueProto(string_val="12359")] + ), + ) + features_with_none = deepcopy(features) + features_with_none.insert(pos, None) + returned_items = dynamodb_store.online_read( + config=repo_config, + table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"), + entity_keys=entity_keys_with_unknown, + ) + assert len(returned_items) == len(entity_keys_with_unknown) + assert [item[1] for item in returned_items] == list(features_with_none) + # The order should match the original entity key order + assert returned_items[pos] == (None, None) @mock_dynamodb2