From bb582a188ca02243b3beb73013ec2d6053503179 Mon Sep 17 00:00:00 2001 From: Faisal Memon Date: Wed, 3 Jul 2024 18:32:06 -0700 Subject: [PATCH 1/5] Add comments to events based cache Signed-off-by: Faisal Memon --- .../authorized_entryfetcher_attested_nodes.go | 16 +++++++++++----- ...thorized_entryfetcher_registration_entries.go | 16 +++++++++++----- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go index 3bd125c736..522f793db0 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go +++ b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go @@ -31,7 +31,8 @@ type attestedNodes struct { sqlTransactionTimeout time.Duration } -// buildAttestedNodesCache Fetches all attested nodes and adds the unexpired ones to the cache +// buildAttestedNodesCache Fetches all attested nodes and adds the unexpired ones to the cache. +// It runs once at startup. func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) { resp, err := ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) if err != nil { @@ -49,6 +50,8 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat firstEventID = event.EventID firstEventTime = now } else { + // After getting the first event, search for any gaps in the event stream, from the first event to the last event. + // During each cache refresh cycle, it will check if any of these missed events gets populated. for i := lastEventID + 1; i < event.EventID; i++ { missedEvents[i] = now } @@ -89,11 +92,10 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat // updateCache Fetches all the events since the last time this function was running and updates // the cache with all the changes. func (a *attestedNodes) updateCache(ctx context.Context) error { + // Process events skipped over previously if err := a.missedStartupEvents(ctx); err != nil { a.log.WithError(err).Error("Unable to process missed startup events") } - - // Process events skipped over previously a.replayMissedEvents(ctx) req := &datastore.ListAttestedNodesEventsRequest{ @@ -111,8 +113,8 @@ func (a *attestedNodes) updateCache(ctx context.Context) error { // were skipped over and need to be queued in case they show up later. // This can happen when a long running transaction allocates an event ID but a shorter transaction // comes in after, allocates and commits the ID first. If a read comes in at this moment, the event id for - // the longer running transaction will be skipped over - if !a.firstEventTime.IsZero() && event.EventID != a.lastEventID+1 { + // the longer running transaction will be skipped over. + if !a.firstEventTime.IsZero() { for i := a.lastEventID + 1; i < event.EventID; i++ { a.log.WithField(telemetry.EventID, i).Info("Detected skipped attested node event") a.mu.Lock() @@ -143,6 +145,10 @@ func (a *attestedNodes) updateCache(ctx context.Context) error { return nil } +// missedStartupEvents will check for any events come in with an ID less than the first event ID we receive. +// For example if the first event ID we recieve is 3, this function will check for any IDs less than that. +// If event ID 2 comes in later on, due to a long running transaction, this function will update the cache +// with the information from this event. This function will run the first sqlTransactionTimeout hours after startup. func (a *attestedNodes) missedStartupEvents(ctx context.Context) error { if a.firstEventTime.IsZero() || a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout { return nil diff --git a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go index 0a7f3bf32c..b92698304e 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go +++ b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go @@ -31,7 +31,8 @@ type registrationEntries struct { sqlTransactionTimeout time.Duration } -// buildRegistrationEntriesCache Fetches all registration entries and adds them to the cache +// buildRegistrationEntriesCache Fetches all registration entries and adds them to the cache. +// It runs once at startup. func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, pageSize int32, sqlTransactionTimeout time.Duration) (*registrationEntries, error) { resp, err := ds.ListRegistrationEntriesEvents(ctx, &datastore.ListRegistrationEntriesEventsRequest{}) if err != nil { @@ -49,6 +50,8 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, firstEventID = event.EventID firstEventTime = now } else { + // After getting the first event, search for any gaps in the event stream, from the first event to the last event. + // During each cache refresh cycle, it will check if any of these missed events gets populated. for i := lastEventID + 1; i < event.EventID; i++ { missedEvents[i] = clk.Now() } @@ -102,11 +105,10 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, // updateCache Fetches all the events since the last time this function was running and updates // the cache with all the changes. func (a *registrationEntries) updateCache(ctx context.Context) error { + // Process events skipped over previously if err := a.missedStartupEvents(ctx); err != nil { a.log.WithError(err).Error("Unable to process missed startup events") } - - // Process events skipped over previously a.replayMissedEvents(ctx) req := &datastore.ListRegistrationEntriesEventsRequest{ @@ -124,8 +126,8 @@ func (a *registrationEntries) updateCache(ctx context.Context) error { // were skipped over and need to be queued in case they show up later. // This can happen when a long running transaction allocates an event ID but a shorter transaction // comes in after, allocates and commits the ID first. If a read comes in at this moment, the event id for - // the longer running transaction will be skipped over - if !a.firstEventTime.IsZero() && event.EventID != a.lastEventID+1 { + // the longer running transaction will be skipped over. + if !a.firstEventTime.IsZero() { for i := a.lastEventID + 1; i < event.EventID; i++ { a.log.WithField(telemetry.EventID, i).Info("Detected skipped registration entry event") a.mu.Lock() @@ -156,6 +158,10 @@ func (a *registrationEntries) updateCache(ctx context.Context) error { return nil } +// missedStartupEvents will check for any events come in with an ID less than the first event ID we receive. +// For example if the first event ID we recieve is 3, this function will check for any IDs less than that. +// If event ID 2 comes in later on, due to a long running transaction, this function will update the cache +// with the information from this event. This function will run the first sqlTransactionTimeout hours after startup. func (a *registrationEntries) missedStartupEvents(ctx context.Context) error { if a.firstEventTime.IsZero() || a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout { return nil From 029d35c8598d9f5b4d724c0b6f589e09ebd6908b Mon Sep 17 00:00:00 2001 From: Faisal Memon Date: Wed, 24 Jul 2024 09:16:16 -0700 Subject: [PATCH 2/5] Fix typos Signed-off-by: Faisal Memon --- pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go | 2 +- .../endpoints/authorized_entryfetcher_registration_entries.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go index 522f793db0..0be411ed1a 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go +++ b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go @@ -146,7 +146,7 @@ func (a *attestedNodes) updateCache(ctx context.Context) error { } // missedStartupEvents will check for any events come in with an ID less than the first event ID we receive. -// For example if the first event ID we recieve is 3, this function will check for any IDs less than that. +// For example if the first event ID we receive is 3, this function will check for any IDs less than that. // If event ID 2 comes in later on, due to a long running transaction, this function will update the cache // with the information from this event. This function will run the first sqlTransactionTimeout hours after startup. func (a *attestedNodes) missedStartupEvents(ctx context.Context) error { diff --git a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go index b92698304e..bc5a08941f 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go +++ b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go @@ -159,7 +159,7 @@ func (a *registrationEntries) updateCache(ctx context.Context) error { } // missedStartupEvents will check for any events come in with an ID less than the first event ID we receive. -// For example if the first event ID we recieve is 3, this function will check for any IDs less than that. +// For example if the first event ID we receive is 3, this function will check for any IDs less than that. // If event ID 2 comes in later on, due to a long running transaction, this function will update the cache // with the information from this event. This function will run the first sqlTransactionTimeout hours after startup. func (a *registrationEntries) missedStartupEvents(ctx context.Context) error { From 21f190937bcf7aa12c1b308313f0711c0fc7bc51 Mon Sep 17 00:00:00 2001 From: Faisal Memon Date: Wed, 31 Jul 2024 13:50:53 -0700 Subject: [PATCH 3/5] Remove scrolling debug log Signed-off-by: Faisal Memon --- pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go | 1 - .../endpoints/authorized_entryfetcher_attested_nodes_test.go | 2 +- .../endpoints/authorized_entryfetcher_registration_entries.go | 1 - .../authorized_entryfetcher_registration_entries_test.go | 2 +- 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go index 0be411ed1a..4207cd4fb8 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go +++ b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go @@ -188,7 +188,6 @@ func (a *attestedNodes) replayMissedEvents(ctx context.Context) { switch status.Code(err) { case codes.OK: case codes.NotFound: - log.Debug("Event not yet populated in database") continue default: log.WithError(err).Error("Failed to fetch info about missed Attested Node event") diff --git a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go index 4f9b60d8e9..4dc04bf0b2 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go +++ b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go @@ -95,7 +95,7 @@ func TestAttestedNodesCacheMissedEventNotFound(t *testing.T) { attestedNodes.missedEvents[1] = clk.Now() attestedNodes.replayMissedEvents(ctx) - require.Equal(t, "Event not yet populated in database", hook.LastEntry().Message) + require.Zero(t, hook.Entries) } func TestAttestedNodesSavesMissedStartupEvents(t *testing.T) { diff --git a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go index bc5a08941f..3aa2e99bdc 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go +++ b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go @@ -201,7 +201,6 @@ func (a *registrationEntries) replayMissedEvents(ctx context.Context) { switch status.Code(err) { case codes.OK: case codes.NotFound: - log.Debug("Event not yet populated in database") continue default: log.WithError(err).Error("Failed to fetch info about missed event") diff --git a/pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go b/pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go index 1255048fc1..ac5415a410 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go +++ b/pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go @@ -111,7 +111,7 @@ func TestRegistrationEntriesCacheMissedEventNotFound(t *testing.T) { registrationEntries.missedEvents[1] = clk.Now() registrationEntries.replayMissedEvents(ctx) - require.Equal(t, "Event not yet populated in database", hook.LastEntry().Message) + require.Zero(t, len(hook.Entries)) } func TestRegistrationEntriesSavesMissedStartupEvents(t *testing.T) { From 0be37926094cc7c4611dc142efe735f46abf0ef2 Mon Sep 17 00:00:00 2001 From: Faisal Memon Date: Mon, 19 Aug 2024 13:39:38 -0700 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Andrew Harding Signed-off-by: Faisal Memon --- .../endpoints/authorized_entryfetcher_attested_nodes.go | 6 +++--- .../authorized_entryfetcher_registration_entries.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go index 4207cd4fb8..69df061270 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go +++ b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go @@ -31,7 +31,7 @@ type attestedNodes struct { sqlTransactionTimeout time.Duration } -// buildAttestedNodesCache Fetches all attested nodes and adds the unexpired ones to the cache. +// buildAttestedNodesCache fetches all attested nodes and adds the unexpired ones to the cache. // It runs once at startup. func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) { resp, err := ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) @@ -51,7 +51,7 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds dat firstEventTime = now } else { // After getting the first event, search for any gaps in the event stream, from the first event to the last event. - // During each cache refresh cycle, it will check if any of these missed events gets populated. + // During each cache refresh cycle, we will check if any of these missed events get populated. for i := lastEventID + 1; i < event.EventID; i++ { missedEvents[i] = now } @@ -148,7 +148,7 @@ func (a *attestedNodes) updateCache(ctx context.Context) error { // missedStartupEvents will check for any events come in with an ID less than the first event ID we receive. // For example if the first event ID we receive is 3, this function will check for any IDs less than that. // If event ID 2 comes in later on, due to a long running transaction, this function will update the cache -// with the information from this event. This function will run the first sqlTransactionTimeout hours after startup. +// with the information from this event. This function will run until time equal to sqlTransactionTimeout has elapsed after startup. func (a *attestedNodes) missedStartupEvents(ctx context.Context) error { if a.firstEventTime.IsZero() || a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout { return nil diff --git a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go index 3aa2e99bdc..bffcd20164 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go +++ b/pkg/server/endpoints/authorized_entryfetcher_registration_entries.go @@ -31,7 +31,7 @@ type registrationEntries struct { sqlTransactionTimeout time.Duration } -// buildRegistrationEntriesCache Fetches all registration entries and adds them to the cache. +// buildRegistrationEntriesCache fetches all registration entries and adds them to the cache. // It runs once at startup. func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, pageSize int32, sqlTransactionTimeout time.Duration) (*registrationEntries, error) { resp, err := ds.ListRegistrationEntriesEvents(ctx, &datastore.ListRegistrationEntriesEventsRequest{}) @@ -51,7 +51,7 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, firstEventTime = now } else { // After getting the first event, search for any gaps in the event stream, from the first event to the last event. - // During each cache refresh cycle, it will check if any of these missed events gets populated. + // During each cache refresh cycle, we will check if any of these missed events get populated. for i := lastEventID + 1; i < event.EventID; i++ { missedEvents[i] = clk.Now() } @@ -161,7 +161,7 @@ func (a *registrationEntries) updateCache(ctx context.Context) error { // missedStartupEvents will check for any events come in with an ID less than the first event ID we receive. // For example if the first event ID we receive is 3, this function will check for any IDs less than that. // If event ID 2 comes in later on, due to a long running transaction, this function will update the cache -// with the information from this event. This function will run the first sqlTransactionTimeout hours after startup. +// with the information from this event. This function will run until time equal to sqlTransactionTimeout has elapsed after startup. func (a *registrationEntries) missedStartupEvents(ctx context.Context) error { if a.firstEventTime.IsZero() || a.clk.Now().Sub(a.firstEventTime) > a.sqlTransactionTimeout { return nil From e54bfe3d5d2aabd42f44004941fe5519fbed2163 Mon Sep 17 00:00:00 2001 From: Faisal Memon Date: Mon, 19 Aug 2024 13:39:56 -0700 Subject: [PATCH 5/5] Update pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go Co-authored-by: Andrew Harding Signed-off-by: Faisal Memon --- pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go index 69df061270..32199a91e8 100644 --- a/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go +++ b/pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go @@ -145,7 +145,7 @@ func (a *attestedNodes) updateCache(ctx context.Context) error { return nil } -// missedStartupEvents will check for any events come in with an ID less than the first event ID we receive. +// missedStartupEvents will check for any events that arrive with an ID less than the first event ID we receive. // For example if the first event ID we receive is 3, this function will check for any IDs less than that. // If event ID 2 comes in later on, due to a long running transaction, this function will update the cache // with the information from this event. This function will run until time equal to sqlTransactionTimeout has elapsed after startup.