From b3ccb113cb2af9fda4491d5dbde342c944d9f0db Mon Sep 17 00:00:00 2001 From: Ville Aikas <11279988+vaikas@users.noreply.github.com> Date: Mon, 19 Apr 2021 13:15:00 -0700 Subject: [PATCH] Do events end where they should (#5280) * define the test API * basic routing tests * remove brokername * sigh * lint / codegen --- go.mod | 1 + go.sum | 1 - test/rekt/features/broker/control_plane.go | 325 +++++++++++++++++- .../features/broker/control_plane_test.go | 219 ++++++++++++ vendor/modules.txt | 1 + 5 files changed, 539 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 650d1f478d..9cd708344c 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/cloudevents/conformance v0.2.0 github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.4.1 github.com/cloudevents/sdk-go/v2 v2.4.1 github.com/golang/protobuf v1.5.2 diff --git a/go.sum b/go.sum index 83defbb57b..0468ba5d43 100644 --- a/go.sum +++ b/go.sum @@ -1115,7 +1115,6 @@ knative.dev/hack v0.0.0-20210325223819-b6ab329907d3 h1:km0Rrh0T9/wA2pivQm1hqSPVw knative.dev/hack v0.0.0-20210325223819-b6ab329907d3/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3 h1:F/pVm+rB+WpyVhH9cmVn3Lh53+UI24qlnjaYiqaw1pw= knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0= -knative.dev/pkg v0.0.0-20210412173742-b51994e3b312 h1:tE80vxKw9ENrLRe+U9BvLAcJ5UYpDc40r5hFoRFUXh0= knative.dev/pkg v0.0.0-20210412173742-b51994e3b312/go.mod h1:V/yjYpwRpIoUCavOoF8plCw72kF7rMjWPms5v2QqxA4= knative.dev/pkg v0.0.0-20210416161310-b80a1926251c h1:WB+7bcRo5SadPzEQx+eqC6dlRySlnfh6o5bW16UjsjY= knative.dev/pkg v0.0.0-20210416161310-b80a1926251c/go.mod h1:V/yjYpwRpIoUCavOoF8plCw72kF7rMjWPms5v2QqxA4= diff --git a/test/rekt/features/broker/control_plane.go b/test/rekt/features/broker/control_plane.go index 497c73631e..b47797b7b1 100644 --- a/test/rekt/features/broker/control_plane.go +++ b/test/rekt/features/broker/control_plane.go @@ -23,6 +23,7 @@ import ( "sort" "strings" + conformanceevent "github.com/cloudevents/conformance/pkg/event" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -56,6 +57,7 @@ func ControlPlaneConformance(brokerName string) *feature.FeatureSet { *ControlPlaneTrigger_WithValidFilters(brokerName), *ControlPlaneTrigger_WithInvalidFilters(brokerName), *ControlPlaneDelivery(), + *ControlPlaneEventRouting(), }, } // TODO: This is not a control plane test, or at best it is a blend with data plane. @@ -363,6 +365,165 @@ func ControlPlaneDelivery() *feature.Feature { return f } +func ControlPlaneEventRouting() *feature.Feature { + f := feature.NewFeatureNamed("Event Routing Spec") + + for i, tt := range []struct { + name string + config []triggerTestConfig + inEvents []conformanceevent.Event + }{{ + name: "One trigger, no filter, gets event", + config: []triggerTestConfig{{}}, + inEvents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + Type: "com.example.FullEvent", + }, + }, + }, + }, { + name: "One trigger, with filter, does not get event", + config: []triggerTestConfig{ + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "mytype", + }, + }, + }, + }, + inEvents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + Type: "notmytype", + }, + }, + }, + }, { + name: "One trigger, with filter, gets the event", + config: []triggerTestConfig{ + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "com.example.FullEvent", + // "type": "mytype", + }, + }, + }, + }, + inEvents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + Type: "mytype", + }, + }, + }, + }, { + // name: "Two triggers, with filter, both get the event", + config: []triggerTestConfig{ + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "com.example.FullEvent", + // "type": "mytype", + }, + }, + }, + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "com.example.FullEvent", + // "type": "mytype", + }, + }, + }, + }, + inEvents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + // Type: "mytype", + Type: "com.example.FullEvent", + }, + }, + }, + }, { + name: "Two triggers, with filter, only matching one gets the event", + config: []triggerTestConfig{ + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "notmytype", + }, + }, + }, + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "com.example.FullEvent", + // "type": "mytype", + }, + }, + }, + }, + inEvents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + // Type: "mytype", + Type: "com.example.FullEvent", + }, + }, + }, + }, { + name: "Two triggers, with no filters, both get the event", + config: []triggerTestConfig{{}, {}}, + inEvents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + // "type": "com.example.FullEvent", + Type: "com.example.FullEvent", + }, + }, + }, + }} { + brokerName := fmt.Sprintf("routing-test-%d", i) + f.Setup("Set Broker Name", setBrokerName(brokerName)) + prober := createBrokerTriggerEventRoutingTopology(f, brokerName, tt.config) + + // Send an event into the matrix and hope for the best + // TODO: We need to do some work to get the event types into the Prober. + // All the events generated are currently hardcoded into the com.example.FullEvent + // so once prober supports more configuration, wire it up here. + prober.SenderFullEvents(1) + f.Setup("install source", prober.SenderInstall("source")) + f.Requirement("sender is finished", prober.SenderDone("source")) + + // All events have been sent, time to look at the specs and confirm we got them. + expectedEvents := createExpectedEventRoutingMap(tt.config, tt.inEvents) + + f.Requirement("wait until done", func(ctx context.Context, t feature.T) { + interval, timeout := environment.PollTimingsFromContext(ctx) + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + gtg := true + for prefix, want := range expectedEvents { + events := prober.ReceivedOrRejectedBy(ctx, prefix) + if len(events) != len(want) { + gtg = false + } + } + return gtg, nil + }) + if err != nil { + t.Failed() + } + }) + + f.Stable("Conformance").Should(tt.name, assertExpectedRoutedEvents(prober, expectedEvents)) + } + + return f +} + type EventingClient struct { Brokers eventingclientsetv1.BrokerInterface Triggers eventingclientsetv1.TriggerInterface @@ -516,9 +677,6 @@ func triggerSpecBrokerIsImmutable(ctx context.Context, t feature.T) { // func createBrokerTriggerDeliveryTopology(f *feature.Feature, brokerName string, brokerDS, t1DS, t2DS *v1.DeliverySpec, t1FailCount, t2FailCount uint) *eventshub.EventProber { prober := eventshub.NewProber() - // This will set or clear the broker delivery spec settings. - // Make trigger with delivery settings. - // Make a trigger with no delivery spec. // TODO: Optimize these to only install things required. For example, if there's no t2 dlq, no point creating a prober for it. f.Setup("install recorder for t1", prober.ReceiverInstall("t1", eventshub.DropFirstN(t1FailCount))) @@ -566,10 +724,6 @@ func createBrokerTriggerDeliveryTopology(f *feature.Feature, brokerName string, return prober } -// createExpectedEventMap creates a datastructure for a given test topology created by `createBrokerTriggerDeliveryTopology` function. -// Things we know from the DeliverySpecs passed in are where failed events from both t1 and t2 should land in. -// We also know how many events (incoming as well as how many failures the trigger subscriber is supposed to see). -// Note there are lot of baked assumptions and very tight coupling between this and `createBrokerTriggerDeliveryTopology` function. type expectedEvents struct { eventSuccess []bool // events and their outcomes (succeeded or failed) in order received by the Receiver // What is the minimum time between events above. If there's only one entry, it's irrelevant and will be useless. If there's, say @@ -584,6 +738,10 @@ func retryCount(r *int32) uint { return uint(*r) } +// createExpectedEventMap creates a datastructure for a given test topology created by `createBrokerTriggerDeliveryTopology` function. +// Things we know from the DeliverySpecs passed in are where failed events from both t1 and t2 should land in. +// We also know how many events (incoming as well as how many failures the trigger subscriber is supposed to see). +// Note there are lot of baked assumptions and very tight coupling between this and `createBrokerTriggerDeliveryTopology` function. func createExpectedEventMap(brokerDS, t1DS, t2DS *v1.DeliverySpec, t1FailCount, t2FailCount uint) map[string]expectedEvents { // By default, assume that nothing gets anything. r := map[string]expectedEvents{ @@ -727,6 +885,26 @@ func assertExpectedEvents(prober *eventshub.EventProber, expected map[string]exp } } +func assertExpectedRoutedEvents(prober *eventshub.EventProber, expected map[string][]conformanceevent.Event) feature.StepFn { + return func(ctx context.Context, t feature.T) { + for prefix, want := range expected { + got := happenedFullEvent(ctx, prober, prefix) + + t.Logf("Expected Events %s; \nGot: %#v\n Want: %#v", prefix, got, want) + if len(want) != len(got) { + t.Errorf("Wanted %d events, got %d", len(want), len(got)) + } + + // Check event acceptance. + if len(want) != 0 && len(got) != 0 { + if diff := cmp.Diff(want, got); diff != "" { + t.Error("unexpected event routing behaviour (-want, +got) =", diff) + } + } + } + } +} + // TODO: this function could be moved to the prober directly. func happened(ctx context.Context, prober *eventshub.EventProber, prefix string) expectedEvents { events := prober.ReceivedOrRejectedBy(ctx, prefix) @@ -748,3 +926,136 @@ func happened(ctx context.Context, prober *eventshub.EventProber, prefix string) } return got } + +// TODO: this function could be moved to the prober directly. +func happenedFullEvent(ctx context.Context, prober *eventshub.EventProber, prefix string) []conformanceevent.Event { + events := prober.ReceivedOrRejectedBy(ctx, prefix) + sort.Slice(events, func(i, j int) bool { + return events[i].Time.Before(events[j].Time) + }) + ret := make([]conformanceevent.Event, len(events)) + for i, e := range events { + // TODO: yeah, like full event please... + ret[i] = conformanceevent.Event{ + Attributes: conformanceevent.ContextAttributes{ + Type: e.Event.Type(), + }} + } + return ret + +} + +// triggerTestConfig is used to define each Trigger behaviour used to construct the test topology by +// createBrokerTriggerEventRoutingTopology +type triggerTestConfig struct { + filter *eventingv1.TriggerFilter + reply *conformanceevent.Event +} + +// +// createBrokerTriggerEventRoutingTopology creates a topology that allows us to test the various +// trigger filter configurations. For each triggerConfig a Trigger will be created with an optional +// filter as well as a reply that it will generate in response to any event that it receives. +// For each entry in the triggerConfigs ("i"th entry will be ti in the picture below). If there's +// a filter specified trigger is configured with it. If there's a reply, then the "t0" will be configured +// to reply with that event. **REPLY FUNCTIONALITY DOES NOT WORK YET** +// TODO: Fix the reply. +// +// source ---> [broker] --+--[t0 (optional filter)]--> "t0" (optional reply) +// | +// +--[t1 (optional filter)]--> "t1" (optional reply) +// ... +// +--[tN (optional filter)]--> "tN" (optional reply) +// +func createBrokerTriggerEventRoutingTopology(f *feature.Feature, brokerName string, triggerConfigs []triggerTestConfig) *eventshub.EventProber { + prober := eventshub.NewProber() + + // Install the receivers for all the triggers + for i, config := range triggerConfigs { + triggerName := fmt.Sprintf("t%d", i) + // TODO: If there's a corresponding reply, need to wire it in here + if config.reply != nil { + fmt.Printf("TODO: WIRE UP REPLY TO TRIGGER %s", triggerName) + } + f.Setup(fmt.Sprintf("install recorder for %s", triggerName), prober.ReceiverInstall(triggerName)) + } + + brokerOpts := brokerresources.WithEnvConfig() + + f.Setup("Create Broker", brokerresources.Install(brokerName, brokerOpts...)) + f.Setup("Broker is Ready", brokerresources.IsReady(brokerName)) // We want to block until broker is ready to go. + + prober.SetTargetResource(brokerresources.GVR(), brokerName) + + for i, config := range triggerConfigs { + triggerName := fmt.Sprintf("t%d", i) + tOpts := []manifest.CfgFn{triggerresources.WithSubscriber(prober.AsKReference(triggerName), "")} + if config.filter != nil { + tOpts = append(tOpts, triggerresources.WithFilter(config.filter.Attributes)) + } + f.Setup(fmt.Sprintf("Create %s with recorder", triggerName), triggerresources.Install(feature.MakeRandomK8sName(triggerName), brokerName, tOpts...)) + } + return prober +} + +// createExpectedEventRoutingMap takes in an array of trigger configurations as well as incoming events and +// constructs a map of where the events should land. Any replies in trigger configurations will be treated +// as matchable events (since they are going to be sent back to the Broker). +func createExpectedEventRoutingMap(triggerConfigs []triggerTestConfig, inEvents []conformanceevent.Event) map[string][]conformanceevent.Event { + ret := make(map[string][]conformanceevent.Event, len(triggerConfigs)) + + // for each of the events (both incoming and newly created (replies)) check each trigger filter and append + // to the expected events if it matches. + for _, e := range inEvents { + for i, config := range triggerConfigs { + triggerName := fmt.Sprintf("t%d", i) + if eventMatchesTrigger(e, config.filter) { + ret[triggerName] = append(ret[triggerName], e) + } + } + } + // TODO(vaikas): I'm certain there's a bug/cornercase here. We should make sure that a Reply is considered + // to be sent ONLY if there's an event that matches that trigger filter so as to actually invoke the Trigger + // and therefore generating a reply. + // Basically we need to look at all the incoming events (basically above) and make a note if a trigger + // subscriber gets called and add that logic here. + for _, e := range triggerConfigs { + if e.reply != nil { + for i, config := range triggerConfigs { + triggerName := fmt.Sprintf("t%d", i) + if eventMatchesTrigger(*e.reply, config.filter) { + ret[triggerName] = append(ret[triggerName], *e.reply) + } + } + } + } + return ret +} + +// eventNMatchesTrigger checks an event and returns True if the event matches the event. +// nil filter means everything matches, so it's safe to pass nil in here. +func eventMatchesTrigger(event conformanceevent.Event, filter *eventingv1.TriggerFilter) bool { + // With no filter, everything matches + if filter == nil { + return true + } + for attribute, value := range filter.Attributes { + switch attribute { + case "type": + return event.Attributes.Type == value + case "source": + return event.Attributes.Source == value + case "subject": + return event.Attributes.Subject == value + } + // Not a well known attribute, check extensions. + filterAttribute, ok := event.Attributes.Extensions[attribute] + if !ok { + // We want an attribute on the event, but it's not there, so no soup for you. + return false + } + return filterAttribute == attribute + } + // TODO: Do more matching here as necessary. + return false +} diff --git a/test/rekt/features/broker/control_plane_test.go b/test/rekt/features/broker/control_plane_test.go index a2fa3a984a..3e9898b0a4 100644 --- a/test/rekt/features/broker/control_plane_test.go +++ b/test/rekt/features/broker/control_plane_test.go @@ -22,7 +22,9 @@ import ( "github.com/google/go-cmp/cmp" + conformanceevent "github.com/cloudevents/conformance/pkg/event" v1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" pkgduckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/ptr" ) @@ -276,3 +278,220 @@ func TestCreateExpectedEventMap(t *testing.T) { } } } + +func TestEventMatchesTrigger(t *testing.T) { + for _, tt := range []struct { + name string + event conformanceevent.Event + filter *eventingv1.TriggerFilter + want bool + }{{ + name: "nil filter matches everything", + event: conformanceevent.Event{}, + filter: nil, + want: true, + }, { + name: "correct match on type", + event: conformanceevent.Event{ + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "eventtype", + }, + }, + want: true, + }, { + name: "wrong type", + event: conformanceevent.Event{ + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "wrongtype", + }, + }, + want: false, + }} { + if got := eventMatchesTrigger(tt.event, tt.filter); got != tt.want { + t.Errorf("%q : Event: %+v Trigger: %+v wanted match: %t but got %t", tt.name, tt.event, tt.filter, tt.want, got) + } + } +} + +func TestCreateExpectedEventDeliveryMap(t *testing.T) { + for _, tt := range []struct { + name string + inevents []conformanceevent.Event + config []triggerTestConfig + want map[string][]conformanceevent.Event + }{{ + name: "nil filter matches everything", + inevents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + config: make([]triggerTestConfig, 1), // One trigger, no filter, no reply + want: map[string][]conformanceevent.Event{ + "t0": { + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + }, + }, { + name: "wrong event type, no match ", + inevents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + config: []triggerTestConfig{ + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "wrongtype", + }, + }, + }, + }, + want: map[string][]conformanceevent.Event{}, + }, { + name: "Two triggers, incoming matches the second one (t1), not first (t0). Reply from t1 matches t0", + inevents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + config: []triggerTestConfig{ + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "replyeventtype", + }, + }, + }, + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "eventtype", + }, + }, + reply: &conformanceevent.Event{ + Attributes: conformanceevent.ContextAttributes{ + Type: "replyeventtype", + }, + }, + }, + }, + want: map[string][]conformanceevent.Event{ + "t0": { + { + Attributes: conformanceevent.ContextAttributes{ + Type: "replyeventtype", + }, + }, + }, + "t1": { + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + }, + }, { + name: "Two triggers, matches the second one (t1), not first (t0)", + inevents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + config: []triggerTestConfig{ + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "wrongtype", + }, + }, + }, + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "eventtype", + }, + }, + }, + }, + want: map[string][]conformanceevent.Event{ + "t1": { + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + }, + }, { + name: "Two triggers, both get the same event", + inevents: []conformanceevent.Event{ + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + config: []triggerTestConfig{ + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "eventtype", + }, + }, + }, + { + filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": "eventtype", + }, + }, + }, + }, + want: map[string][]conformanceevent.Event{ + "t0": { + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + "t1": { + { + Attributes: conformanceevent.ContextAttributes{ + Type: "eventtype", + }, + }, + }, + }, + }} { + got := createExpectedEventRoutingMap(tt.config, tt.inevents) + if !reflect.DeepEqual(tt.want, got) { + t.Logf("%s: Maps unequal: want:\n%+v\ngot:\n%+v", tt.name, tt.want, got) + } + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d4cf785dd7..279abd5c73 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -82,6 +82,7 @@ github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1 github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1 github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1 # github.com/cloudevents/conformance v0.2.0 +## explicit github.com/cloudevents/conformance/pkg/event github.com/cloudevents/conformance/pkg/http # github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.4.1