Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

fix: Fix event handling for duplicated subscriptions in go-sdk remote execution plane use case #542

Merged
merged 3 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pkg/sdk/connector/controlplane/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,17 @@ func (cp *ControlPlane) stopComponents() {

func (cp *ControlPlane) handle(ctx context.Context, eventUpdate types.EventUpdate, integration Integration) error {
cp.logger.Debugf("Received an event of type: %s", *eventUpdate.KeptnEvent.Type)
// if we already know the subscription ID we can just forward the event to be handled
if eventUpdate.SubscriptionID != "" {
return cp.forwardMatchedEvent(ctx, eventUpdate, integration, eventUpdate.SubscriptionID)
}
for _, subscription := range cp.currentSubscriptions {
if subscription.Event == eventUpdate.MetaData.Subject {
cp.logger.Debugf("Check if event matches subscription %s", subscription.ID)
matcher := eventmatcher.New(subscription)
if matcher.Matches(eventUpdate.KeptnEvent) {
cp.logger.Info("Forwarding matched event update: ", eventUpdate.KeptnEvent.ID)
if err := cp.forwardMatchedEvent(ctx, eventUpdate, integration, subscription); err != nil {
if err := cp.forwardMatchedEvent(ctx, eventUpdate, integration, subscription.ID); err != nil {
return err
}
}
Expand All @@ -228,7 +232,7 @@ func (cp *ControlPlane) getSender(sender types.EventSender) types.EventSender {
}
}

func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate types.EventUpdate, integration Integration, subscription models.EventSubscription) error {
func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate types.EventUpdate, integration Integration, subscriptionID string) error {
// increase the eventHandler WaitGroup
cp.eventHandlerWaitGroup.Add(1)
// when the event handler is done, decrease the WaitGroup again
Expand All @@ -237,7 +241,7 @@ func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate typ
err := eventUpdate.KeptnEvent.AddTemporaryData(
tmpDataDistributorKey,
types.AdditionalSubscriptionData{
SubscriptionID: subscription.ID,
SubscriptionID: subscriptionID,
},
models.AddTemporaryDataOptions{
OverwriteIfExisting: true,
Expand Down
111 changes: 111 additions & 0 deletions pkg/sdk/connector/controlplane/controlplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,117 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
}, 5*time.Second, 100*time.Millisecond)
}

func TestControlPlaneInboundEventWithSubscriptionIDIsForwardedToIntegration(t *testing.T) {
var eventChan chan types.EventUpdate
var subsChan chan []models.EventSubscription
var integrationReceivedEvent models.KeptnContextExtendedCE
var subscriptionSourceStopCalled bool
var eventSourceStopCalled bool

mtx := sync.RWMutex{}

callBackSender := func(ce models.KeptnContextExtendedCE) error { return nil }

ssm := &fake.SubscriptionSourceMock{
StartFn: func(ctx context.Context, data types.RegistrationData, c chan []models.EventSubscription, errC chan error, wg *sync.WaitGroup) error {
mtx.Lock()
defer mtx.Unlock()
subsChan = c
wg.Done()
return nil
},
RegisterFn: func(integration models.Integration) (string, error) {
return "some-id", nil
},
StopFn: func() error {
mtx.Lock()
defer mtx.Unlock()
subscriptionSourceStopCalled = true
return nil
},
}
esm := &fake.EventSourceMock{
StartFn: func(ctx context.Context, data types.RegistrationData, ces chan types.EventUpdate, errC chan error, wg *sync.WaitGroup) error {
mtx.Lock()
defer mtx.Unlock()
eventChan = ces
wg.Done()
return nil
},
OnSubscriptionUpdateFn: func(strings []models.EventSubscription) {},
SenderFn: func() types.EventSender { return callBackSender },
StopFn: func() error {
mtx.Lock()
defer mtx.Unlock()
eventSourceStopCalled = true
return nil
},
CleanupFn: func() error {
return nil
},
}
fm := &LogForwarderMock{
ForwardFn: func(keptnEvent models.KeptnContextExtendedCE, integrationID string) error {
return nil
},
}

controlPlane := New(ssm, esm, fm)

integration := ExampleIntegration{
RegistrationDataFn: func() types.RegistrationData { return types.RegistrationData{} },
OnEventFn: func(ctx context.Context, ce models.KeptnContextExtendedCE) error {
mtx.Lock()
defer mtx.Unlock()
integrationReceivedEvent = ce
return nil
},
}
ctx, cancel := context.WithCancel(context.TODO())
go controlPlane.Register(ctx, integration)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
Error return value of controlPlane.Register is not checked (errcheck)

require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
return subsChan != nil
}, time.Second, time.Millisecond*100)
require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
return eventChan != nil
}, time.Second, time.Millisecond*100)

// event update containing explicit subscriptions ID
eventUpdate2 := types.EventUpdate{KeptnEvent: models.KeptnContextExtendedCE{ID: "EVENT_ID2", Type: strutils.Stringp("sh.keptn.event.echo.triggered")}, SubscriptionID: "SUBSCRIPTION_ID"}
eventChan <- eventUpdate2

require.Eventually(t, func() bool {
mtx.Lock()
defer mtx.Unlock()
eventUpdate2.KeptnEvent.Data = integrationReceivedEvent.Data
return reflect.DeepEqual(eventUpdate2.KeptnEvent, integrationReceivedEvent)
}, time.Second, time.Millisecond*100)

eventData := map[string]interface{}{}
err := integrationReceivedEvent.DataAs(&eventData)
require.Nil(t, err)

require.Equal(t, map[string]interface{}{
"temporaryData": map[string]interface{}{
"distributor": map[string]interface{}{
"subscriptionID": "SUBSCRIPTION_ID",
},
},
}, eventData)

cancel()

