Skip to content

Commit

Permalink
Do events end where they should (#5280)
Browse files Browse the repository at this point in the history
* define the test API

* basic routing tests

* remove brokername

* sigh

* lint / codegen
  • Loading branch information
vaikas authored Apr 19, 2021
1 parent cd7c750 commit b3ccb11
Show file tree
Hide file tree
Showing 5 changed files with 539 additions and 8 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.15

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cloudevents/conformance v0.2.0
github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.4.1
github.com/cloudevents/sdk-go/v2 v2.4.1
github.com/golang/protobuf v1.5.2
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,6 @@ knative.dev/hack v0.0.0-20210325223819-b6ab329907d3 h1:km0Rrh0T9/wA2pivQm1hqSPVw
knative.dev/hack v0.0.0-20210325223819-b6ab329907d3/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3 h1:F/pVm+rB+WpyVhH9cmVn3Lh53+UI24qlnjaYiqaw1pw=
knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/pkg v0.0.0-20210412173742-b51994e3b312 h1:tE80vxKw9ENrLRe+U9BvLAcJ5UYpDc40r5hFoRFUXh0=
knative.dev/pkg v0.0.0-20210412173742-b51994e3b312/go.mod h1:V/yjYpwRpIoUCavOoF8plCw72kF7rMjWPms5v2QqxA4=
knative.dev/pkg v0.0.0-20210416161310-b80a1926251c h1:WB+7bcRo5SadPzEQx+eqC6dlRySlnfh6o5bW16UjsjY=
knative.dev/pkg v0.0.0-20210416161310-b80a1926251c/go.mod h1:V/yjYpwRpIoUCavOoF8plCw72kF7rMjWPms5v2QqxA4=
Expand Down
325 changes: 318 additions & 7 deletions test/rekt/features/broker/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"
"strings"

conformanceevent "github.com/cloudevents/conformance/pkg/event"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -56,6 +57,7 @@ func ControlPlaneConformance(brokerName string) *feature.FeatureSet {
*ControlPlaneTrigger_WithValidFilters(brokerName),
*ControlPlaneTrigger_WithInvalidFilters(brokerName),
*ControlPlaneDelivery(),
*ControlPlaneEventRouting(),
},
}
// TODO: This is not a control plane test, or at best it is a blend with data plane.
Expand Down Expand Up @@ -363,6 +365,165 @@ func ControlPlaneDelivery() *feature.Feature {
return f
}

