Add comments to events based cache code #5327

Merged
merged 9 commits into from
Aug 20, 2024
17 changes: 11 additions & 6 deletions pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go
@@ -31,7 +31,8 @@ type attestedNodes struct {
sqlTransactionTimeout time.Duration
}

// buildAttestedNodesCache Fetches all attested nodes and adds the unexpired ones to the cache
// buildAttestedNodesCache fetches all attested nodes and adds the unexpired ones to the cache.
// It runs once at startup.
func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) {
resp, err := ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{})
if err != nil {
@@ -49,6 +50,8 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat
firstEventID = event.EventID
firstEventTime = now
} else {
// After getting the first event, search for any gaps in the event stream, from the first event to the last event.
// During each cache refresh cycle, we will check if any of these missed events get populated.
for i := lastEventID + 1; i < event.EventID; i++ {
missedEvents[i] = now
}
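A minimal sketch of the gap search that the new comment describes, written as a standalone function. `findGaps` and the sample IDs are hypothetical and only illustrate the idea; the code in this PR records each missing ID in `missedEvents` with a timestamp rather than returning a slice.

```go
package main

import "fmt"

// findGaps returns every ID missing between consecutive entries of ids.
// ids is assumed to be sorted in ascending order, as an event listing would be.
func findGaps(ids []uint) []uint {
	var missed []uint
	for i := 1; i < len(ids); i++ {
		// Walk the open interval (previous ID, current ID).
		for id := ids[i-1] + 1; id < ids[i]; id++ {
			missed = append(missed, id)
		}
	}
	return missed
}

func main() {
	// Events 5 and 6 were allocated but not yet committed when the list was read.
	fmt.Println(findGaps([]uint{3, 4, 7, 8})) // prints [5 6]
}
```

Each ID returned this way is a candidate to re-check on later refresh cycles, since the transaction that allocated it may still commit.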
@@ -89,11 +92,10 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat
// updateCache Fetches all the events since the last time this function was running and updates
// the cache with all the changes.
func (a *attestedNodes) updateCache(ctx context.Context) error {
// Process events skipped over previously
if err := a.missedStartupEvents(ctx); err != nil {
a.log.WithError(err).Error("Unable to process missed startup events")
}

// Process events skipped over previously
a.replayMissedEvents(ctx)

req := &datastore.ListAttestedNodesEventsRequest{
@@ -111,8 +113,8 @@ func (a *attestedNodes) updateCache(ctx context.Context) error {
// were skipped over and need to be queued in case they show up later.
// This can happen when a long running transaction allocates an event ID but a shorter transaction
// comes in after, allocates and commits the ID first. If a read comes in at this moment, the event id for
// the longer running transaction will be skipped over
if !a.firstEventTime.IsZero() && event.EventID != a.lastEventID+1 {
// the longer running transaction will be skipped over.
if !a.firstEventTime.IsZero() {
Comment on lines -114 to +117
Contributor Author

Following the same style suggested by this code review comment.

for i := a.lastEventID + 1; i < event.EventID; i++ {
a.log.WithField(telemetry.EventID, i).Info("Detected skipped attested node event")
a.mu.Lock()
@@ -143,6 +145,10 @@ func (a *attestedNodes) updateCache(ctx context.Context) error {
return nil
}
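The condition in `updateCache` drops the `event.EventID != a.lastEventID+1` check. A small sketch of why that appears safe: when the IDs are consecutive, the loop bounds already make the body run zero times. The function names below are hypothetical, and `uint` is an assumed ID type.

```go
package main

import "fmt"

// skippedGuarded mirrors the old form: an explicit inequality check before the loop.
func skippedGuarded(last, next uint) int {
	n := 0
	if next != last+1 {
		for i := last + 1; i < next; i++ {
			n++
		}
	}
	return n
}

// skippedUnguarded mirrors the new form: the loop bounds alone decide.
func skippedUnguarded(last, next uint) int {
	n := 0
	for i := last + 1; i < next; i++ {
		n++
	}
	return n
}

func main() {
	same := true
	for last := uint(0); last <= 10; last++ {
		for next := uint(0); next <= 15; next++ {
			if skippedGuarded(last, next) != skippedUnguarded(last, next) {
				same = false
			}
		}
	}
	fmt.Println(same) // prints true
}
```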

// missedStartupEvents will check for any events that arrive with an ID less than the first event ID we receive.
// For example if the first event ID we receive is 3, this function will check for any IDs less than that.
// If event ID 2 comes in later on, due to a long running transaction, this function will update the cache
// with the information from this event. This function will run until time equal to sqlTransactionTimeout has elapsed after startup.
func (a *attestedNodes) missedStartupEvents(ctx context.Context) error {
if a.firstEventTime.IsZero() || a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout {
return nil
@@ -182,7 +188,6 @@ func (a *attestedNodes) replayMissedEvents(ctx context.Context) {
switch status.Code(err) {
case codes.OK:
case codes.NotFound:
log.Debug("Event not yet populated in database")
Contributor

Can we do two things here:

  1. Leave a comment as to why we don't log anything (noise that was brought up)
  2. Make this a metric? That way at least we're not blind to the situation but it won't muddy our logs.

Side question: Where did we land on downgrading this to a Trace log to reduce the noise, instead of getting rid of it altogether?

Contributor Author

Thanks @stevend-uber for the feedback. I'll work on making the metrics changes. I didn't understand the side question, though.

Contributor

For #1: Mostly why we don't do anything for case codes.NotFound:.

Upon initial reading of the code without your or my context, another contributor is likely to add a log here and we're back where we started :)

It's not blocking so I'm cool just adding it later :)

Contributor

The logging for not found is likely to be better handled in #5341 as part of the backoff algorithm, and we intend to add a gauge to track the entries in #4720. Since those items are more specific, I'll ask that we not attempt non-comment changes in this PR, as doing so makes the PR lose focus and slows velocity.

If the two PRs mentioned above don't handle the issue, please search #5327, #5342, #4720, or #5349 to see if it is covered there, and if not, open a new PR.

continue
default:
log.WithError(err).Error("Failed to fetch info about missed Attested Node event")
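The review thread asks for a comment explaining the silent `codes.NotFound` case, plus a metric so the situation stays visible. Neither change is part of this PR. The following is only a sketch of what that could look like, using the standard library: `errNotFound` stands in for a `codes.NotFound` status, and `replay`, `fetch`, and the returned counter are hypothetical names, not the project's telemetry API.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for a datastore "not found" result
// (codes.NotFound in the diff). It is an assumption for this sketch.
var errNotFound = errors.New("event not found")

// replay polls each missed event ID once. It returns the IDs that were
// resolved and a count of IDs that are still not populated.
func replay(missed []uint, fetch func(uint) error) (resolved []uint, notFound int) {
	for _, id := range missed {
		err := fetch(id)
		switch {
		case err == nil:
			resolved = append(resolved, id)
		case errors.Is(err, errNotFound):
			// Deliberately not logged: this is the expected state while the
			// owning transaction is still open, and logging it on every
			// refresh cycle is noisy. Count it instead.
			notFound++
			continue
		default:
			fmt.Printf("failed to fetch missed event %d: %v\n", id, err)
		}
	}
	return resolved, notFound
}

func main() {
	// Hypothetical datastore: only event 5 has been committed so far.
	fetch := func(id uint) error {
		if id == 5 {
			return nil
		}
		return errNotFound
	}
	resolved, notFound := replay([]uint{5, 6, 7}, fetch)
	fmt.Println(resolved, notFound) // prints [5] 2
}
```

A count like `notFound` could feed a gauge, which keeps the condition observable without adding a log line per cycle.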
@@ -95,7 +95,7 @@ func TestAttestedNodesCacheMissedEventNotFound(t *testing.T) {

attestedNodes.missedEvents[1] = clk.Now()
attestedNodes.replayMissedEvents(ctx)
require.Equal(t, "Event not yet populated in database", hook.LastEntry().Message)
require.Zero(t, hook.Entries)
}

func TestAttestedNodesSavesMissedStartupEvents(t *testing.T) {
@@ -31,7 +31,8 @@ type registrationEntries struct {
sqlTransactionTimeout time.Duration
}

// buildRegistrationEntriesCache Fetches all registration entries and adds them to the cache
// buildRegistrationEntriesCache fetches all registration entries and adds them to the cache.
// It runs once at startup.
func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, pageSize int32, sqlTransactionTimeout time.Duration) (*registrationEntries, error) {
resp, err := ds.ListRegistrationEntriesEvents(ctx, &datastore.ListRegistrationEntriesEventsRequest{})
if err != nil {
@@ -49,6 +50,8 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger,
firstEventID = event.EventID
firstEventTime = now
} else {
// After getting the first event, search for any gaps in the event stream, from the first event to the last event.
// During each cache refresh cycle, we will check if any of these missed events get populated.
for i := lastEventID + 1; i < event.EventID; i++ {
missedEvents[i] = clk.Now()
}
@@ -102,11 +105,10 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger,
// updateCache Fetches all the events since the last time this function was running and updates
// the cache with all the changes.
func (a *registrationEntries) updateCache(ctx context.Context) error {
// Process events skipped over previously
if err := a.missedStartupEvents(ctx); err != nil {
a.log.WithError(err).Error("Unable to process missed startup events")
}

// Process events skipped over previously
a.replayMissedEvents(ctx)

req := &datastore.ListRegistrationEntriesEventsRequest{
@@ -124,8 +126,8 @@ func (a *registrationEntries) updateCache(ctx context.Context) error {
// were skipped over and need to be queued in case they show up later.
// This can happen when a long running transaction allocates an event ID but a shorter transaction
// comes in after, allocates and commits the ID first. If a read comes in at this moment, the event id for
// the longer running transaction will be skipped over
if !a.firstEventTime.IsZero() && event.EventID != a.lastEventID+1 {
// the longer running transaction will be skipped over.
if !a.firstEventTime.IsZero() {
Comment on lines -127 to +130
Contributor Author

Following the same style suggested by this code review comment.

for i := a.lastEventID + 1; i < event.EventID; i++ {
a.log.WithField(telemetry.EventID, i).Info("Detected skipped registration entry event")
a.mu.Lock()
@@ -156,6 +158,10 @@ func (a *registrationEntries) updateCache(ctx context.Context) error {
return nil
}

// missedStartupEvents will check for any events that arrive with an ID less than the first event ID we receive.
// For example if the first event ID we receive is 3, this function will check for any IDs less than that.
// If event ID 2 comes in later on, due to a long running transaction, this function will update the cache
// with the information from this event. This function will run until time equal to sqlTransactionTimeout has elapsed after startup.
func (a *registrationEntries) missedStartupEvents(ctx context.Context) error {
if a.firstEventTime.IsZero() || a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout {
return nil
@@ -195,7 +201,6 @@ func (a *registrationEntries) replayMissedEvents(ctx context.Context) {
switch status.Code(err) {
case codes.OK:
case codes.NotFound:
log.Debug("Event not yet populated in database")
continue
default:
log.WithError(err).Error("Failed to fetch info about missed event")
@@ -111,7 +111,7 @@ func TestRegistrationEntriesCacheMissedEventNotFound(t *testing.T) {

registrationEntries.missedEvents[1] = clk.Now()
registrationEntries.replayMissedEvents(ctx)
require.Equal(t, "Event not yet populated in database", hook.LastEntry().Message)
require.Zero(t, len(hook.Entries))
}

func TestRegistrationEntriesSavesMissedStartupEvents(t *testing.T) {