Skip to content

Commit

Permalink
[SPIRE Agent] add telemetry around LRU cache entry operations
Browse files Browse the repository at this point in the history
Signed-off-by: gordonhu7 <hu.gordon@hotmail.com>
  • Loading branch information
gordonhu7 committed Jul 11, 2023
1 parent 3117f7b commit 2270bc9
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 15 deletions.
6 changes: 3 additions & 3 deletions pkg/agent/manager/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type StaleEntry struct {
// Entry stale registration entry
Entry *common.RegistrationEntry
// SVIDs expiration time
ExpiresAt time.Time
SVIDExpiresAt time.Time
}

func New(log logrus.FieldLogger, trustDomain spiffeid.TrustDomain, bundle *Bundle, metrics telemetry.Metrics) *Cache {
Expand Down Expand Up @@ -429,8 +429,8 @@ func (c *Cache) GetStaleEntries() []*StaleEntry {
}

staleEntries = append(staleEntries, &StaleEntry{
Entry: cachedEntry.entry,
ExpiresAt: expiresAt,
Entry: cachedEntry.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,8 @@ func TestGetStaleEntries(t *testing.T) {

// Assert that the entry again returns as stale. This time the `ExpiresAt` field should be populated with the expiration of the SVID.
expectedEntries = []*StaleEntry{{
Entry: cache.records[foo.EntryId].entry,
ExpiresAt: expiredAt,
Entry: cache.records[foo.EntryId].entry,
SVIDExpiresAt: expiredAt,
}}
assert.Equal(t, expectedEntries, cache.GetStaleEntries())

Expand Down
27 changes: 25 additions & 2 deletions pkg/agent/manager/cache/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
const (
DefaultSVIDCacheMaxSize = 1000
SVIDSyncInterval = 500 * time.Millisecond
SVIDMapSize = "lru_cache_svid_map_size"
RecordMapSize = "lru_cache_record_map_size"
EntryAdded = "lru_cache_entry_add"
EntryUpdated = "lru_cache_entry_update"
EntryRemoved = "lru_cache_entry_remove"
)

// Cache caches each registration entry, bundles, and JWT SVIDs for the agent.
Expand Down Expand Up @@ -174,6 +179,13 @@ func (c *LRUCache) CountSVIDs() int {
return len(c.svids)
}

func (c *LRUCache) CountRecords() int {
c.mu.RLock()
defer c.mu.RUnlock()

return len(c.records)
}

func (c *LRUCache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry {
set, setDone := allocSelectorSet(selectors...)
defer setDone()
Expand Down Expand Up @@ -214,6 +226,7 @@ func (c *LRUCache) NewSubscriber(selectors []*common.Selector) Subscriber {
// updated through a call to UpdateSVIDs.
func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool) {
c.mu.Lock()
defer func() { c.metrics.SetGauge([]string{RecordMapSize}, float32(c.CountRecords())) }()
defer c.mu.Unlock()

// Remove bundles that no longer exist. The bundle for the agent trust
Expand Down Expand Up @@ -262,13 +275,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
fedRem, fedRemDone := allocStringSet()
defer fedRemDone()

entriesRemoved := 0
// Remove records for registration entries that no longer exist
for id, record := range c.records {
if _, ok := update.RegistrationEntries[id]; !ok {
c.log.WithFields(logrus.Fields{
telemetry.Entry: id,
telemetry.SPIFFEID: record.entry.SpiffeId,
}).Debug("Entry removed")
entriesRemoved++

// built a set of selectors for the record being removed, drop the
// record for each selector index, and add the entry selectors to
Expand All @@ -283,8 +298,11 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
delete(c.staleEntries, id)
}
}
c.metrics.IncrCounter([]string{EntryRemoved}, float32(entriesRemoved))

outdatedEntries := make(map[string]struct{})
entriesUpdated := 0
entriesCreated := 0

// Add/update records for registration entries in the update
for _, newEntry := range update.RegistrationEntries {
Expand Down Expand Up @@ -366,11 +384,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
}
if existingEntry != nil {
log.Debug("Entry updated")
entriesUpdated++
} else {
log.Debug("Entry created")
entriesCreated++
}
}
}
c.metrics.IncrCounter([]string{EntryUpdated}, float32(entriesUpdated))
c.metrics.IncrCounter([]string{EntryAdded}, float32(entriesCreated))

// entries with active subscribers which are not cached will be put in staleEntries map;
// irrespective of what svid cache size as we cannot deny identity to a subscriber
Expand Down Expand Up @@ -422,6 +444,7 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R

func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs) {
c.mu.Lock()
defer func() { c.metrics.SetGauge([]string{SVIDMapSize}, float32(c.CountSVIDs())) }()
defer c.mu.Unlock()

// Allocate a set of selectors that
Expand Down Expand Up @@ -471,8 +494,8 @@ func (c *LRUCache) GetStaleEntries() []*StaleEntry {
}

staleEntries = append(staleEntries, &StaleEntry{
Entry: cachedEntry.entry,
ExpiresAt: expiresAt,
Entry: cachedEntry.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
59 changes: 57 additions & 2 deletions pkg/agent/manager/cache/lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spiffe/spire/pkg/common/telemetry"
"github.com/spiffe/spire/proto/spire/common"
"github.com/spiffe/spire/test/clock"
"github.com/spiffe/spire/test/fakes/fakemetrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -102,6 +103,19 @@ func TestLRUCacheCountSVIDs(t *testing.T) {
require.Equal(t, 1, cache.CountSVIDs())
}

func TestLRUCacheCountRecords(t *testing.T) {
cache := newTestLRUCache(t)
// populate the cache with FOO and BAR without SVIDS
foo := makeRegistrationEntry("FOO", "A")
bar := makeRegistrationEntry("BAR", "B")
updateEntries := &UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(foo, bar),
}
cache.UpdateEntries(updateEntries, nil)
require.Equal(t, 2, cache.CountRecords())
}

func TestLRUCacheBundleChanges(t *testing.T) {
cache := newTestLRUCache(t)

Expand Down Expand Up @@ -564,8 +578,8 @@ func TestLRUCacheGetStaleEntries(t *testing.T) {

// Assert that the entry again returns as stale. This time the `ExpiresAt` field should be populated with the expiration of the SVID.
expectedEntries = []*StaleEntry{{
Entry: cache.records[bar.EntryId].entry,
ExpiresAt: expiredAt,
Entry: cache.records[bar.EntryId].entry,
SVIDExpiresAt: expiredAt,
}}
assert.Equal(t, expectedEntries, cache.GetStaleEntries())

Expand Down Expand Up @@ -872,6 +886,47 @@ func TestSubscribeToLRUCacheChanges(t *testing.T) {
}
}

func TestMetrics(t *testing.T) {
cache := newTestLRUCache(t)
fakeMetrics := fakemetrics.New()
cache.metrics = fakeMetrics

foo := makeRegistrationEntry("FOO", "A")
bar := makeRegistrationEntry("BAR", "B")
updateEntries := &UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(foo, bar),
}
updateSVIDs := &UpdateSVIDs{
X509SVIDs: makeX509SVIDs(foo),
}
cache.UpdateEntries(updateEntries, nil)
cache.UpdateSVIDs(updateSVIDs)

fooUpdate := makeRegistrationEntry("FOO", "A", "B")
cache.UpdateEntries(&UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(fooUpdate),
}, nil)
cache.UpdateEntries(updateEntries, nil)

assert.Equal(t, []fakemetrics.MetricItem{
{Type: fakemetrics.IncrCounterType, Key: []string{EntryRemoved}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryUpdated}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryAdded}, Val: 2},
{Type: fakemetrics.SetGaugeType, Key: []string{RecordMapSize}, Val: 2},
{Type: fakemetrics.SetGaugeType, Key: []string{SVIDMapSize}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryRemoved}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryUpdated}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryAdded}, Val: 0},
{Type: fakemetrics.SetGaugeType, Key: []string{RecordMapSize}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryRemoved}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryUpdated}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryAdded}, Val: 1},
{Type: fakemetrics.SetGaugeType, Key: []string{RecordMapSize}, Val: 2},
}, fakeMetrics.AllMetrics())
}