func ControlPlaneEventRouting() *feature.Feature {
f := feature.NewFeatureNamed("Event Routing Spec")

for i, tt := range []struct {
name string
config []triggerTestConfig
inEvents []conformanceevent.Event
}{{
name: "One trigger, no filter, gets event",
config: []triggerTestConfig{{}},
inEvents: []conformanceevent.Event{
{
Attributes: conformanceevent.ContextAttributes{
Type: "com.example.FullEvent",
},
},
},
}, {
name: "One trigger, with filter, does not get event",
config: []triggerTestConfig{
{
filter: &eventingv1.TriggerFilter{
Attributes: eventingv1.TriggerFilterAttributes{
"type": "mytype",
},
},
},
},
inEvents: []conformanceevent.Event{
{
Attributes: conformanceevent.ContextAttributes{
Type: "notmytype",
},
},
},
}, {
name: "One trigger, with filter, gets the event",
config: []triggerTestConfig{
{
filter: &eventingv1.TriggerFilter{
Attributes: eventingv1.TriggerFilterAttributes{
"type": "com.example.FullEvent",
// "type": "mytype",
},
},
},
},
inEvents: []conformanceevent.Event{
{
Attributes: conformanceevent.ContextAttributes{
Type: "mytype",
},
},
},
}, {
// name: "Two triggers, with filter, both get the event",
config: []triggerTestConfig{
{
filter: &eventingv1.TriggerFilter{
Attributes: eventingv1.TriggerFilterAttributes{
"type": "com.example.FullEvent",
// "type": "mytype",
},
},
},
{
filter: &eventingv1.TriggerFilter{
Attributes: eventingv1.TriggerFilterAttributes{
"type": "com.example.FullEvent",
// "type": "mytype",
},
},
},
},
inEvents: []conformanceevent.Event{
{
Attributes: conformanceevent.ContextAttributes{
// Type: "mytype",
Type: "com.example.FullEvent",
},
},
},
}, {
name: "Two triggers, with filter, only matching one gets the event",
config: []triggerTestConfig{
{
filter: &eventingv1.TriggerFilter{
Attributes: eventingv1.TriggerFilterAttributes{
"type": "notmytype",
},
},
},
{
filter: &eventingv1.TriggerFilter{
Attributes: eventingv1.TriggerFilterAttributes{
"type": "com.example.FullEvent",
// "type": "mytype",
},
},
},
},
inEvents: []conformanceevent.Event{
{
Attributes: conformanceevent.ContextAttributes{
// Type: "mytype",
Type: "com.example.FullEvent",
},
},
},
}, {
name: "Two triggers, with no filters, both get the event",
config: []triggerTestConfig{{}, {}},
inEvents: []conformanceevent.Event{
{
Attributes: conformanceevent.ContextAttributes{
// "type": "com.example.FullEvent",
Type: "com.example.FullEvent",
},
},
},
}} {
brokerName := fmt.Sprintf("routing-test-%d", i)
f.Setup("Set Broker Name", setBrokerName(brokerName))
prober := createBrokerTriggerEventRoutingTopology(f, brokerName, tt.config)

// Send an event into the matrix and hope for the best
// TODO: We need to do some work to get the event types into the Prober.
// All the events generated are currently hardcoded into the com.example.FullEvent
// so once prober supports more configuration, wire it up here.
prober.SenderFullEvents(1)
f.Setup("install source", prober.SenderInstall("source"))
f.Requirement("sender is finished", prober.SenderDone("source"))

// All events have been sent, time to look at the specs and confirm we got them.
expectedEvents := createExpectedEventRoutingMap(tt.config, tt.inEvents)

f.Requirement("wait until done", func(ctx context.Context, t feature.T) {
interval, timeout := environment.PollTimingsFromContext(ctx)
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
gtg := true
for prefix, want := range expectedEvents {
events := prober.ReceivedOrRejectedBy(ctx, prefix)
if len(events) != len(want) {
gtg = false
}
}
return gtg, nil
})
if err != nil {
t.Failed()
}
})

f.Stable("Conformance").Should(tt.name, assertExpectedRoutedEvents(prober, expectedEvents))
}

return f
}

