Skip to content

Commit

Permalink
putting updateSVID under lock
Browse files Browse the repository at this point in the history
Signed-off-by: Prasad Borole <prasadb@uber.com>
  • Loading branch information
prasadborole1 committed Sep 9, 2022
1 parent d1aacc4 commit 5b9d6b1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
2 changes: 2 additions & 0 deletions pkg/agent/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type manager struct {

// Fields protected by mtx mutex.
mtx *sync.RWMutex
// Protects multiple goroutines from requesting SVID signings at the same time
updateSVIDMu sync.RWMutex

cache Cache
svid svid.Rotator
Expand Down
53 changes: 26 additions & 27 deletions pkg/agent/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ func (m *manager) syncSVIDs(ctx context.Context) (err error) {
// perform syncSVIDs only if using LRU cache
if m.c.SVIDCacheMaxSize > 0 {
m.cache.SyncSVIDsWithSubscribers()
staleEntries := m.cache.GetStaleEntries()
if len(staleEntries) > 0 {
return m.updateSVIDs(ctx, staleEntries, m.cache)
}
return m.updateSVIDs(ctx, m.c.Log.WithField(telemetry.CacheType, "workload"), m.cache)
}
return nil
}
Expand Down Expand Up @@ -109,39 +106,41 @@ func (m *manager) updateCache(ctx context.Context, update *cache.UpdateEntries,
log.WithField(telemetry.OutdatedSVIDs, outdated).Debug("Updating SVIDs with outdated attributes in cache")
}

return m.updateSVIDs(ctx, log, c)
}

func (m *manager) updateSVIDs(ctx context.Context, log logrus.FieldLogger, c SVIDCache) error {
m.updateSVIDMu.Lock()
defer m.updateSVIDMu.Unlock()

staleEntries := c.GetStaleEntries()
if len(staleEntries) > 0 {
var csrs []csrRequest
log.WithFields(logrus.Fields{
telemetry.Count: len(staleEntries),
telemetry.Limit: limits.SignLimitPerIP,
}).Debug("Renewing stale entries")
return m.updateSVIDs(ctx, staleEntries, c)
}
return nil
}

func (m *manager) updateSVIDs(ctx context.Context, entries []*cache.StaleEntry, c SVIDCache) error {
var csrs []csrRequest
for _, entry := range entries {
// we've exceeded the CSR limit, don't make any more CSRs
if len(csrs) >= limits.SignLimitPerIP {
break
for _, entry := range staleEntries {
// we've exceeded the CSR limit, don't make any more CSRs
if len(csrs) >= limits.SignLimitPerIP {
break
}

csrs = append(csrs, csrRequest{
EntryID: entry.Entry.EntryId,
SpiffeID: entry.Entry.SpiffeId,
CurrentSVIDExpiresAt: entry.ExpiresAt,
})
}

csrs = append(csrs, csrRequest{
EntryID: entry.Entry.EntryId,
SpiffeID: entry.Entry.SpiffeId,
CurrentSVIDExpiresAt: entry.ExpiresAt,
})
}

update, err := m.fetchSVIDs(ctx, csrs)
if err != nil {
return err
update, err := m.fetchSVIDs(ctx, csrs)
if err != nil {
return err
}
// the values in `update` now belong to the cache. DO NOT MODIFY.
c.UpdateSVIDs(update)
}
// the values in `update` now belong to the cache. DO NOT MODIFY.
c.UpdateSVIDs(update)

return nil
}

Expand Down

0 comments on commit 5b9d6b1

Please sign in to comment.