Skip to content

Commit

Permalink
Reworked some of the unit tests to the refactored algorithm.
Browse files Browse the repository at this point in the history
Signed-off-by: Edwin Buck <edwbuck@gmail.com>
  • Loading branch information
edwbuck committed Sep 24, 2024
1 parent b4d2de9 commit bf6ad3c
Show file tree
Hide file tree
Showing 7 changed files with 496 additions and 204 deletions.
6 changes: 3 additions & 3 deletions pkg/server/authorizedentries/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ func (c *Cache) removeEntry(entryID string) {
}
}

func (c *Cache) Stats() cacheStats {
return cacheStats{
func (c *Cache) Stats() CacheStats {
return CacheStats{
AgentsByID: c.agentsByID.Len(),
AgentsByExpiresAt: c.agentsByExpiresAt.Len(),
AliasesByEntryID: c.aliasesByEntryID.Len(),
Expand All @@ -286,7 +286,7 @@ func isNodeAlias(e *types.Entry) bool {
return e.ParentId.Path == idutil.ServerIDPath
}

type cacheStats struct {
type CacheStats struct {
AgentsByID int
AgentsByExpiresAt int
AliasesByEntryID int
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/datastore/sqlstore/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ func (ds *Plugin) CreateAttestedNode(ctx context.Context, node *common.AttestedN
if err != nil {
return err
}
// TODO: this is at the wrong level of the software stack.
// It should be created in the caller of the datastore interface.
return createAttestedNodeEvent(tx, &datastore.AttestedNodeEvent{
SpiffeID: node.SpiffeId,
})
Expand Down
195 changes: 105 additions & 90 deletions pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ type attestedNodes struct {

eventsBeforeFirst map[uint]struct{}

firstEvent uint
firstEventTime time.Time
lastEvent uint
firstEvent uint
firstEventTime time.Time
lastEvent uint

eventTracker *eventTracker
sqlTransactionTimeout time.Duration
eventTracker *eventTracker
sqlTransactionTimeout time.Duration

fetchNodes map[string]struct{}
fetchNodes map[string]struct{}
lastCacheStats authorizedentries.CacheStats
}

func (a *attestedNodes) captureChangedNodes(ctx context.Context) error {
Expand All @@ -54,88 +55,88 @@ func (a *attestedNodes) captureChangedNodes(ctx context.Context) error {
}

func (a *attestedNodes) searchBeforeFirstEvent(ctx context.Context) error {
// First event detected, and startup was less than a transaction timout away.
if !a.firstEventTime.IsZero() && a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout {
// First event detected, and startup was less than a transaction timout away.
if !a.firstEventTime.IsZero() && a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout {
resp, err := a.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{
LessThanEventID: a.firstEvent,
})
if err != nil {
return err
}
for _, event := range resp.Events {
// if we have seen it before, don't reload it.
if _, seen := a.eventsBeforeFirst[event.EventID]; !seen {
a.fetchNodes[event.SpiffeID] = struct{}{}
a.eventsBeforeFirst[event.EventID] = struct{}{}
}
}
return nil
}

// zero out unused event tracker
if len(a.eventsBeforeFirst) != 0 {
a.eventsBeforeFirst = make(map[uint]struct{})
}
if err != nil {
return err
}
for _, event := range resp.Events {
// if we have seen it before, don't reload it.
if _, seen := a.eventsBeforeFirst[event.EventID]; !seen {
a.fetchNodes[event.SpiffeID] = struct{}{}
a.eventsBeforeFirst[event.EventID] = struct{}{}
}
}
return nil
}

// zero out unused event tracker
if len(a.eventsBeforeFirst) != 0 {
a.eventsBeforeFirst = make(map[uint]struct{})
}

return nil
}

func (a *attestedNodes) selectPolledEvents(ctx context.Context) {
// check if the polled events have appeared out-of-order
for _, eventID := range a.eventTracker.SelectEvents() {
log := a.log.WithField(telemetry.EventID, eventID)
// check if the polled events have appeared out-of-order
for _, eventID := range a.eventTracker.SelectEvents() {
log := a.log.WithField(telemetry.EventID, eventID)
event, err := a.ds.FetchAttestedNodeEvent(ctx, eventID)

switch status.Code(err) {
case codes.OK:
case codes.NotFound:
continue
default:
log.WithError(err).Errorf("Failed to fetch info about skipped node event %d", eventID)
continue
}

a.fetchNodes[event.SpiffeID] = struct{}{}
a.eventTracker.StopTracking(uint(eventID))
}
switch status.Code(err) {
case codes.OK:
case codes.NotFound:
continue
default:
log.WithError(err).Errorf("Failed to fetch info about skipped node event %d", eventID)
continue
}

a.fetchNodes[event.SpiffeID] = struct{}{}
a.eventTracker.StopTracking(uint(eventID))
}
server_telemetry.SetSkippedNodeEventIDsCacheCountGauge(a.metrics, int(a.eventTracker.EventCount()))
}

func (a *attestedNodes) scanForNewEvents(ctx context.Context) error {
// If we haven't seen an event, scan for all events; otherwise, scan from the last event.
var resp *datastore.ListAttestedNodesEventsResponse
var err error
if a.firstEventTime.IsZero() {
// If we haven't seen an event, scan for all events; otherwise, scan from the last event.
var resp *datastore.ListAttestedNodesEventsResponse
var err error
if a.firstEventTime.IsZero() {
resp, err = a.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{})
} else {
} else {
resp, err = a.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{
GreaterThanEventID: a.lastEvent,
})
}
if err != nil {
return err
}

for _, event := range resp.Events {
// event time determines if we have seen the first event.
if a.firstEventTime.IsZero() {
a.firstEvent = event.EventID
a.lastEvent = event.EventID
a.fetchNodes[event.SpiffeID] = struct{}{}
a.firstEventTime = a.clk.Now()
continue
}

// track any skipped event ids, should the appear later.
for skipped := a.lastEvent + 1; skipped < event.EventID; skipped++ {
a.eventTracker.StartTracking(skipped)
}

// every event adds its entry to the entry fetch list.
a.fetchNodes[event.SpiffeID] = struct{}{}
a.lastEvent = event.EventID
}
return nil
}
if err != nil {
return err
}

for _, event := range resp.Events {
// event time determines if we have seen the first event.
if a.firstEventTime.IsZero() {
a.firstEvent = event.EventID
a.lastEvent = event.EventID
a.fetchNodes[event.SpiffeID] = struct{}{}
a.firstEventTime = a.clk.Now()
continue
}

// track any skipped event ids, should the appear later.
for skipped := a.lastEvent + 1; skipped < event.EventID; skipped++ {
a.eventTracker.StartTracking(skipped)
}

// every event adds its entry to the entry fetch list.
a.fetchNodes[event.SpiffeID] = struct{}{}
a.lastEvent = event.EventID
}
return nil
}

func (a *attestedNodes) loadCache(ctx context.Context) error {
Expand All @@ -154,6 +155,7 @@ func (a *attestedNodes) loadCache(ctx context.Context) error {
}
a.cache.UpdateAgent(node.SpiffeId, agentExpiresAt, api.ProtoFromSelectors(node.Selectors))
}
a.emitCacheMetrics()

return nil
}
Expand All @@ -165,26 +167,29 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, metric
pollBoundaries := BoundaryBuilder(cacheReloadInterval, sqlTransactionTimeout)

attestedNodes := &attestedNodes{
cache: cache,
clk: clk,
ds: ds,
log: log,
metrics: metrics,
sqlTransactionTimeout: sqlTransactionTimeout,

eventsBeforeFirst: make(map[uint]struct{}),
fetchNodes: make(map[string]struct{}),

eventTracker: NewEventTracker(pollPeriods, pollBoundaries),
cache: cache,
clk: clk,
ds: ds,
log: log,
metrics: metrics,
sqlTransactionTimeout: sqlTransactionTimeout,

eventsBeforeFirst: make(map[uint]struct{}),
fetchNodes: make(map[string]struct{}),

eventTracker: NewEventTracker(pollPeriods, pollBoundaries),
lastCacheStats: authorizedentries.CacheStats{
// nonsense counts to force a change, even to zero
AgentsByID: -1,
AgentsByExpiresAt: -1,
},
}

if err := attestedNodes.loadCache(ctx); err != nil {
return nil, err
}
if err := attestedNodes.captureChangedNodes(ctx); err != nil {
return nil, err
}
if err := attestedNodes.updateCachedNodes(ctx); err != nil {
// TODO: is it really necessary to udpate on first load?
if err := attestedNodes.updateCache(ctx); err != nil {
return nil, err
}

Expand All @@ -201,15 +206,11 @@ func (a *attestedNodes) updateCache(ctx context.Context) error {
return err
}

// These two should be the same value but it's valuable to have them both be emitted for incident triage.
server_telemetry.SetAgentsByExpiresAtCacheCountGauge(a.metrics, a.cache.Stats().AgentsByExpiresAt)
server_telemetry.SetAgentsByIDCacheCountGauge(a.metrics, a.cache.Stats().AgentsByID)

return nil
}

func (a *attestedNodes) updateCachedNodes(ctx context.Context) error {
for spiffeId, _ := range a.fetchNodes {
for spiffeId, _ := range a.fetchNodes {
node, err := a.ds.FetchAttestedNode(ctx, spiffeId)
if err != nil {
return err
Expand All @@ -231,6 +232,20 @@ func (a *attestedNodes) updateCachedNodes(ctx context.Context) error {
a.cache.UpdateAgent(node.SpiffeId, agentExpiresAt, api.ProtoFromSelectors(node.Selectors))

}
a.emitCacheMetrics()
return nil
}

func (a *attestedNodes) emitCacheMetrics() {
cacheStats := a.cache.Stats()

if a.lastCacheStats.AgentsByExpiresAt != cacheStats.AgentsByExpiresAt {
server_telemetry.SetAgentsByExpiresAtCacheCountGauge(a.metrics, cacheStats.AgentsByExpiresAt)
a.lastCacheStats.AgentsByExpiresAt = cacheStats.AgentsByExpiresAt
}
// Should be the same as AgentsByExpireAt. Not de-duplicated for incident triage.
if a.lastCacheStats.AgentsByID != cacheStats.AgentsByID {
server_telemetry.SetAgentsByIDCacheCountGauge(a.metrics, cacheStats.AgentsByID)
a.lastCacheStats.AgentsByID = cacheStats.AgentsByID
}
}
Loading

0 comments on commit bf6ad3c

Please sign in to comment.