Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPIRE Agent] add telemetry around LRU cache entry operations #4335

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ The following metrics are emitted:
| Call Counter | `agent_svid`, `rotate` | | The Agent's SVID is being rotated. |
| Sample | `cache_manager`, `expiring_svids` | | The number of expiring SVIDs that the Cache Manager has. |
| Sample | `cache_manager`, `outdated_svids` | | The number of outdated SVIDs that the Cache Manager has. |
| Counter | `lru_cache_entry_add` | | The number of entries added to the LRU cache. |
| Counter | `lru_cache_entry_remove` | | The number of entries removed from the LRU cache. |
| Counter | `lru_cache_entry_update` | | The number of entries updated in the LRU cache. |
| Call Counter | `manager`, `sync`, `fetch_entries_updates` | | The Sync Manager is fetching entries updates. |
| Call Counter | `manager`, `sync`, `fetch_svids_updates` | | The Sync Manager is fetching SVIDs updates. |
| Call Counter | `node`, `attestor`, `new_svid` | | The Node Attestor is calling to get an SVID. |
| Gauge | `lru_cache_record_map_size` | | The total number of entries in the LRU cache records map. |
| Counter | `sds_api`, `connections` | | The SDS API has successfully established a connection. |
| Gauge | `sds_api`, `connections` | | The number of active connection that the SDS API has. |
| Gauge | `lru_cache_svid_map_size` | | The total number of SVIDs in the LRU cache SVID map. |
| Counter | `workload_api`, `bundles_update`, `jwt` | | The Workload API has successfully updated a JWT bundle. |
| Counter | `workload_api`, `connection` | | The Workload API has successfully established a new connection. |
| Gauge | `workload_api`, `connections` | | The number of active connections that the Workload API has. |
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/manager/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type StaleEntry struct {
// Entry stale registration entry
Entry *common.RegistrationEntry
// SVIDs expiration time
ExpiresAt time.Time
SVIDExpiresAt time.Time
}

