diff --git a/common/src/main/java/feast/common/models/Store.java b/common/src/main/java/feast/common/models/Store.java index 0644d5f46a..1701b0bb3a 100644 --- a/common/src/main/java/feast/common/models/Store.java +++ b/common/src/main/java/feast/common/models/Store.java @@ -17,11 +17,47 @@ package feast.common.models; import feast.proto.core.StoreProto.Store.Subscription; +import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; +import java.util.stream.Collectors; public class Store { + /** + * Accepts a comma-delimited string and converts it to a list of Subscription class objects. + * + * @param subscriptions String formatted Subscriptions, comma delimited. + * @return List of Subscription class objects + */ + public static List parseSubFromStr(String subscriptions) { + List allSubscriptions = + Arrays.stream(subscriptions.split(",")) + .map(subscriptionStr -> convertStringToSubscription(subscriptionStr)) + .collect(Collectors.toList()); + + return allSubscriptions; + } + + /** + * Accepts a comma-delimited string and converts it to a list of Subscription class objects, with + * exclusions filtered out. + * + * @param subscriptions String formatted Subscriptions, comma delimited. + * @return List of Subscription class objects + */ + public static List parseSubFromStrWithoutExclusions(String subscriptions) { + List allSubscriptions = + Arrays.stream(subscriptions.split(",")) + .map(subscriptionStr -> convertStringToSubscription(subscriptionStr)) + .collect(Collectors.toList()); + + allSubscriptions = + allSubscriptions.stream().filter(sub -> !sub.getExclude()).collect(Collectors.toList()); + + return allSubscriptions; + } + /** * Accepts a Subscription class object and returns it in string format * @@ -34,7 +70,8 @@ public static String parseSubscriptionFrom(Subscription subscription) { String.format("Missing arguments in subscription string: %s", subscription.toString())); } - return String.format("%s:%s", subscription.getProject(), subscription.getName()); + return String.format( + "%s:%s:%s", subscription.getProject(), subscription.getName(), subscription.getExclude()); } /** @@ -48,7 +85,15 @@ public static Subscription convertStringToSubscription(String subscription) { return Subscription.newBuilder().build(); } String[] split = subscription.split(":"); - return Subscription.newBuilder().setProject(split[0]).setName(split[1]).build(); + if (split.length == 2) { + // Backward compatibility check + return Subscription.newBuilder().setProject(split[0]).setName(split[1]).build(); + } + return Subscription.newBuilder() + .setProject(split[0]) + .setName(split[1]) + .setExclude(Boolean.parseBoolean(split[2])) + .build(); } /** @@ -62,37 +107,57 @@ public static Subscription convertStringToSubscription(String subscription) { */ public static boolean isSubscribedToFeatureSet( List subscriptions, String projectName, String featureSetName) { + // Case 1: Highest priority check, to exclude all matching subscriptions with excluded flag = + // true for (Subscription sub : subscriptions) { // If configuration missing, fail if (sub.getProject().isEmpty() || sub.getName().isEmpty()) { throw new IllegalArgumentException( String.format("Subscription is missing arguments: %s", sub.toString())); } + // Match feature set name to pattern + Pattern patternName = getNamePattern(sub); + Pattern patternProject = getProjectPattern(sub); + // SubCase: Project name and feature set name matches and excluded flag is true + if (patternProject.matcher(projectName).matches() + && patternName.matcher(featureSetName).matches() + && sub.getExclude()) { + return false; + } + } + // Case 2: Featureset is not excluded, check if it is included in the current subscriptions + // filteredSubscriptions only contain subscriptions with excluded flag = false + List filteredSubscriptions = + subscriptions.stream().filter(sub -> !sub.getExclude()).collect(Collectors.toList()); - // If all wildcards, subscribe to everything - if (sub.getProject().equals("*") || sub.getName().equals("*")) { + for (Subscription filteredSub : filteredSubscriptions) { + // Match feature set name to pattern + Pattern patternName = getNamePattern(filteredSub); + Pattern patternProject = getProjectPattern(filteredSub); + // SubCase: Project name and feature set name matches + if (patternProject.matcher(projectName).matches() + && patternName.matcher(featureSetName).matches()) { return true; } + } + return false; + } - // Match project name - if (!projectName.equals(sub.getProject())) { - continue; - } + private static Pattern getProjectPattern(Subscription subscription) { + String subProject = subscription.getProject(); + if (!subscription.getProject().contains(".*")) { + subProject = subProject.replace("*", ".*"); + } - // Convert wildcard to regex - String subName = sub.getName(); - if (!sub.getName().contains(".*")) { - subName = subName.replace("*", ".*"); - } + return Pattern.compile(subProject); + } - // Match feature set name to pattern - Pattern pattern = Pattern.compile(subName); - if (!pattern.matcher(featureSetName).matches()) { - continue; - } - return true; + private static Pattern getNamePattern(Subscription subscription) { + String subName = subscription.getName(); + if (!subscription.getProject().contains(".*")) { + subName = subName.replace("*", ".*"); } - return false; + return Pattern.compile(subName); } } diff --git a/common/src/test/java/feast/common/models/StoreTest.java b/common/src/test/java/feast/common/models/StoreTest.java new file mode 100644 index 0000000000..1acba12d2b --- /dev/null +++ b/common/src/test/java/feast/common/models/StoreTest.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.common.models; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertTrue; + +import feast.proto.core.StoreProto.Store.Subscription; +import java.util.Arrays; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +public class StoreTest { + + private List allSubscriptions; + + @Before + public void setUp() { + + Subscription emptySubscription = Subscription.newBuilder().build(); + Subscription subscription1 = Subscription.newBuilder().setProject("*").setName("*").build(); + Subscription subscription2 = + Subscription.newBuilder().setProject("project1").setName("fs_2").build(); + Subscription subscription3 = + Subscription.newBuilder().setProject("project1").setName("fs_1").setExclude(true).build(); + allSubscriptions = + Arrays.asList(emptySubscription, subscription1, subscription2, subscription3); + } + + @Test + public void shouldReturnSubscriptionsBasedOnStr() { + String subscriptions = "project1:fs_1:true,project1:fs_2"; + List actual1 = Store.parseSubFromStr(subscriptions); + List expected1 = Arrays.asList(allSubscriptions.get(2), allSubscriptions.get(3)); + + List actual2 = Store.parseSubFromStrWithoutExclusions(subscriptions); + List expected2 = Arrays.asList(allSubscriptions.get(2)); + + assertTrue(actual1.containsAll(expected1) && expected1.containsAll(actual1)); + assertTrue(actual2.containsAll(expected2) && expected2.containsAll(actual2)); + } + + @Test + public void shouldReturnStringBasedOnSubscription() { + // Case: default exclude should be false + String actual1 = Store.parseSubscriptionFrom(allSubscriptions.get(2)); + Subscription sub1 = allSubscriptions.get(2); + String expected1 = sub1.getProject() + ":" + sub1.getName() + ":" + sub1.getExclude(); + + // Case: explicit setting of exclude to true + String actual2 = Store.parseSubscriptionFrom(allSubscriptions.get(3)); + Subscription sub2 = allSubscriptions.get(3); + String expected2 = sub2.getProject() + ":" + sub2.getName() + ":" + sub2.getExclude(); + + assertThat(actual1, equalTo(expected1)); + assertThat(actual2, equalTo(expected2)); + } + + @Test + public void shouldSubscribeToFeatureSet() { + allSubscriptions = allSubscriptions.subList(2, 4); + // Case: excluded flag = true + boolean actual1 = Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_1"); + boolean expected1 = false; + + // Case: excluded flag = false + boolean actual2 = Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_2"); + boolean expected2 = true; + + // Case: featureset does not exist + boolean actual3 = + Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_nonexistent"); + boolean expected3 = false; + + assertThat(actual1, equalTo(expected1)); + assertThat(actual2, equalTo(expected2)); + assertThat(actual3, equalTo(expected3)); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java b/ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java index 0ba5d56369..9899e9e4e3 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/FeatureRowToStoreAllocatorTest.java @@ -19,7 +19,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import feast.proto.core.StoreProto; +import feast.proto.core.StoreProto.Store.Subscription; import feast.proto.types.FeatureRowProto; +import java.util.Arrays; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.testing.PAssert; @@ -41,6 +44,10 @@ private StoreProto.Store newStore(String s) { .build(); } + private StoreProto.Store newStore(List subscriptionList) { + return StoreProto.Store.newBuilder().addAllSubscriptions(subscriptionList).build(); + } + @Test public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() { StoreProto.Store bqOnlyStore = newStore("bq*"); @@ -96,4 +103,55 @@ public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() { p.run(); } + + @Test + public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscriptionBlacklist() { + Subscription subscription1 = Subscription.newBuilder().setProject("*").setName("*").build(); + Subscription subscription2 = + Subscription.newBuilder().setProject("project1").setName("fs_2").build(); + Subscription subscription3 = + Subscription.newBuilder().setProject("project1").setName("fs_1").setExclude(true).build(); + Subscription subscription4 = + Subscription.newBuilder().setProject("project2").setName("*").setExclude(true).build(); + + List testStoreSubscriptions1 = + Arrays.asList(subscription1, subscription2, subscription3); + StoreProto.Store testStore1 = newStore(testStoreSubscriptions1); + + List testStoreSubscriptions2 = Arrays.asList(subscription1, subscription4); + StoreProto.Store testStore2 = newStore(testStoreSubscriptions2); + + Map> storeTags = + ImmutableMap.of( + testStore1, new TupleTag<>(), + testStore2, new TupleTag<>()); + + PCollectionTuple allocatedRows = + p.apply( + Create.of( + FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project1/fs_1").build(), + FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project2/fs_1").build(), + FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project2/fs_2").build())) + .apply( + FeatureRowToStoreAllocator.newBuilder() + .setStoreTags(storeTags) + .setStores(ImmutableList.of(testStore1, testStore2)) + .build()); + + PAssert.that( + allocatedRows + .get(storeTags.get(testStore1)) + .setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class)) + .apply("CountStore1", Count.globally())) + .containsInAnyOrder(2L); + + PAssert.that( + allocatedRows + .get(storeTags.get(testStore2)) + .setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class)) + .apply("CountStore2", Count.globally())) + .containsInAnyOrder(1L); + + p.run(); + } } diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto index be9750cfde..2ad90c61e7 100644 --- a/protos/feast/core/Store.proto +++ b/protos/feast/core/Store.proto @@ -149,6 +149,9 @@ message Store { // - my-feature-set-6 can be used to select a single feature set string name = 1; + // All matches with exclude enabled will be filtered out instead of added + bool exclude = 4; + // Feature set version was removed in v0.5.0. reserved 2; } diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index ed5458d974..be87d515cb 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -195,18 +195,20 @@ private Map getFeatureSetMap() { for (Subscription subscription : this.store.getSubscriptionsList()) { try { - ListFeatureSetsResponse featureSetsResponse = - coreService.listFeatureSets( - ListFeatureSetsRequest.newBuilder() - .setFilter( - ListFeatureSetsRequest.Filter.newBuilder() - .setProject(subscription.getProject()) - .setFeatureSetName(subscription.getName())) - .build()); + if (!subscription.getExclude()) { + ListFeatureSetsResponse featureSetsResponse = + coreService.listFeatureSets( + ListFeatureSetsRequest.newBuilder() + .setFilter( + ListFeatureSetsRequest.Filter.newBuilder() + .setProject(subscription.getProject()) + .setFeatureSetName(subscription.getName())) + .build()); - for (FeatureSet featureSet : featureSetsResponse.getFeatureSetsList()) { - FeatureSetSpec spec = featureSet.getSpec(); - featureSets.put(getFeatureSetStringRef(spec), spec); + for (FeatureSet featureSet : featureSetsResponse.getFeatureSetsList()) { + FeatureSetSpec spec = featureSet.getSpec(); + featureSets.put(getFeatureSetStringRef(spec), spec); + } } } catch (StatusRuntimeException e) { throw new RuntimeException(