type EventingClient struct {
Brokers eventingclientsetv1.BrokerInterface
Triggers eventingclientsetv1.TriggerInterface
Expand Down Expand Up @@ -516,9 +677,6 @@ func triggerSpecBrokerIsImmutable(ctx context.Context, t feature.T) {
//
func createBrokerTriggerDeliveryTopology(f *feature.Feature, brokerName string, brokerDS, t1DS, t2DS *v1.DeliverySpec, t1FailCount, t2FailCount uint) *eventshub.EventProber {
prober := eventshub.NewProber()
// This will set or clear the broker delivery spec settings.
// Make trigger with delivery settings.
// Make a trigger with no delivery spec.

// TODO: Optimize these to only install things required. For example, if there's no t2 dlq, no point creating a prober for it.
f.Setup("install recorder for t1", prober.ReceiverInstall("t1", eventshub.DropFirstN(t1FailCount)))
Expand Down Expand Up @@ -566,10 +724,6 @@ func createBrokerTriggerDeliveryTopology(f *feature.Feature, brokerName string,
return prober
}

// createExpectedEventMap creates a datastructure for a given test topology created by `createBrokerTriggerDeliveryTopology` function.
// Things we know from the DeliverySpecs passed in are where failed events from both t1 and t2 should land in.
// We also know how many events (incoming as well as how many failures the trigger subscriber is supposed to see).
// Note there are lot of baked assumptions and very tight coupling between this and `createBrokerTriggerDeliveryTopology` function.
type expectedEvents struct {
eventSuccess []bool // events and their outcomes (succeeded or failed) in order received by the Receiver
// What is the minimum time between events above. If there's only one entry, it's irrelevant and will be useless. If there's, say
Expand All @@ -584,6 +738,10 @@ func retryCount(r *int32) uint {
return uint(*r)
}

// createExpectedEventMap creates a datastructure for a given test topology created by `createBrokerTriggerDeliveryTopology` function.
// Things we know from the DeliverySpecs passed in are where failed events from both t1 and t2 should land in.
// We also know how many events (incoming as well as how many failures the trigger subscriber is supposed to see).
// Note there are lot of baked assumptions and very tight coupling between this and `createBrokerTriggerDeliveryTopology` function.
func createExpectedEventMap(brokerDS, t1DS, t2DS *v1.DeliverySpec, t1FailCount, t2FailCount uint) map[string]expectedEvents {
// By default, assume that nothing gets anything.
r := map[string]expectedEvents{
Expand Down Expand Up @@ -727,6 +885,26 @@ func assertExpectedEvents(prober *eventshub.EventProber, expected map[string]exp
}
}

func assertExpectedRoutedEvents(prober *eventshub.EventProber, expected map[string][]conformanceevent.Event) feature.StepFn {
return func(ctx context.Context, t feature.T) {
for prefix, want := range expected {
got := happenedFullEvent(ctx, prober, prefix)

t.Logf("Expected Events %s; \nGot: %#v\n Want: %#v", prefix, got, want)
if len(want) != len(got) {
t.Errorf("Wanted %d events, got %d", len(want), len(got))
}

// Check event acceptance.
if len(want) != 0 && len(got) != 0 {
if diff := cmp.Diff(want, got); diff != "" {
t.Error("unexpected event routing behaviour (-want, +got) =", diff)
}
}
}
}
}

// TODO: this function could be moved to the prober directly.
func happened(ctx context.Context, prober *eventshub.EventProber, prefix string) expectedEvents {
events := prober.ReceivedOrRejectedBy(ctx, prefix)
Expand All @@ -748,3 +926,136 @@ func happened(ctx context.Context, prober *eventshub.EventProber, prefix string)
}
return got
}

// TODO: this function could be moved to the prober directly.
func happenedFullEvent(ctx context.Context, prober *eventshub.EventProber, prefix string) []conformanceevent.Event {
events := prober.ReceivedOrRejectedBy(ctx, prefix)
sort.Slice(events, func(i, j int) bool {
return events[i].Time.Before(events[j].Time)
})
ret := make([]conformanceevent.Event, len(events))
for i, e := range events {
// TODO: yeah, like full event please...
ret[i] = conformanceevent.Event{
Attributes: conformanceevent.ContextAttributes{
Type: e.Event.Type(),
}}
}
return ret

}

// triggerTestConfig is used to define each Trigger behaviour used to construct the test topology by
// createBrokerTriggerEventRoutingTopology
type triggerTestConfig struct {
filter *eventingv1.TriggerFilter
reply *conformanceevent.Event
}

//
// createBrokerTriggerEventRoutingTopology creates a topology that allows us to test the various
// trigger filter configurations. For each triggerConfig a Trigger will be created with an optional
// filter as well as a reply that it will generate in response to any event that it receives.
// For each entry in the triggerConfigs ("i"th entry will be ti in the picture below). If there's
// a filter specified trigger is configured with it. If there's a reply, then the "t0" will be configured
// to reply with that event. **REPLY FUNCTIONALITY DOES NOT WORK YET**
// TODO: Fix the reply.
//
// source ---> [broker] --+--[t0 (optional filter)]--> "t0" (optional reply)
// |
// +--[t1 (optional filter)]--> "t1" (optional reply)
// ...
// +--[tN (optional filter)]--> "tN" (optional reply)
//
func createBrokerTriggerEventRoutingTopology(f *feature.Feature, brokerName string, triggerConfigs []triggerTestConfig) *eventshub.EventProber {
prober := eventshub.NewProber()

// Install the receivers for all the triggers
for i, config := range triggerConfigs {
triggerName := fmt.Sprintf("t%d", i)
// TODO: If there's a corresponding reply, need to wire it in here
if config.reply != nil {
fmt.Printf("TODO: WIRE UP REPLY TO TRIGGER %s", triggerName)
}
f.Setup(fmt.Sprintf("install recorder for %s", triggerName), prober.ReceiverInstall(triggerName))
}

brokerOpts := brokerresources.WithEnvConfig()

f.Setup("Create Broker", brokerresources.Install(brokerName, brokerOpts...))
f.Setup("Broker is Ready", brokerresources.IsReady(brokerName)) // We want to block until broker is ready to go.

prober.SetTargetResource(brokerresources.GVR(), brokerName)

for i, config := range triggerConfigs {
triggerName := fmt.Sprintf("t%d", i)
tOpts := []manifest.CfgFn{triggerresources.WithSubscriber(prober.AsKReference(triggerName), "")}
if config.filter != nil {
tOpts = append(tOpts, triggerresources.WithFilter(config.filter.Attributes))
}
f.Setup(fmt.Sprintf("Create %s with recorder", triggerName), triggerresources.Install(feature.MakeRandomK8sName(triggerName), brokerName, tOpts...))
}
return prober
}

// createExpectedEventRoutingMap takes in an array of trigger configurations as well as incoming events and
// constructs a map of where the events should land. Any replies in trigger configurations will be treated
// as matchable events (since they are going to be sent back to the Broker).
func createExpectedEventRoutingMap(triggerConfigs []triggerTestConfig, inEvents []conformanceevent.Event) map[string][]conformanceevent.Event {
ret := make(map[string][]conformanceevent.Event, len(triggerConfigs))

// for each of the events (both incoming and newly created (replies)) check each trigger filter and append
// to the expected events if it matches.
for _, e := range inEvents {
for i, config := range triggerConfigs {
triggerName := fmt.Sprintf("t%d", i)
if eventMatchesTrigger(e, config.filter) {
ret[triggerName] = append(ret[triggerName], e)
}
}
}
// TODO(vaikas): I'm certain there's a bug/cornercase here. We should make sure that a Reply is considered
// to be sent ONLY if there's an event that matches that trigger filter so as to actually invoke the Trigger
// and therefore generating a reply.
// Basically we need to look at all the incoming events (basically above) and make a note if a trigger
// subscriber gets called and add that logic here.
for _, e := range triggerConfigs {
if e.reply != nil {
for i, config := range triggerConfigs {
triggerName := fmt.Sprintf("t%d", i)
if eventMatchesTrigger(*e.reply, config.filter) {
ret[triggerName] = append(ret[triggerName], *e.reply)
}
}
}
}
return ret
}

// eventNMatchesTrigger checks an event and returns True if the event matches the event.
// nil filter means everything matches, so it's safe to pass nil in here.
func eventMatchesTrigger(event conformanceevent.Event, filter *eventingv1.TriggerFilter) bool {
// With no filter, everything matches
if filter == nil {
return true
}
for attribute, value := range filter.Attributes {
switch attribute {
case "type":
return event.Attributes.Type == value
case "source":
return event.Attributes.Source == value
case "subject":
return event.Attributes.Subject == value
}
// Not a well known attribute, check extensions.
filterAttribute, ok := event.Attributes.Extensions[attribute]
if !ok {
// We want an attribute on the event, but it's not there, so no soup for you.
return false
}
return filterAttribute == attribute
}
// TODO: Do more matching here as necessary.
return false
}
Loading

0 comments on commit b3ccb11

Please sign in to comment.