func New(log logrus.FieldLogger, trustDomain spiffeid.TrustDomain, bundle *Bundle, metrics telemetry.Metrics) *Cache {
Expand Down Expand Up @@ -429,8 +429,8 @@ func (c *Cache) GetStaleEntries() []*StaleEntry {
}

staleEntries = append(staleEntries, &StaleEntry{
Entry: cachedEntry.entry,
ExpiresAt: expiresAt,
Entry: cachedEntry.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,8 @@ func TestGetStaleEntries(t *testing.T) {

// Assert that the entry again returns as stale. This time the `ExpiresAt` field should be populated with the expiration of the SVID.
expectedEntries = []*StaleEntry{{
Entry: cache.records[foo.EntryId].entry,
ExpiresAt: expiredAt,
Entry: cache.records[foo.EntryId].entry,
SVIDExpiresAt: expiredAt,
}}
assert.Equal(t, expectedEntries, cache.GetStaleEntries())

Expand Down
23 changes: 21 additions & 2 deletions pkg/agent/manager/cache/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/spire/pkg/agent/common/backoff"
"github.com/spiffe/spire/pkg/common/telemetry"
agentmetrics "github.com/spiffe/spire/pkg/common/telemetry/agent"
"github.com/spiffe/spire/proto/spire/common"
)

Expand Down Expand Up @@ -174,6 +175,13 @@ func (c *LRUCache) CountSVIDs() int {
return len(c.svids)
}

func (c *LRUCache) CountRecords() int {
c.mu.RLock()
defer c.mu.RUnlock()

return len(c.records)
}

func (c *LRUCache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry {
set, setDone := allocSelectorSet(selectors...)
defer setDone()
Expand Down Expand Up @@ -214,6 +222,7 @@ func (c *LRUCache) NewSubscriber(selectors []*common.Selector) Subscriber {
// updated through a call to UpdateSVIDs.
func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool) {
c.mu.Lock()
defer func() { agentmetrics.SetEntriesMapSize(c.metrics, c.CountRecords()) }()
defer c.mu.Unlock()

// Remove bundles that no longer exist. The bundle for the agent trust
Expand Down Expand Up @@ -262,13 +271,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
fedRem, fedRemDone := allocStringSet()
defer fedRemDone()

entriesRemoved := 0
// Remove records for registration entries that no longer exist
for id, record := range c.records {
if _, ok := update.RegistrationEntries[id]; !ok {
c.log.WithFields(logrus.Fields{
telemetry.Entry: id,
telemetry.SPIFFEID: record.entry.SpiffeId,
}).Debug("Entry removed")
entriesRemoved++

// built a set of selectors for the record being removed, drop the
// record for each selector index, and add the entry selectors to
Expand All @@ -283,8 +294,11 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
delete(c.staleEntries, id)
}
}
agentmetrics.IncrementEntriesRemoved(c.metrics, entriesRemoved)

outdatedEntries := make(map[string]struct{})
entriesUpdated := 0
entriesCreated := 0

// Add/update records for registration entries in the update
for _, newEntry := range update.RegistrationEntries {
Expand Down Expand Up @@ -366,11 +380,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
}
if existingEntry != nil {
log.Debug("Entry updated")
entriesUpdated++
} else {
log.Debug("Entry created")
entriesCreated++
}
}
}
agentmetrics.IncrementEntriesAdded(c.metrics, entriesCreated)
agentmetrics.IncrementEntriesUpdated(c.metrics, entriesUpdated)

// entries with active subscribers which are not cached will be put in staleEntries map;
// irrespective of what svid cache size as we cannot deny identity to a subscriber
Expand Down Expand Up @@ -422,6 +440,7 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R

func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs) {
c.mu.Lock()
defer func() { agentmetrics.SetSVIDMapSize(c.metrics, c.CountSVIDs()) }()
defer c.mu.Unlock()

// Allocate a set of selectors that
Expand Down Expand Up @@ -471,8 +490,8 @@ func (c *LRUCache) GetStaleEntries() []*StaleEntry {
}

staleEntries = append(staleEntries, &StaleEntry{
Entry: cachedEntry.entry,
ExpiresAt: expiresAt,
Entry: cachedEntry.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
76 changes: 74 additions & 2 deletions pkg/agent/manager/cache/lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spiffe/spire/pkg/common/telemetry"
"github.com/spiffe/spire/proto/spire/common"
"github.com/spiffe/spire/test/clock"
"github.com/spiffe/spire/test/fakes/fakemetrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -102,6 +103,19 @@ func TestLRUCacheCountSVIDs(t *testing.T) {
require.Equal(t, 1, cache.CountSVIDs())
}

func TestLRUCacheCountRecords(t *testing.T) {
cache := newTestLRUCache(t)
// populate the cache with FOO and BAR without SVIDS
foo := makeRegistrationEntry("FOO", "A")
bar := makeRegistrationEntry("BAR", "B")
updateEntries := &UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(foo, bar),
}
cache.UpdateEntries(updateEntries, nil)
require.Equal(t, 2, cache.CountRecords())
}

func TestLRUCacheBundleChanges(t *testing.T) {
cache := newTestLRUCache(t)

Expand Down Expand Up @@ -564,8 +578,8 @@ func TestLRUCacheGetStaleEntries(t *testing.T) {

// Assert that the entry again returns as stale. This time the `ExpiresAt` field should be populated with the expiration of the SVID.
expectedEntries = []*StaleEntry{{
Entry: cache.records[bar.EntryId].entry,
ExpiresAt: expiredAt,
Entry: cache.records[bar.EntryId].entry,
SVIDExpiresAt: expiredAt,
}}
assert.Equal(t, expectedEntries, cache.GetStaleEntries())

Expand Down Expand Up @@ -872,6 +886,64 @@ func TestSubscribeToLRUCacheChanges(t *testing.T) {
}
}

func TestMetrics(t *testing.T) {
cache := newTestLRUCache(t)
fakeMetrics := fakemetrics.New()
cache.metrics = fakeMetrics

foo := makeRegistrationEntry("FOO", "A")
bar := makeRegistrationEntry("BAR", "B")
updateEntries := &UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(foo, bar),
}

// add entries to cache
cache.UpdateEntries(updateEntries, nil)
assert.Equal(t, []fakemetrics.MetricItem{
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryRemoved}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryAdded}, Val: 2},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryUpdated}, Val: 0},
{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.RecordMapSize}, Val: 2},
}, fakeMetrics.AllMetrics())

// add SVIDs to cache
updateSVIDs := &UpdateSVIDs{
X509SVIDs: makeX509SVIDs(foo),
}
cache.UpdateSVIDs(updateSVIDs)
assert.Equal(t, []fakemetrics.MetricItem{
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryRemoved}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryAdded}, Val: 2},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryUpdated}, Val: 0},
{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.RecordMapSize}, Val: 2},
{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.SVIDMapSize}, Val: 1},
}, fakeMetrics.AllMetrics())

// update entries in cache
fooUpdate := makeRegistrationEntry("FOO", "A", "B")
cache.UpdateEntries(&UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(fooUpdate),
}, nil)
cache.UpdateEntries(updateEntries, nil)
assert.Equal(t, []fakemetrics.MetricItem{
hugordon7 marked this conversation as resolved.
Show resolved Hide resolved
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryRemoved}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryAdded}, Val: 2},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryUpdated}, Val: 0},
{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.RecordMapSize}, Val: 2},
{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.SVIDMapSize}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryRemoved}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryAdded}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryUpdated}, Val: 1},
{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.RecordMapSize}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryRemoved}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryAdded}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryUpdated}, Val: 1},
{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.RecordMapSize}, Val: 2},
}, fakeMetrics.AllMetrics())
}

