Skip to content

Commit

Permalink
ticker: Add config hooks to start/stop ticker
Browse files Browse the repository at this point in the history
Ticker will run it's own configmap listener and watch for changes
on the ConfigResyncInterval flag, and configure ticker appropriately.

Signed-off-by: edu <eduser25@gmail.com>
  • Loading branch information
eduser25 committed Mar 23, 2021
1 parent 890430f commit 3562e0c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewMeshCatalog(kubeController k8s.Controller, kubeClient kubernetes.Interfa
mc.releaseCertificateHandler()

go mc.dispatcher()
ticker.InitTicker()
ticker.InitTicker(cfg)

return &mc
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/catalog/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package catalog
import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
access "github.com/servicemeshinterface/smi-sdk-go/pkg/apis/access/v1alpha3"
Expand Down Expand Up @@ -123,6 +124,7 @@ func newFakeMeshCatalogForRoutes(t *testing.T, testParams testParams) *MeshCatal
mockKubeController.EXPECT().ListMonitoredNamespaces().Return(listExpectedNs, nil).AnyTimes()

mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(testParams.permissiveMode).AnyTimes()
mockConfigurator.EXPECT().GetConfigResyncInterval().Return(time.Duration(0)).AnyTimes()

mockMeshSpec.EXPECT().ListTrafficTargets().Return([]*access.TrafficTarget{&tests.TrafficTarget, &tests.BookstoreV2TrafficTarget}).AnyTimes()
mockMeshSpec.EXPECT().ListHTTPTrafficSpecs().Return([]*specs.HTTPRouteGroup{&tests.HTTPRouteGroup}).AnyTimes()
Expand Down
98 changes: 93 additions & 5 deletions pkg/ticker/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,111 @@ import (
"time"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/kubernetes/events"
"github.com/openservicemesh/osm/pkg/logger"
)

const (
// Any value under minimumTickerDuration will be understood as a ticker stop
// Conversely, a value equals or above it will be understood as ticker start
minimumTickerDuration = time.Duration(1 * time.Minute)
)

// ResyncTicker contains the stop configuration for the ticker routines
type ResyncTicker struct {
stopTickerRoutine chan struct{}
stopConfigRoutine chan struct{}
}

var (
log = logger.New("ticker")
// Local reference to global ticker
rTicker *ResyncTicker = nil
)

// InitTicker initializes a global ticker that is configured via
// pubsub, and triggers global proxy updates also through pubsub.
// Upon this function return, the ticker is guaranteed to be started
// and ready to receive new events.
func InitTicker() {
doneInit := make(chan struct{})
go ticker(doneInit)
<-doneInit
func InitTicker(c configurator.Configurator) *ResyncTicker {
if rTicker != nil {
return rTicker
}

// Start config resync ticker routine
tickerIsReady := make(chan struct{})
stopTicker := make(chan struct{})
go ticker(tickerIsReady, stopTicker)
<-tickerIsReady

// Start config listener
configIsReady := make(chan struct{})
stopConfig := make(chan struct{})
go tickerConfigListener(c, configIsReady, stopConfig)
<-configIsReady

rTicker = &ResyncTicker{
stopTickerRoutine: stopTicker,
stopConfigRoutine: stopConfig,
}
return rTicker
}

// Listens to configmap events and notifies ticker routine to start/stop
func tickerConfigListener(cfg configurator.Configurator, ready chan struct{}, stop <-chan struct{}) {
// Subscribe to configuration updates
configMapChannel := events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)

// Run config listener
// Bootstrap after subscribing
currentDuration := cfg.GetConfigResyncInterval()

// Initial config
if currentDuration >= minimumTickerDuration {
events.GetPubSubInstance().Publish(events.PubSubMessage{
AnnouncementType: announcements.TickerStart,
NewObj: currentDuration,
})
}
close(ready)

for {
select {
case <-configMapChannel:
newResyncInterval := cfg.GetConfigResyncInterval()
// Skip no changes from current applied conf
if currentDuration == newResyncInterval {
continue
}

// We have a change
if newResyncInterval >= minimumTickerDuration {
// Notify to re/start ticker
log.Warn().Msgf("Interval %s >= %s, issuing start ticker.", newResyncInterval, minimumTickerDuration)
events.GetPubSubInstance().Publish(events.PubSubMessage{
AnnouncementType: announcements.TickerStart,
NewObj: newResyncInterval,
})
} else {
// Notify to ticker to stop
log.Warn().Msgf("Interval %s < %s, issuing ticker stop.", newResyncInterval, minimumTickerDuration)
events.GetPubSubInstance().Publish(events.PubSubMessage{
AnnouncementType: announcements.TickerStop,
NewObj: newResyncInterval,
})
}
currentDuration = newResyncInterval
case <-stop:
return
}
}
}

func ticker(ready chan struct{}) {
func ticker(ready chan struct{}, stop <-chan struct{}) {
ticker := make(<-chan time.Time)
tickStart := events.GetPubSubInstance().Subscribe(
announcements.TickerStart)
Expand Down Expand Up @@ -62,6 +148,8 @@ func ticker(ready chan struct{}) {
AnnouncementType: announcements.ScheduleProxyBroadcast,
},
)
case <-stop:
return
}
}
}
71 changes: 69 additions & 2 deletions pkg/ticker/ticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/kubernetes/events"
)

