Skip to content

Commit

Permalink
add lock to watcher hash map to prevent concurrent access panics
Browse files Browse the repository at this point in the history
  • Loading branch information
jabdoa2 authored and toelke committed May 3, 2024
1 parent 0085b96 commit 761ba66
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 39 deletions.
15 changes: 12 additions & 3 deletions pkg/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"sync"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -34,13 +35,21 @@ import (
type Handler struct {
client.Client
recorder record.EventRecorder
watchedConfigmaps map[types.NamespacedName]map[types.NamespacedName]bool
watchedSecrets map[types.NamespacedName]map[types.NamespacedName]bool
watchedConfigmaps WatcherList
watchedSecrets WatcherList
}

// NewHandler constructs a new instance of Handler
func NewHandler(c client.Client, r record.EventRecorder) *Handler {
return &Handler{Client: c, recorder: r, watchedConfigmaps: make(map[types.NamespacedName]map[types.NamespacedName]bool), watchedSecrets: make(map[types.NamespacedName]map[types.NamespacedName]bool)}
return &Handler{Client: c, recorder: r,
watchedConfigmaps: WatcherList{
watchers: make(map[types.NamespacedName]map[types.NamespacedName]bool),
watchersMutex: &sync.RWMutex{},
},
watchedSecrets: WatcherList{
watchers: make(map[types.NamespacedName]map[types.NamespacedName]bool),
watchersMutex: &sync.RWMutex{},
}}
}

// HandleDeployment is called by the deployment controller to reconcile deployments
Expand Down
36 changes: 18 additions & 18 deletions pkg/core/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ var _ = Describe("Wave controller Suite", func() {
})

It("Is watched by the handler", func() {
Expect(h.GetWatchedConfigmaps()[example1Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps()[example2Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps()[example3Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets()[example1Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets()[example2Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets()[example3Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps().watchers[example1Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps().watchers[example2Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps().watchers[example3Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets().watchers[example1Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets().watchers[example2Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets().watchers[example3Name]).To(HaveKey(instanceName))
})

It("Sends an event when updating the hash", func() {
Expand Down Expand Up @@ -240,12 +240,12 @@ var _ = Describe("Wave controller Suite", func() {
})

It("Is is not longer watched by the handler", func() {
Expect(h.GetWatchedConfigmaps()[example1Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps()[example2Name]).NotTo(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps()[example3Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets()[example1Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets()[example2Name]).NotTo(HaveKey(instanceName))
Expect(h.GetWatchedSecrets()[example3Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps().watchers[example1Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps().watchers[example2Name]).NotTo(HaveKey(instanceName))
Expect(h.GetWatchedConfigmaps().watchers[example3Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets().watchers[example1Name]).To(HaveKey(instanceName))
Expect(h.GetWatchedSecrets().watchers[example2Name]).NotTo(HaveKey(instanceName))
Expect(h.GetWatchedSecrets().watchers[example3Name]).To(HaveKey(instanceName))
})
})

Expand Down Expand Up @@ -464,8 +464,8 @@ var _ = Describe("Wave controller Suite", func() {
})

It("No longer is watched by the handler", func() {
Expect(len(h.GetWatchedConfigmaps())).To(Equal(0))
Expect(len(h.GetWatchedSecrets())).To(Equal(0))
Expect(len(h.GetWatchedConfigmaps().watchers)).To(Equal(0))
Expect(len(h.GetWatchedSecrets().watchers)).To(Equal(0))
})
})

Expand All @@ -486,8 +486,8 @@ var _ = Describe("Wave controller Suite", func() {
})

It("No longer is watched by the handler", func() {
Expect(len(h.GetWatchedConfigmaps())).To(Equal(0))
Expect(len(h.GetWatchedSecrets())).To(Equal(0))
Expect(len(h.GetWatchedConfigmaps().watchers)).To(Equal(0))
Expect(len(h.GetWatchedSecrets().watchers)).To(Equal(0))
})
})
})
Expand All @@ -499,8 +499,8 @@ var _ = Describe("Wave controller Suite", func() {
})

It("Is not watched by the handler", func() {
Expect(len(h.GetWatchedConfigmaps())).To(Equal(0))
Expect(len(h.GetWatchedSecrets())).To(Equal(0))
Expect(len(h.GetWatchedConfigmaps().watchers)).To(Equal(0))
Expect(len(h.GetWatchedSecrets().watchers)).To(Equal(0))
})

It("Doesn't add a config hash to the Pod Template", func() {
Expand Down
50 changes: 32 additions & 18 deletions pkg/core/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -13,14 +14,18 @@ import (

var _ handler.EventHandler = &enqueueRequestForWatcher{}

type WatcherList struct {
watchers map[types.NamespacedName]map[types.NamespacedName]bool
watchersMutex *sync.RWMutex
}

type enqueueRequestForWatcher struct {
// watcherList
watcherList map[types.NamespacedName]map[types.NamespacedName]bool
WatcherList
}

func EnqueueRequestForWatcher(watcherList map[types.NamespacedName]map[types.NamespacedName]bool) handler.EventHandler {
func EnqueueRequestForWatcher(watcherList WatcherList) handler.EventHandler {
e := &enqueueRequestForWatcher{
watcherList: watcherList,
WatcherList: watcherList,
}
return e
}
Expand Down Expand Up @@ -50,54 +55,63 @@ func (e *enqueueRequestForWatcher) Generic(ctx context.Context, evt event.Generi
// all owners of object
func (e *enqueueRequestForWatcher) queueOwnerReconcileRequest(object metav1.Object, q workqueue.RateLimitingInterface) {
name := GetNamespacedNameFromObject(object)
if watchers, ok := e.watcherList[name]; ok {
e.watchersMutex.Lock()
if watchers, ok := e.watchers[name]; ok {
for watcher := range watchers {
request := reconcile.Request{NamespacedName: watcher}
q.Add(request)
}
}
e.watchersMutex.Unlock()
}

func (h *Handler) GetWatchedConfigmaps() map[types.NamespacedName]map[types.NamespacedName]bool {
func (h *Handler) GetWatchedConfigmaps() WatcherList {
return h.watchedConfigmaps
}

func (h *Handler) GetWatchedSecrets() map[types.NamespacedName]map[types.NamespacedName]bool {
func (h *Handler) GetWatchedSecrets() WatcherList {
return h.watchedSecrets
}

func (h *Handler) watchChildrenForInstance(instance podController, configMaps configMetadataMap, secrets configMetadataMap) {
instanceName := GetNamespacedNameFromObject(instance)
h.watchedConfigmaps.watchersMutex.Lock()
for childName := range configMaps {

if _, ok := h.watchedConfigmaps[childName]; !ok {
h.watchedConfigmaps[childName] = map[types.NamespacedName]bool{}
if _, ok := h.watchedConfigmaps.watchers[childName]; !ok {
h.watchedConfigmaps.watchers[childName] = map[types.NamespacedName]bool{}
}
h.watchedConfigmaps[childName][instanceName] = true
h.watchedConfigmaps.watchers[childName][instanceName] = true
}
h.watchedConfigmaps.watchersMutex.Unlock()
h.watchedSecrets.watchersMutex.Lock()
for childName := range secrets {
if _, ok := h.watchedSecrets[childName]; !ok {
h.watchedSecrets[childName] = map[types.NamespacedName]bool{}
if _, ok := h.watchedSecrets.watchers[childName]; !ok {
h.watchedSecrets.watchers[childName] = map[types.NamespacedName]bool{}
}
h.watchedSecrets[childName][instanceName] = true
h.watchedSecrets.watchers[childName][instanceName] = true
}
h.watchedSecrets.watchersMutex.Unlock()
}

func (h *Handler) removeWatchesForInstance(instance podController) {
h.RemoveWatches(GetNamespacedNameFromObject(instance))
}

func (h *Handler) RemoveWatches(instanceName types.NamespacedName) {
for child, watchers := range h.watchedConfigmaps {
h.watchedConfigmaps.watchersMutex.Lock()
for child, watchers := range h.watchedConfigmaps.watchers {
delete(watchers, instanceName)
if len(watchers) == 0 {
delete(h.watchedConfigmaps, child)
delete(h.watchedConfigmaps.watchers, child)
}
}
for child, watchers := range h.watchedSecrets {
h.watchedConfigmaps.watchersMutex.Unlock()
h.watchedSecrets.watchersMutex.Lock()
for child, watchers := range h.watchedSecrets.watchers {
delete(watchers, instanceName)
if len(watchers) == 0 {
delete(h.watchedSecrets, child)
delete(h.watchedSecrets.watchers, child)
}
}
h.watchedSecrets.watchersMutex.Unlock()
}

0 comments on commit 761ba66

Please sign in to comment.