func TestNewLRUCache(t *testing.T) {
// negative value
cache := newTestLRUCacheWithConfig(-5, clock.NewMock(t))
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/storecache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ func (c *Cache) GetStaleEntries() []*cache.StaleEntry {
}

staleEntries = append(staleEntries, &cache.StaleEntry{
Entry: record.entry,
ExpiresAt: expiresAt,
Entry: record.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/storecache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,8 +1049,8 @@ func TestGetStaleEntries(t *testing.T) {
Entry: barEntry,
},
{
Entry: fohEntry,
ExpiresAt: expiresAt,
Entry: fohEntry,
SVIDExpiresAt: expiresAt,
},
}
require.Equal(t, expectedStaleEntries, c.GetStaleEntries())
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (m *manager) updateSVIDs(ctx context.Context, log logrus.FieldLogger, c SVI
csrs = append(csrs, csrRequest{
EntryID: entry.Entry.EntryId,
SpiffeID: entry.Entry.SpiffeId,
CurrentSVIDExpiresAt: entry.ExpiresAt,
CurrentSVIDExpiresAt: entry.SVIDExpiresAt,
})
}

Expand Down Expand Up @@ -158,7 +158,10 @@ func (m *manager) fetchSVIDs(ctx context.Context, csrs []csrRequest) (_ *cache.U

privateKeys := make(map[string]crypto.Signer, len(csrs))
for _, csr := range csrs {
log := m.c.Log.WithField("spiffe_id", csr.SpiffeID)
log := m.c.Log.WithFields(logrus.Fields{
"spiffe_id": csr.SpiffeID,
"entry_id": csr.EntryID,
hugordon7 marked this conversation as resolved.
Show resolved Hide resolved
})
if !csr.CurrentSVIDExpiresAt.IsZero() {
log = log.WithField("expires_at", csr.CurrentSVIDExpiresAt.Format(time.RFC3339))
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/common/telemetry/agent/lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package agent

import "github.com/spiffe/spire/pkg/common/telemetry"

func IncrementEntriesAdded(m telemetry.Metrics, entriesAdded int) {
m.IncrCounter([]string{telemetry.EntryAdded}, float32(entriesAdded))
}

func IncrementEntriesUpdated(m telemetry.Metrics, entriesUpdated int) {
m.IncrCounter([]string{telemetry.EntryUpdated}, float32(entriesUpdated))
}

func IncrementEntriesRemoved(m telemetry.Metrics, entriesRemoved int) {
m.IncrCounter([]string{telemetry.EntryRemoved}, float32(entriesRemoved))
}

func SetEntriesMapSize(m telemetry.Metrics, recordMapSize int) {
m.SetGauge([]string{telemetry.RecordMapSize}, float32(recordMapSize))
}

func SetSVIDMapSize(m telemetry.Metrics, svidMapSize int) {
m.SetGauge([]string{telemetry.SVIDMapSize}, float32(svidMapSize))
}
15 changes: 15 additions & 0 deletions pkg/common/telemetry/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ const (
// ElapsedTime tags some duration of time.
ElapsedTime = "elapsed_time"

// EntryAdded is the counter key for when a entry is added to LRU cache
hugordon7 marked this conversation as resolved.
Show resolved Hide resolved
EntryAdded = "lru_cache_entry_add"

// EntryRemoved is the counter key for when a entry is removed from LRU cache
EntryRemoved = "lru_cache_entry_remove"

// EntryUpdated is the counter key for when an LRU cache entry is updated
EntryUpdated = "lru_cache_entry_update"

// EndpointSpiffeID tags endpoint SPIFFE ID
EndpointSpiffeID = "endpoint_spiffe_id"

Expand Down Expand Up @@ -415,6 +424,9 @@ const (
// ReceivedUID is like Received, specific to uid.
ReceivedUID = "received_uid"

// RecordMapSize is the gauge key to hold the size of the LRU cache entries map
RecordMapSize = "lru_cache_record_map_size"

// RefreshHint tags a bundle refresh hint
RefreshHint = "refresh_hint"

Expand Down Expand Up @@ -493,6 +505,9 @@ const (
// with other tags to add clarity
Subject = "subject"

// SVIDMapSize is the gauge key for the size of the LRU cache SVID map
SVIDMapSize = "lru_cache_svid_map_size"

// SVIDResponseLatency tags latency for SVID response
SVIDResponseLatency = "svid_response_latency"

Expand Down