func TestNewLRUCache(t *testing.T) {
// negative value
cache := newTestLRUCacheWithConfig(-5, clock.NewMock(t))
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/storecache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ func (c *Cache) GetStaleEntries() []*cache.StaleEntry {
}

staleEntries = append(staleEntries, &cache.StaleEntry{
Entry: record.entry,
ExpiresAt: expiresAt,
Entry: record.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/storecache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,8 +1049,8 @@ func TestGetStaleEntries(t *testing.T) {
Entry: barEntry,
},
{
Entry: fohEntry,
ExpiresAt: expiresAt,
Entry: fohEntry,
SVIDExpiresAt: expiresAt,
},
}
require.Equal(t, expectedStaleEntries, c.GetStaleEntries())
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (m *manager) updateSVIDs(ctx context.Context, log logrus.FieldLogger, c SVI
csrs = append(csrs, csrRequest{
EntryID: entry.Entry.EntryId,
SpiffeID: entry.Entry.SpiffeId,
CurrentSVIDExpiresAt: entry.ExpiresAt,
CurrentSVIDExpiresAt: entry.SVIDExpiresAt,
})
}

Expand Down Expand Up @@ -158,7 +158,10 @@ func (m *manager) fetchSVIDs(ctx context.Context, csrs []csrRequest) (_ *cache.U

privateKeys := make(map[string]crypto.Signer, len(csrs))
for _, csr := range csrs {
log := m.c.Log.WithField("spiffe_id", csr.SpiffeID)
log := m.c.Log.WithFields(logrus.Fields{
"spiffe_id": csr.SpiffeID,
"entry_id": csr.EntryID,
})
if !csr.CurrentSVIDExpiresAt.IsZero() {
log = log.WithField("expires_at", csr.CurrentSVIDExpiresAt.Format(time.RFC3339))
}
Expand Down

0 comments on commit 2270bc9

Please sign in to comment.