Commit

chore: Optimize usage of locking in the cluster [argoproj#602]
Closes argoproj#602
Use a read lock when getting handlers. Refactor resource updates in the cluster cache so that the lock is acquired only for the update itself, not while calling handlers.

Signed-off-by: Andrii Korotkov <andrii.korotkov@verkada.com>
andrii-korotkov-verkada committed Jul 5, 2024
1 parent a22b346 commit 2ab9f88
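
To make the change concrete before the diff, here is a minimal, self-contained sketch of the locking discipline the commit message describes — hypothetical types and names, not the gitops-engine API: mutate the cache while the lock is held, return whatever the handlers will need, and invoke the handlers only after the lock is released.

```go
package main

import (
    "fmt"
    "sync"
)

// miniCache is a stand-in for the cluster cache: a map guarded by a mutex,
// plus update handlers that should not run while the mutex is held.
type miniCache struct {
    mu       sync.Mutex
    items    map[string]string
    handlers []func(key, oldVal, newVal string)
}

// update mutates the map while holding the lock and returns what the
// handlers will need, so the caller can notify them after unlocking.
func (c *miniCache) update(key, newVal string) (oldVal string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    oldVal = c.items[key]
    c.items[key] = newVal
    return oldVal
}

// processEvent applies the update, then invokes the handlers outside the
// critical section, so a slow handler cannot block other goroutines that
// are waiting for the lock.
func (c *miniCache) processEvent(key, newVal string) {
    oldVal := c.update(key, newVal)
    for _, h := range c.handlers {
        h(key, oldVal, newVal)
    }
}

func main() {
    c := &miniCache{items: map[string]string{}}
    c.handlers = append(c.handlers, func(key, oldVal, newVal string) {
        fmt.Printf("%s: %q -> %q\n", key, oldVal, newVal)
    })
    c.processEvent("pod/a", "Running")
}
```

The diff below applies the same split to clusterCache.processEvent by introducing an updateResource helper.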
49 changes: 38 additions & 11 deletions pkg/cache/cluster.go
@@ -214,7 +214,7 @@ type clusterCache struct {
     clusterResources bool
     settings Settings

-    handlersLock sync.Mutex
+    handlersLock sync.RWMutex
     handlerKey uint64
     populateResourceInfoHandler OnPopulateResourceInfoHandler
     resourceUpdatedHandlers map[uint64]OnResourceUpdatedHandler
@@ -261,8 +261,8 @@ func (c *clusterCache) OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe {
 }

 func (c *clusterCache) getResourceUpdatedHandlers() []OnResourceUpdatedHandler {
-    c.handlersLock.Lock()
-    defer c.handlersLock.Unlock()
+    c.handlersLock.RLock()
+    defer c.handlersLock.RUnlock()
     var handlers []OnResourceUpdatedHandler
     for _, h := range c.resourceUpdatedHandlers {
         handlers = append(handlers, h)
@@ -285,8 +285,8 @@ func (c *clusterCache) OnEvent(handler OnEventHandler) Unsubscribe {
 }

 func (c *clusterCache) getEventHandlers() []OnEventHandler {
-    c.handlersLock.Lock()
-    defer c.handlersLock.Unlock()
+    c.handlersLock.RLock()
+    defer c.handlersLock.RUnlock()
     handlers := make([]OnEventHandler, 0, len(c.eventHandlers))
     for _, h := range c.eventHandlers {
         handlers = append(handlers, h)
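
The two getters above only read the handler maps, so they can take the read side of the new sync.RWMutex; registration and unsubscription (not shown in this diff) presumably keep the write lock. A rough sketch of that read/write split, with hypothetical names rather than the actual clusterCache fields:

```go
package main

import (
    "fmt"
    "sync"
)

type EventHandler func(event string)

// handlerRegistry mimics a read-mostly handler map: many goroutines
// snapshot the handlers, comparatively few register or unsubscribe them.
type handlerRegistry struct {
    mu       sync.RWMutex
    nextKey  uint64
    handlers map[uint64]EventHandler
}

// register mutates the map, so it takes the write lock.
func (r *handlerRegistry) register(h EventHandler) (unsubscribe func()) {
    r.mu.Lock()
    defer r.mu.Unlock()
    key := r.nextKey
    r.nextKey++
    r.handlers[key] = h
    return func() {
        r.mu.Lock()
        defer r.mu.Unlock()
        delete(r.handlers, key)
    }
}

// snapshot only reads the map, so it takes the read lock: concurrent
// snapshots do not block each other, and the returned copy lets callers
// invoke handlers without holding the lock at all.
func (r *handlerRegistry) snapshot() []EventHandler {
    r.mu.RLock()
    defer r.mu.RUnlock()
    handlers := make([]EventHandler, 0, len(r.handlers))
    for _, h := range r.handlers {
        handlers = append(handlers, h)
    }
    return handlers
}

func main() {
    r := &handlerRegistry{handlers: map[uint64]EventHandler{}}
    unsubscribe := r.register(func(event string) { fmt.Println("event:", event) })
    defer unsubscribe()

    for _, h := range r.snapshot() {
        h("ADDED")
    }
}
```

Copying the handlers into a slice under RLock is also what lets processEvent (next hunk) call them without holding the handler lock.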
@@ -1113,26 +1113,45 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unstructured) {
         return
     }

+    ok, newRes, oldRes, ns := c.updateResource(key, event, un)
+    if ok {
+        for _, h := range c.getResourceUpdatedHandlers() {
+            h(newRes, oldRes, ns)
+        }
+    }
+}
+
+// Encapsulates the logic of updating the resource in the cluster cache to limit the scope of locking.
+func (c *clusterCache) updateResource(key kube.ResourceKey, event watch.EventType, un *unstructured.Unstructured) (bool, *Resource, *Resource, map[kube.ResourceKey]*Resource) {
     c.lock.Lock()
     defer c.lock.Unlock()
     existingNode, exists := c.resources[key]
     if event == watch.Deleted {
         if exists {
-            c.onNodeRemoved(key)
+            ok, existing, ns := c.removeResource(key)
+            return ok, nil, existing, ns
+        } else {
+            return false, nil, nil, nil
         }
-    } else if event != watch.Deleted {
-        c.onNodeUpdated(existingNode, c.newResource(un))
+    } else {
+        newRes, ns := c.setResource(c.newResource(un))
+        return true, newRes, existingNode, ns
     }
 }

-func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
+func (c *clusterCache) setResource(newRes *Resource) (*Resource, map[kube.ResourceKey]*Resource) {
     c.setNode(newRes)
+    return newRes, c.nsIndex[newRes.Ref.Namespace]
+}
+
+func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
+    _, ns := c.setResource(newRes)
     for _, h := range c.getResourceUpdatedHandlers() {
-        h(newRes, oldRes, c.nsIndex[newRes.Ref.Namespace])
+        h(newRes, oldRes, ns)
     }
 }

-func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
+func (c *clusterCache) removeResource(key kube.ResourceKey) (bool, *Resource, map[kube.ResourceKey]*Resource) {
     existing, ok := c.resources[key]
     if ok {
         delete(c.resources, key)
@@ -1151,6 +1170,14 @@ func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
                 }
             }
         }
+        return true, existing, ns
+    }
+    return false, nil, nil
+}
+
+func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
+    ok, existing, ns := c.removeResource(key)
+    if ok {
         for _, h := range c.getResourceUpdatedHandlers() {
             h(nil, existing, ns)
         }
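
The net effect of the last two hunks is that c.lock is held only while updateResource, setResource, and removeResource touch the maps; the handler callbacks run after the lock is released. A toy, self-contained demonstration of why that matters — again hypothetical code, not the gitops-engine cache — in which a reader is not blocked behind a writer's slow post-update handler:

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// toyCache stands in for the cluster cache: a map guarded by a lock,
// with a "handler" that runs whenever an entry changes.
type toyCache struct {
    mu    sync.RWMutex
    state map[string]string
}

// set mutates the map under the write lock and returns the old value,
// which is all the handler needs afterwards.
func (c *toyCache) set(key, val string) (old string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    old = c.state[key]
    c.state[key] = val
    return old
}

// get takes only the read lock.
func (c *toyCache) get(key string) string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.state[key]
}

func main() {
    c := &toyCache{state: map[string]string{"pod/a": "Pending"}}

    done := make(chan struct{})
    go func() {
        defer close(done)
        old := c.set("pod/a", "Running") // lock held only for the map update
        // The "handler" runs after the lock is released, so its latency is
        // not added to the critical section.
        time.Sleep(100 * time.Millisecond)
        fmt.Printf("handler: pod/a %q -> %q\n", old, "Running")
    }()

    time.Sleep(10 * time.Millisecond) // let the writer finish its update
    start := time.Now()
    val := c.get("pod/a") // does not wait for the slow handler
    fmt.Printf("reader sees %q after %v\n", val, time.Since(start))
    <-done
}
```

If the sleep ran inside set, i.e. inside the critical section as in the pre-refactor processEvent, the reader would have to wait out the full handler latency before get could return.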