func TestTicker(t *testing.T) {
assert := assert.New(t)

broadcastEvents := events.GetPubSubInstance().Subscribe(announcements.ScheduleProxyBroadcast)
defer events.GetPubSubInstance().Unsub(broadcastEvents)

broadcastsReceived := 0
stop := make(chan struct{})
defer close(stop)
go func() {
for {
select {
Expand All @@ -28,7 +33,11 @@ func TestTicker(t *testing.T) {
}()

// Start the ticker routine
InitTicker()
doneInit := make(chan struct{})
stopTicker := make(chan struct{})
defer close(stopTicker)
go ticker(doneInit, stop)
<-doneInit

// Start ticker, tick at 100ms rate
events.GetPubSubInstance().Publish(events.PubSubMessage{
Expand All @@ -54,6 +63,64 @@ func TestTicker(t *testing.T) {

return firstRead == secondRead
}, 6*time.Second, 2*time.Second)
}

// Test the ConfigMap event listener code for ticker
func TestTickerConfigurator(t *testing.T) {
assert := assert.New(t)
mockConfigurator := configurator.NewMockConfigurator(gomock.NewController(t))

tickerStartEvents := events.GetPubSubInstance().Subscribe(announcements.TickerStart)
tickerStopEvents := events.GetPubSubInstance().Subscribe(announcements.TickerStop)

// First init will expect defaults to false
mockConfigurator.EXPECT().GetConfigResyncInterval().Return(time.Duration(0))

doneInit := make(chan struct{})
stopConfig := make(chan struct{})
defer close(stopConfig)
go tickerConfigListener(mockConfigurator, doneInit, stopConfig)
<-doneInit

type tickerConfigTests struct {
mockTickerDurationVal time.Duration
expectStartEvent int
expectStopEvent int
}

tickerConfTests := []tickerConfigTests{
{time.Duration(2 * time.Minute), 1, 0}, // default (off) -> 2m, expect start
{time.Duration(2 * time.Minute), 0, 0}, // No change, expect no event
{time.Duration(3 * time.Minute), 1, 0}, // 2m -> enabled 3m, expect start
{time.Duration(0), 0, 1}, // 2m -> stop, expect stop
{time.Duration(30 * time.Second), 0, 1}, // stop -> still smaller than threshold, expect stop
{time.Duration(0), 0, 1}, // stopped -> stopped, still trigger change
{time.Duration(2 * time.Minute), 1, 0}, // stopped -> start, expect start
}

for _, test := range tickerConfTests {
// Simulate a configmap change, expect the right calls if it is enabled
mockConfigurator.EXPECT().GetConfigResyncInterval().Return(test.mockTickerDurationVal)
events.GetPubSubInstance().Publish(events.PubSubMessage{
AnnouncementType: announcements.ConfigMapUpdated,
})

receivedStartEvent := 0
receivedStopEvent := 0
done := false
for !done {
select {
case <-tickerStartEvents:
receivedStartEvent++
case <-tickerStopEvents:
receivedStopEvent++
// 500mili should be plenty for this
case <-time.After(500 * time.Millisecond):
done = true
}
}

close(stop)
assert.Equal(test.expectStartEvent, receivedStartEvent)
assert.Equal(test.expectStopEvent, receivedStopEvent)
}
}

0 comments on commit 3562e0c

Please sign in to comment.