diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 50791920e..83016e74c 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -214,7 +214,7 @@ type clusterCache struct {
 	clusterResources bool
 	settings         Settings
 
-	handlersLock                sync.Mutex
+	handlersLock                sync.RWMutex
 	handlerKey                  uint64
 	populateResourceInfoHandler OnPopulateResourceInfoHandler
 	resourceUpdatedHandlers     map[uint64]OnResourceUpdatedHandler
@@ -261,8 +261,8 @@ func (c *clusterCache) OnResourceUpdated(handler OnResourceUpdatedHandler) Unsub
 }
 
 func (c *clusterCache) getResourceUpdatedHandlers() []OnResourceUpdatedHandler {
-	c.handlersLock.Lock()
-	defer c.handlersLock.Unlock()
+	c.handlersLock.RLock()
+	defer c.handlersLock.RUnlock()
 	var handlers []OnResourceUpdatedHandler
 	for _, h := range c.resourceUpdatedHandlers {
 		handlers = append(handlers, h)
@@ -285,8 +285,8 @@ func (c *clusterCache) OnEvent(handler OnEventHandler) Unsubscribe {
 }
 
 func (c *clusterCache) getEventHandlers() []OnEventHandler {
-	c.handlersLock.Lock()
-	defer c.handlersLock.Unlock()
+	c.handlersLock.RLock()
+	defer c.handlersLock.RUnlock()
 	handlers := make([]OnEventHandler, 0, len(c.eventHandlers))
 	for _, h := range c.eventHandlers {
 		handlers = append(handlers, h)
@@ -1113,26 +1113,51 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
 		return
 	}
 
+	ok, newRes, oldRes, ns := c.updateResource(key, event, un)
+	// Still take the read lock, since the namespace resources can otherwise race between being used and being updated.
+	// Some race is still possible between releasing the write lock and acquiring this one, but since each resource type
+	// is processed by one goroutine, at most we'd see fresher namespace resources, which should be fine,
+	// at least for the existing resource update handlers.
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	if ok {
+		for _, h := range c.getResourceUpdatedHandlers() {
+			h(newRes, oldRes, ns)
+		}
+	}
+}
+
+// Encapsulates the logic of updating the resource in the cluster cache to limit the scope of locking.
+func (c *clusterCache) updateResource(key kube.ResourceKey, event watch.EventType, un *unstructured.Unstructured) (bool, *Resource, *Resource, map[kube.ResourceKey]*Resource) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	existingNode, exists := c.resources[key]
 	if event == watch.Deleted {
 		if exists {
-			c.onNodeRemoved(key)
+			ok, existing, ns := c.removeResource(key)
+			return ok, nil, existing, ns
+		} else {
+			return false, nil, nil, nil
 		}
-	} else if event != watch.Deleted {
-		c.onNodeUpdated(existingNode, c.newResource(un))
+	} else {
+		newRes, ns := c.setResource(c.newResource(un))
+		return true, newRes, existingNode, ns
 	}
 }
 
-func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
+func (c *clusterCache) setResource(newRes *Resource) (*Resource, map[kube.ResourceKey]*Resource) {
 	c.setNode(newRes)
+	return newRes, c.nsIndex[newRes.Ref.Namespace]
+}
+
+func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
+	_, ns := c.setResource(newRes)
 	for _, h := range c.getResourceUpdatedHandlers() {
-		h(newRes, oldRes, c.nsIndex[newRes.Ref.Namespace])
+		h(newRes, oldRes, ns)
 	}
 }
 
-func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
+func (c *clusterCache) removeResource(key kube.ResourceKey) (bool, *Resource, map[kube.ResourceKey]*Resource) {
 	existing, ok := c.resources[key]
 	if ok {
 		delete(c.resources, key)
@@ -1151,6 +1176,14 @@
 				}
 			}
 		}
+		return true, existing, ns
+	}
+	return false, nil, nil
+}
+
+func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
+	ok, existing, ns := c.removeResource(key)
+	if ok {
 		for _, h := range c.getResourceUpdatedHandlers() {
 			h(nil, existing, ns)
 		}
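The handlersLock change above swaps sync.Mutex for sync.RWMutex so that the hot event path only snapshots the registered handlers under a read lock, while subscribing and unsubscribing still take the write lock. Below is a minimal, self-contained sketch of that pattern; the handlerRegistry type and every name in it are made up for illustration and are not part of pkg/cache.

package main

import (
	"fmt"
	"sync"
)

// handlerRegistry is an illustrative stand-in for the handler maps guarded by
// clusterCache.handlersLock; it is not part of the real package.
type handlerRegistry struct {
	mu       sync.RWMutex
	nextKey  uint64
	handlers map[uint64]func(event string)
}

// Subscribe registers a handler under the write lock and returns an unsubscribe func.
func (r *handlerRegistry) Subscribe(h func(event string)) func() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.handlers == nil {
		r.handlers = map[uint64]func(event string){}
	}
	key := r.nextKey
	r.nextKey++
	r.handlers[key] = h
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.handlers, key)
	}
}

// snapshot copies the current handlers under the read lock, so multiple event
// goroutines can take snapshots concurrently without blocking each other.
func (r *handlerRegistry) snapshot() []func(event string) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]func(event string), 0, len(r.handlers))
	for _, h := range r.handlers {
		out = append(out, h)
	}
	return out
}

func main() {
	var reg handlerRegistry
	unsubscribe := reg.Subscribe(func(event string) { fmt.Println("got", event) })
	defer unsubscribe()

	// Handlers are invoked from the snapshot, after the lock has been released.
	for _, h := range reg.snapshot() {
		h("ADDED")
	}
}

This mirrors what getResourceUpdatedHandlers and getEventHandlers now do: the copy is taken between RLock and RUnlock, and the handlers themselves run without holding handlersLock.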
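The second change narrows the scope of the main cache lock: updateResource mutates c.resources and c.nsIndex under the write lock and returns everything the handlers need, and processEvent then re-acquires only a read lock before invoking them. A rough sketch of that shape follows, using a toy cache (resourceCache and its string-valued map are illustrative assumptions, not the real clusterCache).

package main

import (
	"fmt"
	"sync"
)

// resourceCache is an illustrative stand-in for clusterCache; it only shows the
// write-lock / read-lock split, not the real data model.
type resourceCache struct {
	lock      sync.RWMutex
	resources map[string]string
}

// updateResource performs the mutation under the write lock and hands back the
// old and new values so that handlers can run outside of it.
func (c *resourceCache) updateResource(key, value string, deleted bool) (bool, string, string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldVal, exists := c.resources[key]
	if deleted {
		if !exists {
			return false, "", ""
		}
		delete(c.resources, key)
		return true, "", oldVal
	}
	c.resources[key] = value
	return true, value, oldVal
}

// processEvent holds only the read lock while handlers observe the cache, so
// event goroutines no longer serialize on the write lock while handlers run.
func (c *resourceCache) processEvent(key, value string, deleted bool, handlers []func(newVal, oldVal string)) {
	ok, newVal, oldVal := c.updateResource(key, value, deleted)
	c.lock.RLock()
	defer c.lock.RUnlock()
	if !ok {
		return
	}
	for _, h := range handlers {
		h(newVal, oldVal)
	}
}

func main() {
	c := &resourceCache{resources: map[string]string{}}
	logChange := func(newVal, oldVal string) { fmt.Printf("new=%q old=%q\n", newVal, oldVal) }
	c.processEvent("pod/a", "v1", false, []func(string, string){logChange})
	c.processEvent("pod/a", "", true, []func(string, string){logChange})
}

The trade-off is the one the diff's comment acknowledges: between releasing the write lock and taking the read lock, another goroutine may update the namespace index, so handlers can observe slightly fresher namespace state than the event that produced them, which is acceptable for the existing handlers.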