require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
return subscriptionSourceStopCalled && eventSourceStopCalled
}, 5*time.Second, 100*time.Millisecond)
}

func TestControlPlaneInboundEventIsForwardedToIntegrationWithoutLogForwarder(t *testing.T) {
var eventChan chan types.EventUpdate
var subsChan chan []models.EventSubscription
Expand Down
16 changes: 14 additions & 2 deletions pkg/sdk/connector/eventsource/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func (hes *HTTPEventSource) Start(ctx context.Context, data types.RegistrationDa
func (hes *HTTPEventSource) OnSubscriptionUpdate(subscriptions []models.EventSubscription) {
hes.mutex.Lock()
defer hes.mutex.Unlock()
if subscriptionDiffer(subscriptions, hes.currentSubscriptions) {
hes.logger.Infof("Got new subscriptions: %v", getEvents(subscriptions))
}
hes.currentSubscriptions = subscriptions
}

Expand Down Expand Up @@ -135,8 +138,9 @@ func (hes *HTTPEventSource) doPoll(eventUpdates chan types.EventUpdate) error {
continue
}
eventUpdates <- types.EventUpdate{
KeptnEvent: *e,
MetaData: types.EventUpdateMetaData{Subject: sub.Event},
KeptnEvent: *e,
MetaData: types.EventUpdateMetaData{Subject: sub.Event},
SubscriptionID: sub.ID,
}
hes.cache.Add(sub.ID, e.ID)
}
Expand Down Expand Up @@ -166,3 +170,11 @@ func getEventFilterForSubscription(subscription models.EventSubscription) api.Ev

return eventFilter
}

func getEvents(subscriptions []models.EventSubscription) []string {
events := []string{}
for _, s := range subscriptions {
events = append(events, s.Event)
}
return events
}
17 changes: 17 additions & 0 deletions pkg/sdk/connector/eventsource/http/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,23 @@ func dedup(elements []string) []string {
return result
}

// subscriptionDiffer checks whether two lists of event subscriptions are different based on the
// event type they are targeting
func subscriptionDiffer(s1 []models.EventSubscription, s2 []models.EventSubscription) bool {
for _, ns := range s1 {
found := false
for _, os := range s2 {
if os.Event == ns.Event {
found = true
}
}
if !found {
return true
}
}
return len(s1) != len(s2)
}

func ToIds(events []*models.KeptnContextExtendedCE) []string {
ids := []string{}
for _, e := range events {
Expand Down
83 changes: 83 additions & 0 deletions pkg/sdk/connector/eventsource/http/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"github.com/keptn/go-utils/pkg/api/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
Expand Down Expand Up @@ -95,3 +96,85 @@ func TestKeep(t *testing.T) {
assert.False(t, cache.Contains("t2", "e4"))
assert.True(t, cache.Contains("t2", "e5"))
}

func Test_subscriptionDiffer(t *testing.T) {
type args struct {
new []models.EventSubscription
old []models.EventSubscription
}
tests := []struct {
name string
args args
want bool
}{
{
name: "equal empty",
args: args{
new: []models.EventSubscription{},
old: []models.EventSubscription{},
},
want: false,
},
{
name: "equal",
args: args{
new: []models.EventSubscription{{Event: "e1"}},
old: []models.EventSubscription{{Event: "e1"}},
},
want: false,
},
{
name: "differ1",
args: args{
new: []models.EventSubscription{{Event: "e1"}},
old: []models.EventSubscription{{Event: "e2"}},
},
want: true,
},
{
name: "differ2",
args: args{
new: []models.EventSubscription{{Event: "e2"}},
old: []models.EventSubscription{{Event: "e1"}},
},
want: true,
},
{
name: "differ3",
args: args{
new: []models.EventSubscription{{Event: "e2"}, {Event: "e1"}},
old: []models.EventSubscription{{Event: "e1"}},
},
want: true,
},
{
name: "differ4",
args: args{
new: []models.EventSubscription{},
old: []models.EventSubscription{{Event: "e1"}},
},
want: true,
},
{
name: "differ5",
args: args{
new: []models.EventSubscription{{Event: "e1"}},
old: []models.EventSubscription{},
},
want: true,
},
{
name: "differ6",
args: args{
new: []models.EventSubscription{{Event: "e2"}},
old: []models.EventSubscription{{Event: "e1"}, {Event: "e1"}},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, subscriptionDiffer(tt.args.new, tt.args.old), "subscriptionDiffer(%v, %v)", tt.args.new, tt.args.old)
})
}
}
5 changes: 3 additions & 2 deletions pkg/sdk/connector/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ type AdditionalSubscriptionData struct {
}

type EventUpdate struct {
KeptnEvent models.KeptnContextExtendedCE
MetaData EventUpdateMetaData
KeptnEvent models.KeptnContextExtendedCE
MetaData EventUpdateMetaData
SubscriptionID string // optional
}

type EventUpdateMetaData struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sdk/internal/api/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func getHttpScheme(env config.EnvConfig) (string, error) {

func eventSource(apiSet keptnapi.KeptnInterface, logger logger.Logger, env config.EnvConfig) eventsource.EventSource {
if env.PubSubConnectionType() == config.ConnectionTypeHTTP {
return eventsourceHttp.New(clock.New(), eventsourceHttp.NewEventAPI(apiSet.ShipyardControlV1(), apiSet.APIV1()))
return eventsourceHttp.New(clock.New(), eventsourceHttp.NewEventAPI(apiSet.ShipyardControlV1(), apiSet.APIV1()), eventsourceHttp.WithLogger(logger))
}
natsConnector := nats.New(env.EventBrokerURL, nats.WithLogger(logger))
return eventsourceNats.New(natsConnector, eventsourceNats.WithLogger(logger))
Expand Down