chore: Optimize usage of locking in the cluster [argoproj#602]
Closes argoproj#602
Use a read lock for getting handlers. Refactor resource updates in the cluster cache so the write lock is acquired only for the update itself, not for calling handlers.

Signed-off-by: Andrii Korotkov <andrii.korotkov@verkada.com>
andrii-korotkov-verkada committed Jul 11, 2024
1 parent a22b346 commit d5cb4d1
Showing 1 changed file with 47 additions and 11 deletions.
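The handler-getter half of the change follows the standard sync.RWMutex pattern: registration mutates the handler map under the exclusive lock, while the getters take a shared read lock and return a snapshot that can be iterated without holding any lock. A minimal, self-contained sketch of that pattern, using simplified hypothetical names (registry, subscribe, getHandlers) rather than the actual clusterCache API:

package main

import (
	"fmt"
	"sync"
)

type handler func(msg string)

type registry struct {
	mu       sync.RWMutex
	handlers map[uint64]handler
	nextKey  uint64
}

// subscribe mutates the map, so it needs the exclusive (write) lock.
func (r *registry) subscribe(h handler) uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	key := r.nextKey
	r.nextKey++
	r.handlers[key] = h
	return key
}

// getHandlers only reads the map, so a shared (read) lock is enough; it
// returns a copied slice so callers can invoke handlers without holding any lock.
func (r *registry) getHandlers() []handler {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]handler, 0, len(r.handlers))
	for _, h := range r.handlers {
		out = append(out, h)
	}
	return out
}

func main() {
	r := &registry{handlers: map[uint64]handler{}}
	r.subscribe(func(msg string) { fmt.Println("got:", msg) })
	for _, h := range r.getHandlers() {
		h("resource updated")
	}
}

Returning a copied slice is what allows the callers to run the handlers after the lock has been released, which is exactly what the commit relies on below.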
58 changes: 47 additions & 11 deletions pkg/cache/cluster.go
@@ -214,7 +214,7 @@ type clusterCache struct {
 	clusterResources bool
 	settings Settings
 
-	handlersLock sync.Mutex
+	handlersLock sync.RWMutex
 	handlerKey uint64
 	populateResourceInfoHandler OnPopulateResourceInfoHandler
 	resourceUpdatedHandlers map[uint64]OnResourceUpdatedHandler
@@ -261,8 +261,8 @@ func (c *clusterCache) OnResourceUpdated(handler OnResourceUpdatedHandler) Unsub
 }
 
 func (c *clusterCache) getResourceUpdatedHandlers() []OnResourceUpdatedHandler {
-	c.handlersLock.Lock()
-	defer c.handlersLock.Unlock()
+	c.handlersLock.RLock()
+	defer c.handlersLock.RUnlock()
 	var handlers []OnResourceUpdatedHandler
 	for _, h := range c.resourceUpdatedHandlers {
 		handlers = append(handlers, h)
@@ -285,8 +285,8 @@ func (c *clusterCache) OnEvent(handler OnEventHandler) Unsubscribe {
 }
 
 func (c *clusterCache) getEventHandlers() []OnEventHandler {
-	c.handlersLock.Lock()
-	defer c.handlersLock.Unlock()
+	c.handlersLock.RLock()
+	defer c.handlersLock.RUnlock()
 	handlers := make([]OnEventHandler, 0, len(c.eventHandlers))
 	for _, h := range c.eventHandlers {
 		handlers = append(handlers, h)
@@ -1113,26 +1113,54 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
 		return
 	}
 
+	ok, newRes, oldRes, ns := c.writeForResourceEvent(key, event, un)
+	if ok {
+		// Requesting a read lock, so that namespace resources aren't written to as they are being read from.
+		// Since each group/kind is processed by its own goroutine, resource shouldn't be updated between
+		// releasing write lock in `writeForResourceEvent` and acquiring this read lock, but namespace resources might be
+		// updated by other goroutines, resulting in a potentially fresher view of resources. However, potentially all
+		// of these variables can become stale if there's a cluster cache update. With respect to ArgoCD usage, either of
+		// these scenarios can result in triggering refresh for a wrong app in the worst case, which should be rare and
+		// doesn't hurt.
+		c.lock.RLock()
+		defer c.lock.RUnlock()
+		for _, h := range c.getResourceUpdatedHandlers() {
+			h(newRes, oldRes, ns)
+		}
+	}
+}
+
+// Encapsulates the logic of updating the resource in the cluster cache to limit the scope of locking.
+func (c *clusterCache) writeForResourceEvent(key kube.ResourceKey, event watch.EventType, un *unstructured.Unstructured) (bool, *Resource, *Resource, map[kube.ResourceKey]*Resource) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	existingNode, exists := c.resources[key]
 	if event == watch.Deleted {
 		if exists {
-			c.onNodeRemoved(key)
+			ok, existing, ns := c.removeNode(key)
+			return ok, nil, existing, ns
+		} else {
+			return false, nil, nil, nil
 		}
-	} else if event != watch.Deleted {
-		c.onNodeUpdated(existingNode, c.newResource(un))
+	} else {
+		newRes, ns := c.updateNode(c.newResource(un))
+		return true, newRes, existingNode, ns
 	}
 }
 
-func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
+func (c *clusterCache) updateNode(newRes *Resource) (*Resource, map[kube.ResourceKey]*Resource) {
 	c.setNode(newRes)
+	return newRes, c.nsIndex[newRes.Ref.Namespace]
+}
+
+func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
+	_, ns := c.updateNode(newRes)
 	for _, h := range c.getResourceUpdatedHandlers() {
-		h(newRes, oldRes, c.nsIndex[newRes.Ref.Namespace])
+		h(newRes, oldRes, ns)
 	}
 }
 
-func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
+func (c *clusterCache) removeNode(key kube.ResourceKey) (bool, *Resource, map[kube.ResourceKey]*Resource) {
 	existing, ok := c.resources[key]
 	if ok {
 		delete(c.resources, key)
@@ -1151,6 +1179,14 @@ func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
 				}
 			}
 		}
+		return true, existing, ns
+	}
+	return false, nil, nil
+}
+
+func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
+	ok, existing, ns := c.removeNode(key)
+	if ok {
 		for _, h := range c.getResourceUpdatedHandlers() {
 			h(nil, existing, ns)
 		}
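The comment added to processEvent above captures the core of the refactor: writeForResourceEvent performs the cache mutation under the write lock and returns whatever the handlers will need, so the handlers can then be invoked under a read lock only. A rough, self-contained sketch of that locking shape, using a hypothetical cache type and handler signature rather than the real clusterCache:

package main

import (
	"fmt"
	"sync"
)

type resource struct{ name, version string }

type cache struct {
	mu        sync.RWMutex
	resources map[string]*resource
	handlers  []func(newRes, oldRes *resource)
}

// write applies the update while holding the exclusive lock and returns the
// previous value the handlers will need, so the lock can be released first.
func (c *cache) write(key string, newRes *resource) (oldRes *resource) {
	c.mu.Lock()
	defer c.mu.Unlock()
	oldRes = c.resources[key]
	if newRes == nil {
		delete(c.resources, key)
	} else {
		c.resources[key] = newRes
	}
	return oldRes
}

// processEvent performs the short write, then notifies handlers under a read
// lock only, so other writers are blocked for as little time as possible.
func (c *cache) processEvent(key string, newRes *resource) {
	oldRes := c.write(key, newRes)

	c.mu.RLock()
	defer c.mu.RUnlock()
	for _, h := range c.handlers {
		h(newRes, oldRes)
	}
}

func main() {
	c := &cache{resources: map[string]*resource{}}
	c.handlers = append(c.handlers, func(newRes, oldRes *resource) {
		fmt.Printf("updated: old=%v new=%v\n", oldRes, newRes)
	})
	c.processEvent("default/my-app", &resource{name: "my-app", version: "2"})
}

As the in-code comment notes, another goroutine may touch the cache between releasing the write lock and acquiring the read lock, so handlers can observe slightly newer namespace state; for Argo CD this at worst triggers a refresh for the wrong app, which should be rare and doesn't hurt.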
