diff --git a/doc/telemetry/telemetry.md b/doc/telemetry/telemetry.md index 3c385d3426..a5bb1702a6 100644 --- a/doc/telemetry/telemetry.md +++ b/doc/telemetry/telemetry.md @@ -51,7 +51,15 @@ The following metrics are emitted: | Call Counter | `datastore`, `registration_entry_event`, `list` | | The Datastore is listing a registration entry events. | | Call Counter | `datastore`, `registration_entry_event`, `prune` | | The Datastore is pruning expired registration entry events. | | Call Counter | `datastore`, `registration_entry_event`, `fetch` | | The Datastore is fetching a specific registration entry event. | -| Call Counter | `entry`, `cache`, `reload` | | The Server is reloading its in-memory entry cache from the datastore. | +| Call Counter | `entry`, `cache`, `reload` | | The Server is reloading its in-memory entry cache from the datastore | +| Gauge | `node`, `agents_by_id_cache`, `count` | | The Server is re-hydrating the agents-by-id event-based cache | +| Gauge | `node`, `agents_by_expiresat_cache`, `count` | | The Server is re-hydrating the agents-by-expiresat event-based cache | +| Gauge | `node`, `skipped_node_event_ids`, `count` | | The count of skipped ids detected in the last `sql_transaction_timeout` period. For databases that autoincrement ids by more than one, this number will overreport the skipped ids. 
[Issue](https://github.com/spiffe/spire/issues/5341) | +| Gauge | `entry`, `nodealiases_by_entryid_cache`, `count` | | The Server is re-hydrating the nodealiases-by-entryid event-based cache | +| Gauge | `entry`, `nodealiases_by_selector_cache`, `count` | | The Server is re-hydrating the nodealiases-by-selector event-based cache | +| Gauge | `entry`, `entries_by_entryid_cache`, `count` | | The Server is re-hydrating the entries-by-entryid event-based cache | +| Gauge | `entry`, `entries_by_parentid_cache`, `count` | | The Server is re-hydrating the entries-by-parentid event-based cache | +| Gauge | `entry`, `skipped_entry_event_ids`, `count` | | The count of skipped ids detected in the last `sql_transaction_timeout` period. For databases that autoincrement ids by more than one, this number will overreport the skipped ids. [Issue](https://github.com/spiffe/spire/issues/5341) | | Counter | `manager`, `jwt_key`, `activate` | | The CA manager has successfully activated a JWT Key. | | Gauge | `manager`, `x509_ca`, `rotate`, `ttl` | `trust_domain_id` | The CA manager is rotating the X.509 CA with a given TTL for a specific Trust Domain. | | Call Counter | `registration_entry`, `manager`, `prune` | | The Registration manager is pruning entries. 
| diff --git a/pkg/common/telemetry/names.go b/pkg/common/telemetry/names.go index a26c8f9f9f..b24ed815d6 100644 --- a/pkg/common/telemetry/names.go +++ b/pkg/common/telemetry/names.go @@ -663,6 +663,24 @@ const ( // Cache functionality related to a cache Cache = "cache" + // AgentsByIDCache functionality related to the agent btree cache indexed by ID + AgentsByIDCache = "agents_by_id_cache" + + // AgentsByExpiresAtCache functionality related to the agent btree cache indexed by ExpiresAt + AgentsByExpiresAtCache = "agents_by_expiresat_cache" + + // NodeAliasesByEntryIDCache functionality related to the node-aliases btree cache indexed by EntryID + NodeAliasesByEntryIDCache = "nodealiases_by_entryid_cache" + + // NodeAliasesBySelectorCache functionality related to the node-aliases btree cache indexed by Selector + NodeAliasesBySelectorCache = "nodealiases_by_selector_cache" + + // EntriesByEntryIDCache functionality related to the entries btree cache indexed by EntryID + EntriesByEntryIDCache = "entries_by_entryid_cache" + + // EntriesByParentIDCache functionality related to the entries btree cache indexed by ParentID + EntriesByParentIDCache = "entries_by_parentid_cache" + // Cache type tag CacheType = "cache_type" @@ -861,8 +879,11 @@ const ( // ListAgents functionality related to listing agents ListAgents = "list_agents" - // CountEntries functionality related to counting all registration entries - CountEntries = "count_entries" + // SkippedEntryEventIDs functionality related to counting missed entry event IDs + SkippedEntryEventIDs = "skipped_entry_event_ids" + + // SkippedNodeEventIDs functionality related to counting missed node event IDs + SkippedNodeEventIDs = "skipped_node_event_ids" // ListAllEntriesWithPages functionality related to listing all registration entries with pagination ListAllEntriesWithPages = "list_all_entries_with_pages" diff --git a/pkg/common/telemetry/server/server.go b/pkg/common/telemetry/server/server.go index 67d1caf83b..980d4218a2 
100644 --- a/pkg/common/telemetry/server/server.go +++ b/pkg/common/telemetry/server/server.go @@ -7,3 +7,51 @@ import "github.com/spiffe/spire/pkg/common/telemetry" func SetEntryDeletedGauge(m telemetry.Metrics, deleted int) { m.SetGauge([]string{telemetry.Entry, telemetry.Deleted}, float32(deleted)) } + +// SetAgentsByIDCacheCountGauge emits a gauge with the number of agents by ID that are +// currently in the node cache. +func SetAgentsByIDCacheCountGauge(m telemetry.Metrics, size int) { + m.SetGauge([]string{telemetry.Node, telemetry.AgentsByIDCache, telemetry.Count}, float32(size)) +} + +// SetAgentsByExpiresAtCacheCountGauge emits a gauge with the number of agents by expiresAt that are +// currently in the node cache. +func SetAgentsByExpiresAtCacheCountGauge(m telemetry.Metrics, size int) { + m.SetGauge([]string{telemetry.Node, telemetry.AgentsByExpiresAtCache, telemetry.Count}, float32(size)) +} + +// SetSkippedNodeEventIDsCacheCountGauge emits a gauge with the number of entries that are +// currently in the skipped-node events cache. +func SetSkippedNodeEventIDsCacheCountGauge(m telemetry.Metrics, size int) { + m.SetGauge([]string{telemetry.Node, telemetry.SkippedNodeEventIDs, telemetry.Count}, float32(size)) +} + +// SetNodeAliasesByEntryIDCacheCountGauge emits a gauge with the number of Node Aliases by EntryID that are +// currently in the entry cache. +func SetNodeAliasesByEntryIDCacheCountGauge(m telemetry.Metrics, size int) { + m.SetGauge([]string{telemetry.Entry, telemetry.NodeAliasesByEntryIDCache, telemetry.Count}, float32(size)) +} + +// SetNodeAliasesBySelectorCacheCountGauge emits a gauge with the number of Node Aliases by Selector that are +// currently in the entry cache. 
+func SetNodeAliasesBySelectorCacheCountGauge(m telemetry.Metrics, size int) { + m.SetGauge([]string{telemetry.Entry, telemetry.NodeAliasesBySelectorCache, telemetry.Count}, float32(size)) +} + +// SetEntriesByEntryIDCacheCountGauge emits a gauge with the number of entries by entryID that are +// currently in the entry cache. +func SetEntriesByEntryIDCacheCountGauge(m telemetry.Metrics, size int) { + m.SetGauge([]string{telemetry.Entry, telemetry.EntriesByEntryIDCache, telemetry.Count}, float32(size)) +} + +// SetEntriesByParentIDCacheCountGauge emits a gauge with the number of entries by parentID that are +// currently in the entry cache. +func SetEntriesByParentIDCacheCountGauge(m telemetry.Metrics, size int) { + m.SetGauge([]string{telemetry.Entry, telemetry.EntriesByParentIDCache, telemetry.Count}, float32(size)) +} + +// SetSkippedEntryEventIDsCacheCountGauge emits a gauge with the number of entries that are +// currently in the skipped-entry events cache. +func SetSkippedEntryEventIDsCacheCountGauge(m telemetry.Metrics, size int) { + m.SetGauge([]string{telemetry.Entry, telemetry.SkippedEntryEventIDs, telemetry.Count}, float32(size)) +} diff --git a/pkg/server/authorizedentries/cache.go b/pkg/server/authorizedentries/cache.go index 9814795816..77e2d7aaf5 100644 --- a/pkg/server/authorizedentries/cache.go +++ b/pkg/server/authorizedentries/cache.go @@ -267,7 +267,7 @@ func (c *Cache) removeEntry(entryID string) { } } -func (c *Cache) stats() cacheStats { +func (c *Cache) Stats() cacheStats { return cacheStats{ AgentsByID: c.agentsByID.Len(), AgentsByExpiresAt: c.agentsByExpiresAt.Len(), diff --git a/pkg/server/authorizedentries/cache_test.go b/pkg/server/authorizedentries/cache_test.go index be4c2ef929..86315bece5 100644 --- a/pkg/server/authorizedentries/cache_test.go +++ b/pkg/server/authorizedentries/cache_test.go @@ -173,7 +173,7 @@ func TestCacheInternalStats(t *testing.T) { clk := clock.NewMock(t) t.Run("pristine", func(t *testing.T) { cache := 
NewCache(clk) - require.Zero(t, cache.stats()) + require.Zero(t, cache.Stats()) }) t.Run("entries and aliases", func(t *testing.T) { @@ -189,13 +189,13 @@ func TestCacheInternalStats(t *testing.T) { require.Equal(t, cacheStats{ EntriesByEntryID: 1, EntriesByParentID: 1, - }, cache.stats()) + }, cache.Stats()) cache.UpdateEntry(entry2a) require.Equal(t, cacheStats{ EntriesByEntryID: 2, EntriesByParentID: 2, - }, cache.stats()) + }, cache.Stats()) cache.UpdateEntry(entry2b) require.Equal(t, cacheStats{ @@ -203,20 +203,20 @@ func TestCacheInternalStats(t *testing.T) { EntriesByParentID: 1, AliasesByEntryID: 2, // one for each selector AliasesBySelector: 2, // one for each selector - }, cache.stats()) + }, cache.Stats()) cache.RemoveEntry(entry1.Id) require.Equal(t, cacheStats{ AliasesByEntryID: 2, // one for each selector AliasesBySelector: 2, // one for each selector - }, cache.stats()) + }, cache.Stats()) cache.RemoveEntry(entry2b.Id) - require.Zero(t, cache.stats()) + require.Zero(t, cache.Stats()) // Remove again and make sure nothing happens. 
cache.RemoveEntry(entry2b.Id) - require.Zero(t, cache.stats()) + require.Zero(t, cache.Stats()) }) t.Run("agents", func(t *testing.T) { @@ -225,28 +225,28 @@ func TestCacheInternalStats(t *testing.T) { require.Equal(t, cacheStats{ AgentsByID: 1, AgentsByExpiresAt: 1, - }, cache.stats()) + }, cache.Stats()) cache.UpdateAgent(agent2.String(), now.Add(time.Hour*2), []*types.Selector{sel2}) require.Equal(t, cacheStats{ AgentsByID: 2, AgentsByExpiresAt: 2, - }, cache.stats()) + }, cache.Stats()) cache.UpdateAgent(agent2.String(), now.Add(time.Hour*3), []*types.Selector{sel2}) require.Equal(t, cacheStats{ AgentsByID: 2, AgentsByExpiresAt: 2, - }, cache.stats()) + }, cache.Stats()) cache.RemoveAgent(agent1.String()) require.Equal(t, cacheStats{ AgentsByID: 1, AgentsByExpiresAt: 1, - }, cache.stats()) + }, cache.Stats()) cache.RemoveAgent(agent2.String()) - require.Zero(t, cache.stats()) + require.Zero(t, cache.Stats()) }) } diff --git a/pkg/server/endpoints/authorized_entryfetcher.go b/pkg/server/endpoints/authorized_entryfetcher.go index b5c69dac70..27de3f14f3 100644 --- a/pkg/server/endpoints/authorized_entryfetcher.go +++ b/pkg/server/endpoints/authorized_entryfetcher.go @@ -9,6 +9,7 @@ import ( "github.com/sirupsen/logrus" "github.com/spiffe/go-spiffe/v2/spiffeid" "github.com/spiffe/spire-api-sdk/proto/spire/api/types" + "github.com/spiffe/spire/pkg/common/telemetry" "github.com/spiffe/spire/pkg/server/api" "github.com/spiffe/spire/pkg/server/authorizedentries" "github.com/spiffe/spire/pkg/server/datastore" @@ -36,9 +37,9 @@ type eventsBasedCache interface { pruneMissedEvents() } -func NewAuthorizedEntryFetcherWithEventsBasedCache(ctx context.Context, log logrus.FieldLogger, clk clock.Clock, ds datastore.DataStore, cacheReloadInterval, pruneEventsOlderThan, sqlTransactionTimeout time.Duration) (*AuthorizedEntryFetcherWithEventsBasedCache, error) { +func NewAuthorizedEntryFetcherWithEventsBasedCache(ctx context.Context, log logrus.FieldLogger, metrics 
telemetry.Metrics, clk clock.Clock, ds datastore.DataStore, cacheReloadInterval, pruneEventsOlderThan, sqlTransactionTimeout time.Duration) (*AuthorizedEntryFetcherWithEventsBasedCache, error) { log.Info("Building event-based in-memory entry cache") - cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk, sqlTransactionTimeout) + cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, metrics, ds, clk, sqlTransactionTimeout) if err != nil { return nil, err } @@ -111,15 +112,15 @@ func (a *AuthorizedEntryFetcherWithEventsBasedCache) updateCache(ctx context.Con return errors.Join(updateRegistrationEntriesCacheErr, updateAttestedNodesCacheErr) } -func buildCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, sqlTransactionTimeout time.Duration) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) { +func buildCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, sqlTransactionTimeout time.Duration) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) { cache := authorizedentries.NewCache(clk) - registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, sqlTransactionTimeout) + registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, buildCachePageSize, sqlTransactionTimeout) if err != nil { return nil, nil, nil, err } - attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, sqlTransactionTimeout) + attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, sqlTransactionTimeout) if err != nil { return nil, nil, nil, err } diff --git a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go index 32199a91e8..6d692eeba7 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go +++ 
b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go @@ -8,7 +8,9 @@ import ( "github.com/andres-erbsen/clock" "github.com/sirupsen/logrus" + "github.com/spiffe/spire/pkg/common/telemetry" + server_telemetry "github.com/spiffe/spire/pkg/common/telemetry/server" "github.com/spiffe/spire/pkg/server/api" "github.com/spiffe/spire/pkg/server/authorizedentries" "github.com/spiffe/spire/pkg/server/datastore" @@ -17,11 +19,12 @@ import ( ) type attestedNodes struct { - cache *authorizedentries.Cache - clk clock.Clock - ds datastore.DataStore - log logrus.FieldLogger - mu sync.RWMutex + cache *authorizedentries.Cache + clk clock.Clock + ds datastore.DataStore + log logrus.FieldLogger + metrics telemetry.Metrics + mu sync.RWMutex firstEventID uint firstEventTime time.Time @@ -33,7 +36,7 @@ type attestedNodes struct { // buildAttestedNodesCache fetches all attested nodes and adds the unexpired ones to the cache. // It runs once at startup. -func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) { +func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) { resp, err := ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) if err != nil { return nil, err @@ -82,6 +85,7 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat firstEventID: firstEventID, firstEventTime: firstEventTime, log: log, + metrics: metrics, lastEventID: lastEventID, missedEvents: missedEvents, seenMissedStartupEvents: make(map[uint]struct{}), @@ -142,6 +146,10 @@ func (a *attestedNodes) updateCache(ctx context.Context) error { a.lastEventID = event.EventID } + // These two should be the same value but it's valuable to 
have them both be emitted for incident triage. + server_telemetry.SetAgentsByExpiresAtCacheCountGauge(a.metrics, a.cache.Stats().AgentsByExpiresAt) + server_telemetry.SetAgentsByIDCacheCountGauge(a.metrics, a.cache.Stats().AgentsByID) + return nil } @@ -201,6 +209,7 @@ func (a *attestedNodes) replayMissedEvents(ctx context.Context) { delete(a.missedEvents, eventID) } + server_telemetry.SetSkippedNodeEventIDsCacheCountGauge(a.metrics, len(a.missedEvents)) } // updatedCacheEntry update/deletes/creates an individual attested node in the cache. diff --git a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go index 4dc04bf0b2..d02ebe60dd 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go +++ b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go @@ -9,62 +9,83 @@ import ( "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/spiffe/go-spiffe/v2/spiffeid" + "github.com/spiffe/spire/pkg/common/telemetry" "github.com/spiffe/spire/pkg/server/authorizedentries" "github.com/spiffe/spire/pkg/server/datastore" "github.com/spiffe/spire/proto/spire/common" "github.com/spiffe/spire/test/clock" "github.com/spiffe/spire/test/fakes/fakedatastore" + "github.com/spiffe/spire/test/fakes/fakemetrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestUpdateAttestedNodesCache(t *testing.T) { - ctx := context.Background() - log, _ := test.NewNullLogger() - clk := clock.NewMock(t) - ds := fakedatastore.New(t) - cache := authorizedentries.NewCache(clk) - - attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, defaultSQLTransactionTimeout) - require.NoError(t, err) - require.NotNil(t, attestedNodes) - - agentID, err := spiffeid.FromString("spiffe://example.org/myagent") - require.NoError(t, err) - - _, err = ds.CreateAttestedNode(ctx, &common.AttestedNode{ - SpiffeId: agentID.String(), 
- CertNotAfter: time.Now().Add(5 * time.Hour).Unix(), - }) - require.NoError(t, err) - for _, tt := range []struct { name string errs []error expectedLastAttestedNodeEventID uint + expectMetrics []fakemetrics.MetricItem }{ { name: "Error Listing Attested Node Events", errs: []error{errors.New("listing attested node events")}, expectedLastAttestedNodeEventID: uint(0), + expectMetrics: nil, }, { name: "Error Fetching Attested Node", errs: []error{nil, errors.New("fetching attested node")}, expectedLastAttestedNodeEventID: uint(0), + expectMetrics: nil, }, { name: "Error Getting Node Selectors", errs: []error{nil, nil, errors.New("getting node selectors")}, expectedLastAttestedNodeEventID: uint(0), + expectMetrics: nil, }, { name: "No Errors", expectedLastAttestedNodeEventID: uint(1), + expectMetrics: []fakemetrics.MetricItem{ + { + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Node, telemetry.AgentsByExpiresAtCache, telemetry.Count}, + Val: 1, + Labels: nil, + }, + { + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Node, telemetry.AgentsByIDCache, telemetry.Count}, + Val: 1, + Labels: nil, + }, + }, }, } { tt := tt t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + log, _ := test.NewNullLogger() + clk := clock.NewMock(t) + ds := fakedatastore.New(t) + cache := authorizedentries.NewCache(clk) + metrics := fakemetrics.New() + + attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, defaultSQLTransactionTimeout) + require.NoError(t, err) + require.NotNil(t, attestedNodes) + + agentID, err := spiffeid.FromString("spiffe://example.org/myagent") + require.NoError(t, err) + + _, err = ds.CreateAttestedNode(ctx, &common.AttestedNode{ + SpiffeId: agentID.String(), + CertNotAfter: time.Now().Add(5 * time.Hour).Unix(), + }) + require.NoError(t, err) + for _, err = range tt.errs { ds.AppendNextError(err) } @@ -77,6 +98,10 @@ func TestUpdateAttestedNodesCache(t *testing.T) { } assert.Equal(t, 
tt.expectedLastAttestedNodeEventID, attestedNodes.lastEventID) + + if tt.expectMetrics != nil { + assert.Subset(t, metrics.AllMetrics(), tt.expectMetrics) + } }) } } @@ -88,8 +113,9 @@ func TestAttestedNodesCacheMissedEventNotFound(t *testing.T) { clk := clock.NewMock(t) ds := fakedatastore.New(t) cache := authorizedentries.NewCache(clk) + metrics := fakemetrics.New() - attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, defaultSQLTransactionTimeout) + attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, attestedNodes) @@ -105,6 +131,7 @@ func TestAttestedNodesSavesMissedStartupEvents(t *testing.T) { clk := clock.NewMock(t) ds := fakedatastore.New(t) cache := authorizedentries.NewCache(clk) + metrics := fakemetrics.New() err := ds.CreateAttestedNodeEventForTesting(ctx, &datastore.AttestedNodeEvent{ EventID: 3, @@ -112,7 +139,7 @@ func TestAttestedNodesSavesMissedStartupEvents(t *testing.T) { }) require.NoError(t, err) - attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, defaultSQLTransactionTimeout) + attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, attestedNodes) require.Equal(t, uint(3), attestedNodes.firstEventID) diff --git a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go index bffcd20164..570cbad008 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go +++ b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go @@ -9,6 +9,7 @@ import ( "github.com/andres-erbsen/clock" "github.com/sirupsen/logrus" "github.com/spiffe/spire/pkg/common/telemetry" + server_telemetry "github.com/spiffe/spire/pkg/common/telemetry/server" "github.com/spiffe/spire/pkg/server/api" 
"github.com/spiffe/spire/pkg/server/authorizedentries" "github.com/spiffe/spire/pkg/server/datastore" @@ -17,11 +18,12 @@ import ( ) type registrationEntries struct { - cache *authorizedentries.Cache - clk clock.Clock - ds datastore.DataStore - log logrus.FieldLogger - mu sync.RWMutex + cache *authorizedentries.Cache + clk clock.Clock + ds datastore.DataStore + log logrus.FieldLogger + metrics telemetry.Metrics + mu sync.RWMutex firstEventID uint firstEventTime time.Time @@ -31,9 +33,8 @@ type registrationEntries struct { sqlTransactionTimeout time.Duration } -// buildRegistrationEntriesCache fetches all registration entries and adds them to the cache. -// It runs once at startup. +// buildRegistrationEntriesCache fetches all registration entries and adds them to the cache. It runs once at startup. +func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, pageSize int32, sqlTransactionTimeout time.Duration) (*registrationEntries, error) { resp, err := ds.ListRegistrationEntriesEvents(ctx, &datastore.ListRegistrationEntriesEventsRequest{}) if err != nil { return nil, err @@ -95,6 +96,7 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, firstEventID: firstEventID, firstEventTime: firstEventTime, log: log, + metrics: metrics, lastEventID: lastEventID, missedEvents: missedEvents, seenMissedStartupEvents: make(map[uint]struct{}), @@ -155,6 +157,14 @@ func (a *registrationEntries) updateCache(ctx context.Context) error { a.lastEventID = event.EventID } + // These two should be the same value but it's valuable to have them both be emitted for incident triage. 
+ server_telemetry.SetNodeAliasesByEntryIDCacheCountGauge(a.metrics, a.cache.Stats().AliasesByEntryID) + server_telemetry.SetNodeAliasesBySelectorCacheCountGauge(a.metrics, a.cache.Stats().AliasesBySelector) + + // These two should be the same value but it's valuable to have them both be emitted for incident triage. + server_telemetry.SetEntriesByEntryIDCacheCountGauge(a.metrics, a.cache.Stats().EntriesByEntryID) + server_telemetry.SetEntriesByParentIDCacheCountGauge(a.metrics, a.cache.Stats().EntriesByParentID) + return nil } @@ -214,6 +224,7 @@ func (a *registrationEntries) replayMissedEvents(ctx context.Context) { delete(a.missedEvents, eventID) } + server_telemetry.SetSkippedEntryEventIDsCacheCountGauge(a.metrics, len(a.missedEvents)) } // updateCacheEntry update/deletes/creates an individual registration entry in the cache. diff --git a/pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go b/pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go index ac5415a410..44b0b531ed 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go +++ b/pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go @@ -16,6 +16,7 @@ import ( "github.com/spiffe/spire/proto/spire/common" "github.com/spiffe/spire/test/clock" "github.com/spiffe/spire/test/fakes/fakedatastore" + "github.com/spiffe/spire/test/fakes/fakemetrics" "github.com/stretchr/testify/require" ) @@ -70,7 +71,9 @@ func TestBuildRegistrationEntriesCache(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { cache := authorizedentries.NewCache(clk) - registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, tt.pageSize, defaultSQLTransactionTimeout) + metrics := fakemetrics.New() + + registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, tt.pageSize, defaultSQLTransactionTimeout) if tt.err != "" { require.ErrorContains(t, err, tt.err) return @@ -104,8 +107,9 @@ func 
TestRegistrationEntriesCacheMissedEventNotFound(t *testing.T) { clk := clock.NewMock(t) ds := fakedatastore.New(t) cache := authorizedentries.NewCache(clk) + metrics := fakemetrics.New() - registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, defaultSQLTransactionTimeout) + registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, buildCachePageSize, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, registrationEntries) @@ -121,6 +125,7 @@ func TestRegistrationEntriesSavesMissedStartupEvents(t *testing.T) { clk := clock.NewMock(t) ds := fakedatastore.New(t) cache := authorizedentries.NewCache(clk) + metrics := fakemetrics.New() err := ds.CreateRegistrationEntryEventForTesting(ctx, &datastore.RegistrationEntryEvent{ EventID: 3, @@ -128,7 +133,7 @@ func TestRegistrationEntriesSavesMissedStartupEvents(t *testing.T) { }) require.NoError(t, err) - registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, defaultSQLTransactionTimeout) + registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, buildCachePageSize, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, registrationEntries) require.Equal(t, uint(3), registrationEntries.firstEventID) diff --git a/pkg/server/endpoints/authorized_entryfetcher_test.go b/pkg/server/endpoints/authorized_entryfetcher_test.go index 4cf74e2f03..761ce966ed 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_test.go +++ b/pkg/server/endpoints/authorized_entryfetcher_test.go @@ -10,10 +10,12 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/spiffe/go-spiffe/v2/spiffeid" "github.com/spiffe/spire/pkg/common/idutil" + "github.com/spiffe/spire/pkg/common/telemetry" "github.com/spiffe/spire/pkg/server/datastore" "github.com/spiffe/spire/proto/spire/common" "github.com/spiffe/spire/test/clock" 
"github.com/spiffe/spire/test/fakes/fakedatastore" + "github.com/spiffe/spire/test/fakes/fakemetrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -23,8 +25,9 @@ func TestNewAuthorizedEntryFetcherWithEventsBasedCache(t *testing.T) { log, _ := test.NewNullLogger() clk := clock.NewMock(t) ds := fakedatastore.New(t) + metrics := fakemetrics.New() - ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) + ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) assert.NoError(t, err) assert.NotNil(t, ef) @@ -94,6 +97,21 @@ func TestNewAuthorizedEntryFetcherWithEventsBasedCache(t *testing.T) { entries, err := ef.FetchAuthorizedEntries(ctx, agentID) assert.NoError(t, err) assert.Equal(t, 2, len(entries)) + + // Assert metrics + expectedMetrics := []fakemetrics.MetricItem{ + agentsByIDMetric(1), + agentsByIDExpiresAtMetric(1), + nodeAliasesByEntryIDMetric(1), + nodeAliasesBySelectorMetric(1), + entriesByEntryIDMetric(2), + entriesByParentIDMetric(2), + // Two skipped-event gauges are emitted, one for nodes and one for entries; no events were skipped here, so both are zero + nodeSkippedEventMetric(0), + entriesSkippedEventMetric(0), + } + + assert.ElementsMatch(t, expectedMetrics, metrics.AllMetrics(), "should emit metrics for node aliases, entries, and agents") } func TestNewAuthorizedEntryFetcherWithEventsBasedCacheErrorBuildingCache(t *testing.T) { @@ -101,13 +119,18 @@ func TestNewAuthorizedEntryFetcherWithEventsBasedCacheErrorBuildingCache(t *test log, _ := test.NewNullLogger() clk := clock.NewMock(t) ds := fakedatastore.New(t) + metrics := fakemetrics.New() buildErr := errors.New("build error") ds.SetNextError(buildErr) - ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, 
defaultSQLTransactionTimeout) + ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) assert.Error(t, err) assert.Nil(t, ef) + + // Assert metrics + expectedMetrics := []fakemetrics.MetricItem{} + assert.ElementsMatch(t, expectedMetrics, metrics.AllMetrics(), "should emit no metrics") } func TestBuildCacheSavesMissedEvents(t *testing.T) { @@ -115,6 +138,7 @@ func TestBuildCacheSavesMissedEvents(t *testing.T) { log, _ := test.NewNullLogger() clk := clock.NewMock(t) ds := fakedatastore.New(t) + metrics := fakemetrics.New() // Create Registration Entry Events with a gap err := ds.CreateRegistrationEntryEventForTesting(ctx, &datastore.RegistrationEntryEvent{ @@ -142,7 +166,7 @@ func TestBuildCacheSavesMissedEvents(t *testing.T) { }) require.NoError(t, err) - _, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk, defaultSQLTransactionTimeout) + _, registrationEntries, attestedNodes, err := buildCache(ctx, log, metrics, ds, clk, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, registrationEntries) require.NotNil(t, attestedNodes) @@ -153,6 +177,10 @@ func TestBuildCacheSavesMissedEvents(t *testing.T) { assert.Contains(t, attestedNodes.missedEvents, uint(2)) assert.Contains(t, attestedNodes.missedEvents, uint(3)) assert.Equal(t, uint(4), attestedNodes.lastEventID) + + // Assert metrics since the updateCache() method doesn't get called right at built time. 
+ expectedMetrics := []fakemetrics.MetricItem{} + assert.ElementsMatch(t, expectedMetrics, metrics.AllMetrics(), "should emit no metrics") } func TestRunUpdateCacheTaskPrunesExpiredAgents(t *testing.T) { @@ -161,8 +189,9 @@ func TestRunUpdateCacheTaskPrunesExpiredAgents(t *testing.T) { log.SetLevel(logrus.DebugLevel) clk := clock.NewMock(t) ds := fakedatastore.New(t) + metrics := fakemetrics.New() - ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) + ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, ef) @@ -226,8 +255,9 @@ func TestUpdateRegistrationEntriesCacheMissedEvents(t *testing.T) { log, _ := test.NewNullLogger() clk := clock.NewMock(t) ds := fakedatastore.New(t) + metrics := fakemetrics.New() - ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) + ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, ef) @@ -309,6 +339,7 @@ func TestUpdateRegistrationEntriesCacheMissedStartupEvents(t *testing.T) { log, _ := test.NewNullLogger() clk := clock.NewMock(t) ds := fakedatastore.New(t) + metrics := fakemetrics.New() agentID := spiffeid.RequireFromString("spiffe://example.org/myagent") @@ -345,7 +376,7 @@ func TestUpdateRegistrationEntriesCacheMissedStartupEvents(t *testing.T) { require.NoError(t, err) // Create entry fetcher - ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) + ef, err := 
NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, ef) @@ -415,8 +446,9 @@ func TestUpdateAttestedNodesCacheMissedEvents(t *testing.T) { log, _ := test.NewNullLogger() clk := clock.NewMock(t) ds := fakedatastore.New(t) + metrics := fakemetrics.New() - ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) + ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, ef) @@ -531,6 +563,7 @@ func TestUpdateAttestedNodesCacheMissedStartupEvents(t *testing.T) { log, _ := test.NewNullLogger() clk := clock.NewMock(t) ds := fakedatastore.New(t) + metrics := fakemetrics.New() agent1 := spiffeid.RequireFromString("spiffe://example.org/myagent1") agent2 := spiffeid.RequireFromString("spiffe://example.org/myagent2") @@ -594,7 +627,7 @@ func TestUpdateAttestedNodesCacheMissedStartupEvents(t *testing.T) { require.NoError(t, err) // Create entry fetcher - ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) + ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout) require.NoError(t, err) require.NotNil(t, ef) @@ -652,3 +685,75 @@ func TestUpdateAttestedNodesCacheMissedStartupEvents(t *testing.T) { require.Equal(t, entry.EntryId, entries[0].Id) require.Equal(t, entry.SpiffeId, idutil.RequireIDProtoString(entries[0].SpiffeId)) } + +// AgentsByIDCacheCount +func agentsByIDMetric(val float32) fakemetrics.MetricItem { + return fakemetrics.MetricItem{ + 
Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Node, telemetry.AgentsByIDCache, telemetry.Count}, + Val: val, + Labels: nil} +} + +func agentsByIDExpiresAtMetric(val float32) fakemetrics.MetricItem { + return fakemetrics.MetricItem{ + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Node, telemetry.AgentsByExpiresAtCache, telemetry.Count}, + Val: val, + Labels: nil, + } +} + +func nodeAliasesByEntryIDMetric(val float32) fakemetrics.MetricItem { + return fakemetrics.MetricItem{ + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Entry, telemetry.NodeAliasesByEntryIDCache, telemetry.Count}, + Val: val, + Labels: nil, + } +} + +func nodeSkippedEventMetric(val float32) fakemetrics.MetricItem { + return fakemetrics.MetricItem{ + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Node, telemetry.SkippedNodeEventIDs, telemetry.Count}, + Val: val, + Labels: nil, + } +} + +func nodeAliasesBySelectorMetric(val float32) fakemetrics.MetricItem { + return fakemetrics.MetricItem{ + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Entry, telemetry.NodeAliasesBySelectorCache, telemetry.Count}, + Val: val, + Labels: nil, + } +} + +func entriesByEntryIDMetric(val float32) fakemetrics.MetricItem { + return fakemetrics.MetricItem{ + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Entry, telemetry.EntriesByEntryIDCache, telemetry.Count}, + Val: val, + Labels: nil, + } +} + +func entriesByParentIDMetric(val float32) fakemetrics.MetricItem { + return fakemetrics.MetricItem{ + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Entry, telemetry.EntriesByParentIDCache, telemetry.Count}, + Val: val, + Labels: nil, + } +} + +func entriesSkippedEventMetric(val float32) fakemetrics.MetricItem { + return fakemetrics.MetricItem{ + Type: fakemetrics.SetGaugeType, + Key: []string{telemetry.Entry, telemetry.SkippedEntryEventIDs, telemetry.Count}, + Val: val, + Labels: nil, + } +} diff --git a/pkg/server/endpoints/endpoints.go 
b/pkg/server/endpoints/endpoints.go index af2a476e03..af2db4f908 100644 --- a/pkg/server/endpoints/endpoints.go +++ b/pkg/server/endpoints/endpoints.go @@ -134,7 +134,7 @@ func New(ctx context.Context, c Config) (*Endpoints, error) { var ef api.AuthorizedEntryFetcher var cacheRebuildTask, pruneEventsTask func(context.Context) error if c.EventsBasedCache { - efEventsBasedCache, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, c.Log, c.Clock, ds, c.CacheReloadInterval, c.PruneEventsOlderThan, c.SQLTransactionTimeout) + efEventsBasedCache, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, c.Log, c.Metrics, c.Clock, ds, c.CacheReloadInterval, c.PruneEventsOlderThan, c.SQLTransactionTimeout) if err != nil { return nil, err }