Skip to content

Commit

Permalink
Add comments to events based cache code (#5327)
Browse files Browse the repository at this point in the history
* Add comments to events based cache
* Remove scrolling debug log

Signed-off-by: Faisal Memon <fymemon@yahoo.com>
  • Loading branch information
faisal-memon authored Aug 20, 2024
1 parent 65ec692 commit 42225bf
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 14 deletions.
17 changes: 11 additions & 6 deletions pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type attestedNodes struct {
sqlTransactionTimeout time.Duration
}

// buildAttestedNodesCache Fetches all attested nodes and adds the unexpired ones to the cache
// buildAttestedNodesCache fetches all attested nodes and adds the unexpired ones to the cache.
// It runs once at startup.
func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) {
resp, err := ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{})
if err != nil {
Expand All @@ -49,6 +50,8 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat
firstEventID = event.EventID
firstEventTime = now
} else {
// After getting the first event, search for any gaps in the event stream, from the first event to the last event.
// During each cache refresh cycle, we will check if any of these missed events get populated.
for i := lastEventID + 1; i < event.EventID; i++ {
missedEvents[i] = now
}
Expand Down Expand Up @@ -89,11 +92,10 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat
// updateCache Fetches all the events since the last time this function was running and updates
// the cache with all the changes.
func (a *attestedNodes) updateCache(ctx context.Context) error {
// Process events skipped over previously
if err := a.missedStartupEvents(ctx); err != nil {
a.log.WithError(err).Error("Unable to process missed startup events")
}

// Process events skipped over previously
a.replayMissedEvents(ctx)

req := &datastore.ListAttestedNodesEventsRequest{
Expand All @@ -111,8 +113,8 @@ func (a *attestedNodes) updateCache(ctx context.Context) error {
// were skipped over and need to be queued in case they show up later.
// This can happen when a long running transaction allocates an event ID but a shorter transaction
// comes in after, allocates and commits the ID first. If a read comes in at this moment, the event id for
// the longer running transaction will be skipped over
if !a.firstEventTime.IsZero() && event.EventID != a.lastEventID+1 {
// the longer running transaction will be skipped over.
if !a.firstEventTime.IsZero() {
for i := a.lastEventID + 1; i < event.EventID; i++ {
a.log.WithField(telemetry.EventID, i).Info("Detected skipped attested node event")
a.mu.Lock()
Expand Down Expand Up @@ -143,6 +145,10 @@ func (a *attestedNodes) updateCache(ctx context.Context) error {
return nil
}

// missedStartupEvents will check for any events that arrive with an ID less than the first event ID we receive.
// For example if the first event ID we receive is 3, this function will check for any IDs less than that.
// If event ID 2 comes in later on, due to a long running transaction, this function will update the cache
// with the information from this event. This function will run until time equal to sqlTransactionTimeout has elapsed after startup.
func (a *attestedNodes) missedStartupEvents(ctx context.Context) error {
if a.firstEventTime.IsZero() || a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout {
return nil
Expand Down Expand Up @@ -182,7 +188,6 @@ func (a *attestedNodes) replayMissedEvents(ctx context.Context) {
switch status.Code(err) {
case codes.OK:
case codes.NotFound:
log.Debug("Event not yet populated in database")
continue
default:
log.WithError(err).Error("Failed to fetch info about missed Attested Node event")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestAttestedNodesCacheMissedEventNotFound(t *testing.T) {

attestedNodes.missedEvents[1] = clk.Now()
attestedNodes.replayMissedEvents(ctx)
require.Equal(t, "Event not yet populated in database", hook.LastEntry().Message)
require.Zero(t, hook.Entries)
}

func TestAttestedNodesSavesMissedStartupEvents(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type registrationEntries struct {
sqlTransactionTimeout time.Duration
}

// buildRegistrationEntriesCache Fetches all registration entries and adds them to the cache
// buildRegistrationEntriesCache fetches all registration entries and adds them to the cache.
// It runs once at startup.
func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, pageSize int32, sqlTransactionTimeout time.Duration) (*registrationEntries, error) {
resp, err := ds.ListRegistrationEntriesEvents(ctx, &datastore.ListRegistrationEntriesEventsRequest{})
if err != nil {
Expand All @@ -49,6 +50,8 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger,
firstEventID = event.EventID
firstEventTime = now
} else {
// After getting the first event, search for any gaps in the event stream, from the first event to the last event.
// During each cache refresh cycle, we will check if any of these missed events get populated.
for i := lastEventID + 1; i < event.EventID; i++ {
missedEvents[i] = clk.Now()
}
Expand Down Expand Up @@ -102,11 +105,10 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger,
// updateCache Fetches all the events since the last time this function was running and updates
// the cache with all the changes.
func (a *registrationEntries) updateCache(ctx context.Context) error {
// Process events skipped over previously
if err := a.missedStartupEvents(ctx); err != nil {
a.log.WithError(err).Error("Unable to process missed startup events")
}

// Process events skipped over previously
a.replayMissedEvents(ctx)

req := &datastore.ListRegistrationEntriesEventsRequest{
Expand All @@ -124,8 +126,8 @@ func (a *registrationEntries) updateCache(ctx context.Context) error {
// were skipped over and need to be queued in case they show up later.
// This can happen when a long running transaction allocates an event ID but a shorter transaction
// comes in after, allocates and commits the ID first. If a read comes in at this moment, the event id for
// the longer running transaction will be skipped over
if !a.firstEventTime.IsZero() && event.EventID != a.lastEventID+1 {
// the longer running transaction will be skipped over.
if !a.firstEventTime.IsZero() {
for i := a.lastEventID + 1; i < event.EventID; i++ {
a.log.WithField(telemetry.EventID, i).Info("Detected skipped registration entry event")
a.mu.Lock()
Expand Down Expand Up @@ -156,6 +158,10 @@ func (a *registrationEntries) updateCache(ctx context.Context) error {
return nil
}

// missedStartupEvents will check for any events come in with an ID less than the first event ID we receive.
// For example if the first event ID we receive is 3, this function will check for any IDs less than that.
// If event ID 2 comes in later on, due to a long running transaction, this function will update the cache
// with the information from this event. This function will run until time equal to sqlTransactionTimeout has elapsed after startup.
func (a *registrationEntries) missedStartupEvents(ctx context.Context) error {
if a.firstEventTime.IsZero() || a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout {
return nil
Expand Down Expand Up @@ -195,7 +201,6 @@ func (a *registrationEntries) replayMissedEvents(ctx context.Context) {
switch status.Code(err) {
case codes.OK:
case codes.NotFound:
log.Debug("Event not yet populated in database")
continue
default:
log.WithError(err).Error("Failed to fetch info about missed event")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestRegistrationEntriesCacheMissedEventNotFound(t *testing.T) {

registrationEntries.missedEvents[1] = clk.Now()
registrationEntries.replayMissedEvents(ctx)
require.Equal(t, "Event not yet populated in database", hook.LastEntry().Message)
require.Zero(t, len(hook.Entries))
}

func TestRegistrationEntriesSavesMissedStartupEvents(t *testing.T) {
Expand Down

0 comments on commit 42225bf

Please sign in to comment.