diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 7421aca491..8b449dbcb0 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -33,7 +33,7 @@ func NewMeshCatalog(kubeController k8s.Controller, kubeClient kubernetes.Interfa mc.releaseCertificateHandler() go mc.dispatcher() - ticker.InitTicker() + ticker.InitTicker(cfg) return &mc } diff --git a/pkg/catalog/helpers_test.go b/pkg/catalog/helpers_test.go index 534137fa04..9bcc5d25ed 100644 --- a/pkg/catalog/helpers_test.go +++ b/pkg/catalog/helpers_test.go @@ -3,6 +3,7 @@ package catalog import ( "context" "testing" + "time" "github.com/golang/mock/gomock" access "github.com/servicemeshinterface/smi-sdk-go/pkg/apis/access/v1alpha3" @@ -123,6 +124,7 @@ func newFakeMeshCatalogForRoutes(t *testing.T, testParams testParams) *MeshCatal mockKubeController.EXPECT().ListMonitoredNamespaces().Return(listExpectedNs, nil).AnyTimes() mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(testParams.permissiveMode).AnyTimes() + mockConfigurator.EXPECT().GetConfigResyncInterval().Return(time.Duration(0)).AnyTimes() mockMeshSpec.EXPECT().ListTrafficTargets().Return([]*access.TrafficTarget{&tests.TrafficTarget, &tests.BookstoreV2TrafficTarget}).AnyTimes() mockMeshSpec.EXPECT().ListHTTPTrafficSpecs().Return([]*specs.HTTPRouteGroup{&tests.HTTPRouteGroup}).AnyTimes() diff --git a/pkg/ticker/ticker.go b/pkg/ticker/ticker.go index dac6e27d5f..d10d33c0a6 100644 --- a/pkg/ticker/ticker.go +++ b/pkg/ticker/ticker.go @@ -4,25 +4,111 @@ import ( "time" "github.com/openservicemesh/osm/pkg/announcements" + "github.com/openservicemesh/osm/pkg/configurator" "github.com/openservicemesh/osm/pkg/kubernetes/events" "github.com/openservicemesh/osm/pkg/logger" ) +const ( + // Any value under minimumTickerDuration will be understood as a ticker stop + // Conversely, a value equals or above it will be understood as ticker start + minimumTickerDuration = time.Duration(1 * time.Minute) +) + +// ResyncTicker contains the stop configuration for the ticker routines +type ResyncTicker struct { + stopTickerRoutine chan struct{} + stopConfigRoutine chan struct{} +} + var ( log = logger.New("ticker") + // Local reference to global ticker + rTicker *ResyncTicker = nil ) // InitTicker initializes a global ticker that is configured via // pubsub, and triggers global proxy updates also through pubsub. // Upon this function return, the ticker is guaranteed to be started // and ready to receive new events. -func InitTicker() { - doneInit := make(chan struct{}) - go ticker(doneInit) - <-doneInit +func InitTicker(c configurator.Configurator) *ResyncTicker { + if rTicker != nil { + return rTicker + } + + // Start config resync ticker routine + tickerIsReady := make(chan struct{}) + stopTicker := make(chan struct{}) + go ticker(tickerIsReady, stopTicker) + <-tickerIsReady + + // Start config listener + configIsReady := make(chan struct{}) + stopConfig := make(chan struct{}) + go tickerConfigListener(c, configIsReady, stopConfig) + <-configIsReady + + rTicker = &ResyncTicker{ + stopTickerRoutine: stopTicker, + stopConfigRoutine: stopConfig, + } + return rTicker +} + +// Listens to configmap events and notifies ticker routine to start/stop +func tickerConfigListener(cfg configurator.Configurator, ready chan struct{}, stop <-chan struct{}) { + // Subscribe to configuration updates + configMapChannel := events.GetPubSubInstance().Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) + + // Run config listener + // Bootstrap after subscribing + currentDuration := cfg.GetConfigResyncInterval() + + // Initial config + if currentDuration >= minimumTickerDuration { + events.GetPubSubInstance().Publish(events.PubSubMessage{ + AnnouncementType: announcements.TickerStart, + NewObj: currentDuration, + }) + } + close(ready) + + for { + select { + case <-configMapChannel: + newResyncInterval := cfg.GetConfigResyncInterval() + // Skip no changes from current applied conf + if currentDuration == newResyncInterval { + continue + } + + // We have a change + if newResyncInterval >= minimumTickerDuration { + // Notify to re/start ticker + log.Warn().Msgf("Interval %s >= %s, issuing start ticker.", newResyncInterval, minimumTickerDuration) + events.GetPubSubInstance().Publish(events.PubSubMessage{ + AnnouncementType: announcements.TickerStart, + NewObj: newResyncInterval, + }) + } else { + // Notify to ticker to stop + log.Warn().Msgf("Interval %s < %s, issuing ticker stop.", newResyncInterval, minimumTickerDuration) + events.GetPubSubInstance().Publish(events.PubSubMessage{ + AnnouncementType: announcements.TickerStop, + NewObj: newResyncInterval, + }) + } + currentDuration = newResyncInterval + case <-stop: + return + } + } } -func ticker(ready chan struct{}) { +func ticker(ready chan struct{}, stop <-chan struct{}) { ticker := make(<-chan time.Time) tickStart := events.GetPubSubInstance().Subscribe( announcements.TickerStart) @@ -62,6 +148,8 @@ func ticker(ready chan struct{}) { AnnouncementType: announcements.ScheduleProxyBroadcast, }, ) + case <-stop: + return } } } diff --git a/pkg/ticker/ticker_test.go b/pkg/ticker/ticker_test.go index a7d756d4f0..08c7a96168 100644 --- a/pkg/ticker/ticker_test.go +++ b/pkg/ticker/ticker_test.go @@ -4,9 +4,11 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/openservicemesh/osm/pkg/announcements" + "github.com/openservicemesh/osm/pkg/configurator" "github.com/openservicemesh/osm/pkg/kubernetes/events" ) @@ -14,8 +16,11 @@ func TestTicker(t *testing.T) { assert := assert.New(t) broadcastEvents := events.GetPubSubInstance().Subscribe(announcements.ScheduleProxyBroadcast) + defer events.GetPubSubInstance().Unsub(broadcastEvents) + broadcastsReceived := 0 stop := make(chan struct{}) + defer close(stop) go func() { for { select { @@ -28,7 +33,11 @@ func TestTicker(t *testing.T) { }() // Start the ticker routine - InitTicker() + doneInit := make(chan struct{}) + stopTicker := make(chan struct{}) + defer close(stopTicker) + go ticker(doneInit, stop) + <-doneInit // Start ticker, tick at 100ms rate events.GetPubSubInstance().Publish(events.PubSubMessage{ @@ -54,6 +63,64 @@ func TestTicker(t *testing.T) { return firstRead == secondRead }, 6*time.Second, 2*time.Second) +} + +// Test the ConfigMap event listener code for ticker +func TestTickerConfigurator(t *testing.T) { + assert := assert.New(t) + mockConfigurator := configurator.NewMockConfigurator(gomock.NewController(t)) + + tickerStartEvents := events.GetPubSubInstance().Subscribe(announcements.TickerStart) + tickerStopEvents := events.GetPubSubInstance().Subscribe(announcements.TickerStop) + + // First init will expect defaults to false + mockConfigurator.EXPECT().GetConfigResyncInterval().Return(time.Duration(0)) + + doneInit := make(chan struct{}) + stopConfig := make(chan struct{}) + defer close(stopConfig) + go tickerConfigListener(mockConfigurator, doneInit, stopConfig) + <-doneInit + + type tickerConfigTests struct { + mockTickerDurationVal time.Duration + expectStartEvent int + expectStopEvent int + } + + tickerConfTests := []tickerConfigTests{ + {time.Duration(2 * time.Minute), 1, 0}, // default (off) -> 2m, expect start + {time.Duration(2 * time.Minute), 0, 0}, // No change, expect no event + {time.Duration(3 * time.Minute), 1, 0}, // 2m -> enabled 3m, expect start + {time.Duration(0), 0, 1}, // 2m -> stop, expect stop + {time.Duration(30 * time.Second), 0, 1}, // stop -> still smaller than threshold, expect stop + {time.Duration(0), 0, 1}, // stopped -> stopped, still trigger change + {time.Duration(2 * time.Minute), 1, 0}, // stopped -> start, expect start + } + + for _, test := range tickerConfTests { + // Simulate a configmap change, expect the right calls if it is enabled + mockConfigurator.EXPECT().GetConfigResyncInterval().Return(test.mockTickerDurationVal) + events.GetPubSubInstance().Publish(events.PubSubMessage{ + AnnouncementType: announcements.ConfigMapUpdated, + }) + + receivedStartEvent := 0 + receivedStopEvent := 0 + done := false + for !done { + select { + case <-tickerStartEvents: + receivedStartEvent++ + case <-tickerStopEvents: + receivedStopEvent++ + // 500mili should be plenty for this + case <-time.After(500 * time.Millisecond): + done = true + } + } - close(stop) + assert.Equal(test.expectStartEvent, receivedStartEvent) + assert.Equal(test.expectStopEvent, receivedStopEvent) + } }