Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for EventPolicy filters #4086

Merged
Merged
4 changes: 4 additions & 0 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie
}
}

for _, filter := range policy.Spec.Filters {
contractPolicy.Filters = append(contractPolicy.Filters, contract.FromSubscriptionFilter(filter))
}

eventPolicies = append(eventPolicies, contractPolicy)
}

Expand Down
76 changes: 76 additions & 0 deletions control-plane/pkg/core/config/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

"google.golang.org/protobuf/encoding/protojson"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
Expand Down Expand Up @@ -618,6 +620,80 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
},
},
},
}, {
name: "Multiple policies with filters",
applyingPolicies: []string{
"policy-1",
"policy-2",
},
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
{
ObjectMeta: metav1.ObjectMeta{
Name: "policy-1",
Namespace: "my-ns",
},
Spec: eventingv1alpha1.EventPolicySpec{
Filters: []eventingv1.SubscriptionsAPIFilter{
{
CESQL: "true",
},
},
},
Status: eventingv1alpha1.EventPolicyStatus{
From: []string{
"from-1",
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "policy-2",
Namespace: "my-ns",
},
Spec: eventingv1alpha1.EventPolicySpec{
Filters: []eventingv1.SubscriptionsAPIFilter{
{
CESQL: "false",
},
},
},
Status: eventingv1alpha1.EventPolicyStatus{
From: []string{
"from-2-*",
},
},
},
},
namespace: "my-ns",
defaultAuthorizationMode: feature.AuthorizationDenyAll,
expected: []*contract.EventPolicy{
{
TokenMatchers: []*contract.TokenMatcher{
exactTokenMatcher("from-1"),
},
Filters: []*contract.DialectedFilter{
{
Filter: &contract.DialectedFilter_Cesql{
Cesql: &contract.CESQL{
Expression: "true",
},
},
},
},
}, {
TokenMatchers: []*contract.TokenMatcher{
prefixTokenMatcher("from-2-"),
},
Filters: []*contract.DialectedFilter{
{
Filter: &contract.DialectedFilter_Cesql{
Cesql: &contract.CESQL{
Expression: "false",
},
},
},
},
},
},
}, {
name: "No applying policies - allow-same-namespace default mode",
applyingPolicies: []string{},
Expand Down
6 changes: 6 additions & 0 deletions data-plane/benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
<artifactId>dispatcher</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.AllFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.AllFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.*;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.AnyFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.v1.CloudEventV1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import io.cloudevents.CloudEvent;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.NotFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.NotFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import java.util.Map;
Expand Down
4 changes: 4 additions & 0 deletions data-plane/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@
<artifactId>vertx-opentelemetry</artifactId>
</dependency>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-sql</artifactId>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import static java.time.format.DateTimeFormatter.ISO_INSTANT;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v03.CloudEventV03;
import io.cloudevents.core.v1.CloudEventV1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.*;
import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* This interface provides an abstraction for filtering {@link CloudEvent} instances.
*/
@FunctionalInterface
public interface Filter extends Predicate<CloudEvent> {

/**
* @return noop implementation that always returns true
*/
static Filter noop() {
return ce -> true;
}

static Filter fromContract(DataPlaneContract.DialectedFilter filter) {
return switch (filter.getFilterCase()) {
case EXACT -> new ExactFilter(filter.getExact().getAttributesMap());
case PREFIX -> new PrefixFilter(filter.getPrefix().getAttributesMap());
case SUFFIX -> new SuffixFilter(filter.getSuffix().getAttributesMap());
case NOT -> new NotFilter(fromContract(filter.getNot().getFilter()));
case ANY -> new AnyFilter(filter.getAny().getFiltersList().stream()
.map(Filter::fromContract)
.collect(Collectors.toList()));
case ALL -> new AllFilter(filter.getAll().getFiltersList().stream()
.map(Filter::fromContract)
.collect(Collectors.toList()));
case CESQL -> new CeSqlFilter(filter.getCesql().getExpression());
default -> Filter.noop();
};
}

static Filter fromContract(List<DataPlaneContract.DialectedFilter> filters) {
return new AllFilter(filters.stream().map(Filter::fromContract).collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.core.filter.Filter;
import io.cloudevents.CloudEvent;
import java.util.List;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.core.filter.Filter;
import io.cloudevents.CloudEvent;
import java.util.List;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.core.filter.Filter;
import io.cloudevents.CloudEvent;
import io.cloudevents.sql.EvaluationContext;
import io.cloudevents.sql.EvaluationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
import java.util.Map;

public class ExactFilter extends AttributesFilter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.core.filter.Filter;
import io.cloudevents.CloudEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
import java.util.Map;

public class PrefixFilter extends AttributesFilter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
import java.util.Map;

public class SuffixFilter extends AttributesFilter {
Expand Down
Loading
Loading