Skip to content

Commit

Permalink
remove timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
  • Loading branch information
Kavindu-Dodan committed Jun 28, 2023
1 parent e5dd9d3 commit 295a4eb
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 102 deletions.
99 changes: 38 additions & 61 deletions pkg/openfeature/event_executor.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package openfeature

import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
)

// event executor is a registry to connect API and Client event handlers to Providers

// handlerExecutionTime defines the maximum time event handler will wait for its handlers to complete
const handlerExecutionTime = 500 * time.Millisecond

type eventExecutor struct {
defaultProviderReference *providerReference
namedProviderReference map[string]*providerReference
Expand Down Expand Up @@ -223,10 +218,7 @@ func (e *eventExecutor) listenAndShutdown(newProvider *providerReference, oldRef
for {
select {
case event := <-(*newProvider.eventHandler).EventChannel():
err := e.triggerEvent(event, newProvider.clientNameAssociation, newProvider.isDefault)
if err != nil {
e.logger.Error(err, fmt.Sprintf("error handling event type: %s", event.EventType))
}
e.triggerEvent(event, newProvider.clientNameAssociation, newProvider.isDefault)
case <-newProvider.shutdownSemaphore:
return
}
Expand All @@ -247,73 +239,58 @@ func (e *eventExecutor) listenAndShutdown(newProvider *providerReference, oldRef
}

// triggerEvent performs the actual event handling
func (e *eventExecutor) triggerEvent(event Event, clientNameAssociation string, isDefault bool) error {
func (e *eventExecutor) triggerEvent(event Event, clientNameAssociation string, isDefault bool) {
e.mu.Lock()
defer e.mu.Unlock()

ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

group, gCtx := errgroup.WithContext(ctx)

group.Go(func() error {
// first run API handlers
for _, c := range e.apiRegistry[event.EventType] {
e.executeHandler(*c, event)
}

// then run Client handlers for name association
// first run API handlers
for _, c := range e.apiRegistry[event.EventType] {
e.executeHandler(*c, event)
}

// first direct associates
associateClientRegistry := e.scopedRegistry[clientNameAssociation]
for _, c := range associateClientRegistry.callbacks[event.EventType] {
e.executeHandler(*c, event)
}
// then run Client handlers for name association

if !isDefault {
return nil
}
// first direct associates
associateClientRegistry := e.scopedRegistry[clientNameAssociation]
for _, c := range associateClientRegistry.callbacks[event.EventType] {
e.executeHandler(*c, event)
}

// handling the default provider - invoke default provider bound handlers by filtering
if !isDefault {
return
}

var defaultHandlers []EventCallback
// handling the default provider - invoke default provider bound handlers by filtering

for clientName, registry := range e.scopedRegistry {
if _, ok := e.namedProviderReference[clientName]; !ok {
defaultHandlers = append(defaultHandlers, registry.callbacks[event.EventType]...)
}
}
var defaultHandlers []EventCallback

for _, c := range defaultHandlers {
e.executeHandler(*c, event)
for clientName, registry := range e.scopedRegistry {
if _, ok := e.namedProviderReference[clientName]; !ok {
defaultHandlers = append(defaultHandlers, registry.callbacks[event.EventType]...)
}
}

return nil
})

// wait for completion or timeout
select {
case <-time.After(handlerExecutionTime):
return fmt.Errorf("event handlers timeout")
case <-gCtx.Done():
return nil
for _, c := range defaultHandlers {
e.executeHandler(*c, event)
}
}

// executeHandler is a helper which performs the actual invocation of the callback
func (e *eventExecutor) executeHandler(f func(details EventDetails), event Event) {
defer func() {
if r := recover(); r != nil {
e.logger.Info("recovered from a panic")
}
}()
go func() {
defer func() {
if r := recover(); r != nil {
e.logger.Info("recovered from a panic")
}
}()

f(EventDetails{
providerName: event.ProviderName,
ProviderEventDetails: ProviderEventDetails{
Message: event.Message,
FlagChanges: event.FlagChanges,
EventMetadata: event.EventMetadata,
},
})
f(EventDetails{
providerName: event.ProviderName,
ProviderEventDetails: ProviderEventDetails{
Message: event.Message,
FlagChanges: event.FlagChanges,
EventMetadata: event.EventMetadata,
},
})
}()
}
48 changes: 7 additions & 41 deletions pkg/openfeature/event_executor_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package openfeature

import (
"context"
"reflect"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/open-feature/go-sdk/pkg/openfeature/internal"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

var logger logr.Logger
Expand Down Expand Up @@ -125,7 +123,7 @@ func TestEventHandler_Eventing(t *testing.T) {
select {
case result = <-rsp:
break
case <-time.After(handlerExecutionTime):
case <-time.After(200 * time.Millisecond):
t.Fatalf("timeout - event did not trigger")
}
if result.Message != "ReadyMessage" {
Expand Down Expand Up @@ -193,7 +191,7 @@ func TestEventHandler_Eventing(t *testing.T) {
select {
case result = <-rsp:
break
case <-time.After(handlerExecutionTime):
case <-time.After(200 * time.Millisecond):
t.Fatalf("timeout - event did not trigger")
}

Expand Down Expand Up @@ -271,7 +269,7 @@ func TestEventHandler_clientAssociation(t *testing.T) {
select {
case <-rsp:
t.Fatalf("incorrect association - executor must not have been invoked")
case <-time.After(handlerExecutionTime):
case <-time.After(200 * time.Millisecond):
break
}
}
Expand Down Expand Up @@ -307,7 +305,7 @@ func TestEventHandler_ErrorHandling(t *testing.T) {

// trigger events manually
go func() {
_ = executor.triggerEvent(Event{
executor.triggerEvent(Event{
ProviderName: provider,
EventType: ProviderReady,
ProviderEventDetails: ProviderEventDetails{},
Expand All @@ -317,14 +315,14 @@ func TestEventHandler_ErrorHandling(t *testing.T) {
select {
case <-rsp:
break
case <-time.After(handlerExecutionTime):
case <-time.After(200 * time.Millisecond):
t.Error("API level callback timeout - executor recovery was not successful")
}

select {
case <-rspClient:
break
case <-time.After(handlerExecutionTime):
case <-time.After(200 * time.Millisecond):
t.Error("client callback timeout - executor recovery was not successful")
}
}
Expand Down Expand Up @@ -361,43 +359,11 @@ func TestEventHandler_ProviderReadiness(t *testing.T) {
select {
case <-rsp:
break
case <-time.After(handlerExecutionTime):
case <-time.After(200 * time.Millisecond):
t.Errorf("timedout waiting for ready state callback, but got none")
}
}

// Make sure event handler cannot block
func TestEventHandler_Timeout(t *testing.T) {
timeoutCallback := func(e EventDetails) {
time.Sleep(handlerExecutionTime * 10)
}

executor := newEventExecutor(logger)
executor.registerApiHandler(ProviderReady, &timeoutCallback)

group, ctx := errgroup.WithContext(context.Background())

group.Go(func() error {
return executor.triggerEvent(Event{
ProviderName: "provider",
EventType: ProviderReady,
ProviderEventDetails: ProviderEventDetails{},
}, "", true)
})

select {
case <-ctx.Done():
break
case <-time.After(handlerExecutionTime * 2):
t.Fatalf("timeout while waiting for condition")
}

err := group.Wait()
if err == nil {
t.Errorf("expected timeout error, but got none")
}
}

// Contract tests - registration & removal

func TestEventHandler_Registration(t *testing.T) {
Expand Down

0 comments on commit 295a4eb

Please sign in to comment.