From 295a4eb0bb2e31f170f9a645896642bcec015d15 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Wed, 28 Jun 2023 08:23:31 -0700 Subject: [PATCH] remove timeout Signed-off-by: Kavindu Dodanduwa --- pkg/openfeature/event_executor.go | 99 ++++++++++---------------- pkg/openfeature/event_executor_test.go | 48 ++----------- 2 files changed, 45 insertions(+), 102 deletions(-) diff --git a/pkg/openfeature/event_executor.go b/pkg/openfeature/event_executor.go index f46935fd..66ae3421 100644 --- a/pkg/openfeature/event_executor.go +++ b/pkg/openfeature/event_executor.go @@ -1,20 +1,15 @@ package openfeature import ( - "context" "fmt" "sync" "time" "github.com/go-logr/logr" - "golang.org/x/sync/errgroup" ) // event executor is a registry to connect API and Client event handlers to Providers -// handlerExecutionTime defines the maximum time event handler will wait for its handlers to complete -const handlerExecutionTime = 500 * time.Millisecond - type eventExecutor struct { defaultProviderReference *providerReference namedProviderReference map[string]*providerReference @@ -223,10 +218,7 @@ func (e *eventExecutor) listenAndShutdown(newProvider *providerReference, oldRef for { select { case event := <-(*newProvider.eventHandler).EventChannel(): - err := e.triggerEvent(event, newProvider.clientNameAssociation, newProvider.isDefault) - if err != nil { - e.logger.Error(err, fmt.Sprintf("error handling event type: %s", event.EventType)) - } + e.triggerEvent(event, newProvider.clientNameAssociation, newProvider.isDefault) case <-newProvider.shutdownSemaphore: return } @@ -247,73 +239,58 @@ func (e *eventExecutor) listenAndShutdown(newProvider *providerReference, oldRef } // triggerEvent performs the actual event handling -func (e *eventExecutor) triggerEvent(event Event, clientNameAssociation string, isDefault bool) error { +func (e *eventExecutor) triggerEvent(event Event, clientNameAssociation string, isDefault bool) { e.mu.Lock() defer e.mu.Unlock() - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - - group, gCtx := errgroup.WithContext(ctx) - - group.Go(func() error { - // first run API handlers - for _, c := range e.apiRegistry[event.EventType] { - e.executeHandler(*c, event) - } - - // then run Client handlers for name association + // first run API handlers + for _, c := range e.apiRegistry[event.EventType] { + e.executeHandler(*c, event) + } - // first direct associates - associateClientRegistry := e.scopedRegistry[clientNameAssociation] - for _, c := range associateClientRegistry.callbacks[event.EventType] { - e.executeHandler(*c, event) - } + // then run Client handlers for name association - if !isDefault { - return nil - } + // first direct associates + associateClientRegistry := e.scopedRegistry[clientNameAssociation] + for _, c := range associateClientRegistry.callbacks[event.EventType] { + e.executeHandler(*c, event) + } - // handling the default provider - invoke default provider bound handlers by filtering + if !isDefault { + return + } - var defaultHandlers []EventCallback + // handling the default provider - invoke default provider bound handlers by filtering - for clientName, registry := range e.scopedRegistry { - if _, ok := e.namedProviderReference[clientName]; !ok { - defaultHandlers = append(defaultHandlers, registry.callbacks[event.EventType]...) - } - } + var defaultHandlers []EventCallback - for _, c := range defaultHandlers { - e.executeHandler(*c, event) + for clientName, registry := range e.scopedRegistry { + if _, ok := e.namedProviderReference[clientName]; !ok { + defaultHandlers = append(defaultHandlers, registry.callbacks[event.EventType]...) } + } - return nil - }) - - // wait for completion or timeout - select { - case <-time.After(handlerExecutionTime): - return fmt.Errorf("event handlers timeout") - case <-gCtx.Done(): - return nil + for _, c := range defaultHandlers { + e.executeHandler(*c, event) } } // executeHandler is a helper which performs the actual invocation of the callback func (e *eventExecutor) executeHandler(f func(details EventDetails), event Event) { - defer func() { - if r := recover(); r != nil { - e.logger.Info("recovered from a panic") - } - }() + go func() { + defer func() { + if r := recover(); r != nil { + e.logger.Info("recovered from a panic") + } + }() - f(EventDetails{ - providerName: event.ProviderName, - ProviderEventDetails: ProviderEventDetails{ - Message: event.Message, - FlagChanges: event.FlagChanges, - EventMetadata: event.EventMetadata, - }, - }) + f(EventDetails{ + providerName: event.ProviderName, + ProviderEventDetails: ProviderEventDetails{ + Message: event.Message, + FlagChanges: event.FlagChanges, + EventMetadata: event.EventMetadata, + }, + }) + }() } diff --git a/pkg/openfeature/event_executor_test.go b/pkg/openfeature/event_executor_test.go index 62fd0124..245a8dc4 100644 --- a/pkg/openfeature/event_executor_test.go +++ b/pkg/openfeature/event_executor_test.go @@ -1,7 +1,6 @@ package openfeature import ( - "context" "reflect" "testing" "time" @@ -9,7 +8,6 @@ import ( "github.com/go-logr/logr" "github.com/open-feature/go-sdk/pkg/openfeature/internal" "golang.org/x/exp/slices" - "golang.org/x/sync/errgroup" ) var logger logr.Logger @@ -125,7 +123,7 @@ func TestEventHandler_Eventing(t *testing.T) { select { case result = <-rsp: break - case <-time.After(handlerExecutionTime): + case <-time.After(200 * time.Millisecond): t.Fatalf("timeout - event did not trigger") } if result.Message != "ReadyMessage" { @@ -193,7 +191,7 @@ func TestEventHandler_Eventing(t *testing.T) { select { case result = <-rsp: break - case <-time.After(handlerExecutionTime): + case <-time.After(200 * time.Millisecond): t.Fatalf("timeout - event did not trigger") } @@ -271,7 +269,7 @@ func TestEventHandler_clientAssociation(t *testing.T) { select { case <-rsp: t.Fatalf("incorrect association - executor must not have been invoked") - case <-time.After(handlerExecutionTime): + case <-time.After(200 * time.Millisecond): break } } @@ -307,7 +305,7 @@ func TestEventHandler_ErrorHandling(t *testing.T) { // trigger events manually go func() { - _ = executor.triggerEvent(Event{ + executor.triggerEvent(Event{ ProviderName: provider, EventType: ProviderReady, ProviderEventDetails: ProviderEventDetails{}, @@ -317,14 +315,14 @@ func TestEventHandler_ErrorHandling(t *testing.T) { select { case <-rsp: break - case <-time.After(handlerExecutionTime): + case <-time.After(200 * time.Millisecond): t.Error("API level callback timeout - executor recovery was not successful") } select { case <-rspClient: break - case <-time.After(handlerExecutionTime): + case <-time.After(200 * time.Millisecond): t.Error("client callback timeout - executor recovery was not successful") } } @@ -361,43 +359,11 @@ func TestEventHandler_ProviderReadiness(t *testing.T) { select { case <-rsp: break - case <-time.After(handlerExecutionTime): + case <-time.After(200 * time.Millisecond): t.Errorf("timedout waiting for ready state callback, but got none") } } -// Make sure event handler cannot block -func TestEventHandler_Timeout(t *testing.T) { - timeoutCallback := func(e EventDetails) { - time.Sleep(handlerExecutionTime * 10) - } - - executor := newEventExecutor(logger) - executor.registerApiHandler(ProviderReady, &timeoutCallback) - - group, ctx := errgroup.WithContext(context.Background()) - - group.Go(func() error { - return executor.triggerEvent(Event{ - ProviderName: "provider", - EventType: ProviderReady, - ProviderEventDetails: ProviderEventDetails{}, - }, "", true) - }) - - select { - case <-ctx.Done(): - break - case <-time.After(handlerExecutionTime * 2): - t.Fatalf("timeout while waiting for condition") - } - - err := group.Wait() - if err == nil { - t.Errorf("expected timeout error, but got none") - } -} - // Contract tests - registration & removal func TestEventHandler_Registration(t *testing.T) {