Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscriptions blacklist functionality #813

Merged
merged 3 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 85 additions & 20 deletions common/src/main/java/feast/common/models/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,47 @@
package feast.common.models;

import feast.proto.core.StoreProto.Store.Subscription;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class Store {

/**
* Accepts a comma-delimited string and converts it to a list of Subscription class objects.
*
* @param subscriptions String formatted Subscriptions, comma delimited.
* @return List of Subscription class objects
*/
public static List<Subscription> parseSubFromStr(String subscriptions) {
List<Subscription> allSubscriptions =
Arrays.stream(subscriptions.split(","))
.map(subscriptionStr -> convertStringToSubscription(subscriptionStr))
.collect(Collectors.toList());

return allSubscriptions;
}

/**
* Accepts a comma-delimited string and converts it to a list of Subscription class objects, with
* exclusions filtered out.
*
* @param subscriptions String formatted Subscriptions, comma delimited.
* @return List of Subscription class objects
*/
public static List<Subscription> parseSubFromStrWithoutExclusions(String subscriptions) {
List<Subscription> allSubscriptions =
Arrays.stream(subscriptions.split(","))
.map(subscriptionStr -> convertStringToSubscription(subscriptionStr))
.collect(Collectors.toList());

allSubscriptions =
allSubscriptions.stream().filter(sub -> !sub.getExclude()).collect(Collectors.toList());

return allSubscriptions;
}

/**
* Accepts a Subscription class object and returns it in string format
*
Expand All @@ -34,7 +70,8 @@ public static String parseSubscriptionFrom(Subscription subscription) {
String.format("Missing arguments in subscription string: %s", subscription.toString()));
}

return String.format("%s:%s", subscription.getProject(), subscription.getName());
return String.format(
"%s:%s:%s", subscription.getProject(), subscription.getName(), subscription.getExclude());
}

/**
Expand All @@ -48,7 +85,15 @@ public static Subscription convertStringToSubscription(String subscription) {
return Subscription.newBuilder().build();
}
String[] split = subscription.split(":");
return Subscription.newBuilder().setProject(split[0]).setName(split[1]).build();
if (split.length == 2) {
// Backward compatibility check
return Subscription.newBuilder().setProject(split[0]).setName(split[1]).build();
}
return Subscription.newBuilder()
.setProject(split[0])
.setName(split[1])
.setExclude(Boolean.parseBoolean(split[2]))
.build();
}

/**
Expand All @@ -62,37 +107,57 @@ public static Subscription convertStringToSubscription(String subscription) {
*/
public static boolean isSubscribedToFeatureSet(
List<Subscription> subscriptions, String projectName, String featureSetName) {
// Case 1: Highest priority check, to exclude all matching subscriptions with excluded flag =
// true
for (Subscription sub : subscriptions) {
// If configuration missing, fail
if (sub.getProject().isEmpty() || sub.getName().isEmpty()) {
throw new IllegalArgumentException(
String.format("Subscription is missing arguments: %s", sub.toString()));
}
// Match feature set name to pattern
Pattern patternName = getNamePattern(sub);
Pattern patternProject = getProjectPattern(sub);
// SubCase: Project name and feature set name matches and excluded flag is true
if (patternProject.matcher(projectName).matches()
&& patternName.matcher(featureSetName).matches()
&& sub.getExclude()) {
return false;
}
}
// Case 2: Featureset is not excluded, check if it is included in the current subscriptions
// filteredSubscriptions only contain subscriptions with excluded flag = false
List<Subscription> filteredSubscriptions =
subscriptions.stream().filter(sub -> !sub.getExclude()).collect(Collectors.toList());

// If all wildcards, subscribe to everything
if (sub.getProject().equals("*") || sub.getName().equals("*")) {
for (Subscription filteredSub : filteredSubscriptions) {
// Match feature set name to pattern
Pattern patternName = getNamePattern(filteredSub);
Pattern patternProject = getProjectPattern(filteredSub);
// SubCase: Project name and feature set name matches
if (patternProject.matcher(projectName).matches()
&& patternName.matcher(featureSetName).matches()) {
return true;
}
}
return false;
}

// Match project name
if (!projectName.equals(sub.getProject())) {
continue;
}
private static Pattern getProjectPattern(Subscription subscription) {
String subProject = subscription.getProject();
if (!subscription.getProject().contains(".*")) {
subProject = subProject.replace("*", ".*");
}

// Convert wildcard to regex
String subName = sub.getName();
if (!sub.getName().contains(".*")) {
subName = subName.replace("*", ".*");
}
return Pattern.compile(subProject);
}

// Match feature set name to pattern
Pattern pattern = Pattern.compile(subName);
if (!pattern.matcher(featureSetName).matches()) {
continue;
}
return true;
private static Pattern getNamePattern(Subscription subscription) {
String subName = subscription.getName();
if (!subscription.getProject().contains(".*")) {
subName = subName.replace("*", ".*");
}

return false;
return Pattern.compile(subName);
}
}
95 changes: 95 additions & 0 deletions common/src/test/java/feast/common/models/StoreTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.common.models;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertTrue;

import feast.proto.core.StoreProto.Store.Subscription;
import java.util.Arrays;
import java.util.List;
import org.junit.Before;
import org.junit.Test;

public class StoreTest {

private List<Subscription> allSubscriptions;

@Before
public void setUp() {

Subscription emptySubscription = Subscription.newBuilder().build();
Subscription subscription1 = Subscription.newBuilder().setProject("*").setName("*").build();
Subscription subscription2 =
Subscription.newBuilder().setProject("project1").setName("fs_2").build();
Subscription subscription3 =
Subscription.newBuilder().setProject("project1").setName("fs_1").setExclude(true).build();
allSubscriptions =
Arrays.asList(emptySubscription, subscription1, subscription2, subscription3);
}

@Test
public void shouldReturnSubscriptionsBasedOnStr() {
String subscriptions = "project1:fs_1:true,project1:fs_2";
List<Subscription> actual1 = Store.parseSubFromStr(subscriptions);
List<Subscription> expected1 = Arrays.asList(allSubscriptions.get(2), allSubscriptions.get(3));

List<Subscription> actual2 = Store.parseSubFromStrWithoutExclusions(subscriptions);
List<Subscription> expected2 = Arrays.asList(allSubscriptions.get(2));

assertTrue(actual1.containsAll(expected1) && expected1.containsAll(actual1));
assertTrue(actual2.containsAll(expected2) && expected2.containsAll(actual2));
}

@Test
public void shouldReturnStringBasedOnSubscription() {
// Case: default exclude should be false
String actual1 = Store.parseSubscriptionFrom(allSubscriptions.get(2));
Subscription sub1 = allSubscriptions.get(2);
String expected1 = sub1.getProject() + ":" + sub1.getName() + ":" + sub1.getExclude();

// Case: explicit setting of exclude to true
String actual2 = Store.parseSubscriptionFrom(allSubscriptions.get(3));
Subscription sub2 = allSubscriptions.get(3);
String expected2 = sub2.getProject() + ":" + sub2.getName() + ":" + sub2.getExclude();

assertThat(actual1, equalTo(expected1));
assertThat(actual2, equalTo(expected2));
}

@Test
public void shouldSubscribeToFeatureSet() {
allSubscriptions = allSubscriptions.subList(2, 4);
// Case: excluded flag = true
boolean actual1 = Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_1");
boolean expected1 = false;

// Case: excluded flag = false
boolean actual2 = Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_2");
boolean expected2 = true;

// Case: featureset does not exist
boolean actual3 =
Store.isSubscribedToFeatureSet(allSubscriptions, "project1", "fs_nonexistent");
boolean expected3 = false;

assertThat(actual1, equalTo(expected1));
assertThat(actual2, equalTo(expected2));
assertThat(actual3, equalTo(expected3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import feast.proto.core.StoreProto;
import feast.proto.core.StoreProto.Store.Subscription;
import feast.proto.types.FeatureRowProto;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.testing.PAssert;
Expand All @@ -41,6 +44,10 @@ private StoreProto.Store newStore(String s) {
.build();
}

private StoreProto.Store newStore(List<StoreProto.Store.Subscription> subscriptionList) {
return StoreProto.Store.newBuilder().addAllSubscriptions(subscriptionList).build();
}

@Test
public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() {
StoreProto.Store bqOnlyStore = newStore("bq*");
Expand Down Expand Up @@ -96,4 +103,55 @@ public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() {

p.run();
}

@Test
public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscriptionBlacklist() {
Subscription subscription1 = Subscription.newBuilder().setProject("*").setName("*").build();
Subscription subscription2 =
Subscription.newBuilder().setProject("project1").setName("fs_2").build();
Subscription subscription3 =
Subscription.newBuilder().setProject("project1").setName("fs_1").setExclude(true).build();
Subscription subscription4 =
Subscription.newBuilder().setProject("project2").setName("*").setExclude(true).build();

List<Subscription> testStoreSubscriptions1 =
Arrays.asList(subscription1, subscription2, subscription3);
StoreProto.Store testStore1 = newStore(testStoreSubscriptions1);

List<Subscription> testStoreSubscriptions2 = Arrays.asList(subscription1, subscription4);
StoreProto.Store testStore2 = newStore(testStoreSubscriptions2);

Map<StoreProto.Store, TupleTag<FeatureRowProto.FeatureRow>> storeTags =
ImmutableMap.of(
testStore1, new TupleTag<>(),
testStore2, new TupleTag<>());

PCollectionTuple allocatedRows =
p.apply(
Create.of(
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project1/fs_1").build(),
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project2/fs_1").build(),
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project2/fs_2").build()))
.apply(
FeatureRowToStoreAllocator.newBuilder()
.setStoreTags(storeTags)
.setStores(ImmutableList.of(testStore1, testStore2))
.build());

PAssert.that(
allocatedRows
.get(storeTags.get(testStore1))
.setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class))
.apply("CountStore1", Count.globally()))
.containsInAnyOrder(2L);

PAssert.that(
allocatedRows
.get(storeTags.get(testStore2))
.setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class))
.apply("CountStore2", Count.globally()))
.containsInAnyOrder(1L);

p.run();
}
}
3 changes: 3 additions & 0 deletions protos/feast/core/Store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ message Store {
// - my-feature-set-6 can be used to select a single feature set
string name = 1;

// All matches with exclude enabled will be filtered out instead of added
bool exclude = 4;

// Feature set version was removed in v0.5.0.
reserved 2;
}
Expand Down
24 changes: 13 additions & 11 deletions serving/src/main/java/feast/serving/specs/CachedSpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,20 @@ private Map<String, FeatureSetSpec> getFeatureSetMap() {

for (Subscription subscription : this.store.getSubscriptionsList()) {
try {
ListFeatureSetsResponse featureSetsResponse =
coreService.listFeatureSets(
ListFeatureSetsRequest.newBuilder()
.setFilter(
ListFeatureSetsRequest.Filter.newBuilder()
.setProject(subscription.getProject())
.setFeatureSetName(subscription.getName()))
.build());
if (!subscription.getExclude()) {
ListFeatureSetsResponse featureSetsResponse =
coreService.listFeatureSets(
ListFeatureSetsRequest.newBuilder()
.setFilter(
ListFeatureSetsRequest.Filter.newBuilder()
.setProject(subscription.getProject())
.setFeatureSetName(subscription.getName()))
.build());

for (FeatureSet featureSet : featureSetsResponse.getFeatureSetsList()) {
FeatureSetSpec spec = featureSet.getSpec();
featureSets.put(getFeatureSetStringRef(spec), spec);
for (FeatureSet featureSet : featureSetsResponse.getFeatureSetsList()) {
FeatureSetSpec spec = featureSet.getSpec();
featureSets.put(getFeatureSetStringRef(spec), spec);
}
}
} catch (StatusRuntimeException e) {
throw new RuntimeException(
Expand Down