Skip to content

Commit

Permalink
Check missed events before start (#5289)
Browse files Browse the repository at this point in the history
Signed-off-by: Faisal Memon <fymemon@yahoo.com>
  • Loading branch information
faisal-memon committed Jul 24, 2024
1 parent 7c5e72e commit 01bedb8
Show file tree
Hide file tree
Showing 9 changed files with 588 additions and 106 deletions.
8 changes: 4 additions & 4 deletions pkg/server/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type ListAttestedNodesResponse struct {

type ListAttestedNodesEventsRequest struct {
GreaterThanEventID uint
LessThanEventID uint
}

type AttestedNodeEvent struct {
Expand All @@ -180,8 +181,7 @@ type AttestedNodeEvent struct {
}

type ListAttestedNodesEventsResponse struct {
Events []AttestedNodeEvent
FirstEventID uint
Events []AttestedNodeEvent
}

type ListBundlesRequest struct {
Expand Down Expand Up @@ -226,6 +226,7 @@ type ListRegistrationEntriesResponse struct {

type ListRegistrationEntriesEventsRequest struct {
GreaterThanEventID uint
LessThanEventID uint
}

type RegistrationEntryEvent struct {
Expand All @@ -234,8 +235,7 @@ type RegistrationEntryEvent struct {
}

type ListRegistrationEntriesEventsResponse struct {
Events []RegistrationEntryEvent
FirstEventID uint
Events []RegistrationEntryEvent
}

type ListFederationRelationshipsRequest struct {
Expand Down
58 changes: 48 additions & 10 deletions pkg/server/datastore/sqlstore/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,8 +1719,20 @@ func createAttestedNodeEvent(tx *gorm.DB, event *datastore.AttestedNodeEvent) er

func listAttestedNodesEvents(tx *gorm.DB, req *datastore.ListAttestedNodesEventsRequest) (*datastore.ListAttestedNodesEventsResponse, error) {
var events []AttestedNodeEvent
if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)

if req.GreaterThanEventID != 0 || req.LessThanEventID != 0 {
query, id, err := buildListEventsQueryString(req.GreaterThanEventID, req.LessThanEventID)
if err != nil {
return nil, sqlError.Wrap(err)
}

if err := tx.Find(&events, query.String(), id).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}
} else {
if err := tx.Find(&events).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}
}

resp := &datastore.ListAttestedNodesEventsResponse{
Expand All @@ -1730,9 +1742,6 @@ func listAttestedNodesEvents(tx *gorm.DB, req *datastore.ListAttestedNodesEvents
resp.Events[i].EventID = event.ID
resp.Events[i].SpiffeID = event.SpiffeID
}
if len(events) > 0 {
resp.FirstEventID = events[0].ID
}

return resp, nil
}
Expand Down Expand Up @@ -4096,8 +4105,20 @@ func deleteRegistrationEntryEvent(tx *gorm.DB, eventID uint) error {

func listRegistrationEntriesEvents(tx *gorm.DB, req *datastore.ListRegistrationEntriesEventsRequest) (*datastore.ListRegistrationEntriesEventsResponse, error) {
var events []RegisteredEntryEvent
if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)

if req.GreaterThanEventID != 0 || req.LessThanEventID != 0 {
query, id, err := buildListEventsQueryString(req.GreaterThanEventID, req.LessThanEventID)
if err != nil {
return nil, sqlError.Wrap(err)
}

if err := tx.Find(&events, query.String(), id).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}
} else {
if err := tx.Find(&events).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}
}

resp := &datastore.ListRegistrationEntriesEventsResponse{
Expand All @@ -4107,9 +4128,6 @@ func listRegistrationEntriesEvents(tx *gorm.DB, req *datastore.ListRegistrationE
resp.Events[i].EventID = event.ID
resp.Events[i].EntryID = event.EntryID
}
if len(events) > 0 {
resp.FirstEventID = events[0].ID
}

return resp, nil
}
Expand All @@ -4122,6 +4140,26 @@ func pruneRegistrationEntriesEvents(tx *gorm.DB, olderThan time.Duration) error
return nil
}

func buildListEventsQueryString(greaterThanEventID, lessThanEventID uint) (*strings.Builder, uint, error) {
if greaterThanEventID != 0 && lessThanEventID != 0 {
return nil, 0, errors.New("can't set both greater and less than event id")
}

var id uint
query := new(strings.Builder)
query.WriteString("id ")
if greaterThanEventID != 0 {
query.WriteString("> ?")
id = greaterThanEventID
}
if lessThanEventID != 0 {
query.WriteString("< ?")
id = lessThanEventID
}

return query, id, nil
}

func createJoinToken(tx *gorm.DB, token *datastore.JoinToken) error {
t := JoinToken{
Token: token.Token,
Expand Down
86 changes: 69 additions & 17 deletions pkg/server/datastore/sqlstore/sqlstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,36 +1539,62 @@ func (s *PluginSuite) TestListAttestedNodesEvents() {
tests := []struct {
name string
greaterThanEventID uint
lessThanEventID uint
expectedEvents []datastore.AttestedNodeEvent
expectedFirstEventID uint
expectedLastEventID uint
expectedErr string
}{
{
name: "All Events",
greaterThanEventID: 0,
expectedFirstEventID: 1,
expectedLastEventID: uint(len(expectedEvents)),
expectedEvents: expectedEvents,
},
{
name: "Half of the Events",
name: "Greater than half of the Events",
greaterThanEventID: uint(len(expectedEvents) / 2),
expectedFirstEventID: uint(len(expectedEvents)/2) + 1,
expectedLastEventID: uint(len(expectedEvents)),
expectedEvents: expectedEvents[len(expectedEvents)/2:],
},
{
name: "None of the Events",
greaterThanEventID: uint(len(expectedEvents)),
expectedFirstEventID: 0,
expectedEvents: []datastore.AttestedNodeEvent{},
name: "Less than half of the Events",
lessThanEventID: uint(len(expectedEvents) / 2),
expectedFirstEventID: 1,
expectedLastEventID: uint(len(expectedEvents)/2) - 1,
expectedEvents: expectedEvents[:len(expectedEvents)/2-1],
},
{
name: "Greater than largest Event ID",
greaterThanEventID: uint(len(expectedEvents)),
expectedEvents: []datastore.AttestedNodeEvent{},
},
{
name: "Setting both greater and less than",
greaterThanEventID: 1,
lessThanEventID: 1,
expectedErr: "rpc error: code = Unknown desc = datastore-sql: can't set both greater and less than event id",
},
}
for _, test := range tests {
s.T().Run(test.name, func(t *testing.T) {
resp, err := s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{
GreaterThanEventID: test.greaterThanEventID,
LessThanEventID: test.lessThanEventID,
})
s.Assert().NoError(err)
s.Assert().Equal(test.expectedFirstEventID, resp.FirstEventID)
s.Assert().Equal(test.expectedEvents, resp.Events)
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
return
}
s.Require().NoError(err)

s.Require().Equal(test.expectedEvents, resp.Events)
if len(resp.Events) > 0 {
s.Require().Equal(test.expectedFirstEventID, resp.Events[0].EventID)
s.Require().Equal(test.expectedLastEventID, resp.Events[len(resp.Events)-1].EventID)
}
})
}
}
Expand Down Expand Up @@ -3998,36 +4024,62 @@ func (s *PluginSuite) TestListRegistrationEntriesEvents() {
tests := []struct {
name string
greaterThanEventID uint
lessThanEventID uint
expectedEvents []datastore.RegistrationEntryEvent
expectedFirstEventID uint
expectedLastEventID uint
expectedErr string
}{
{
name: "All Events",
greaterThanEventID: 0,
expectedFirstEventID: 1,
expectedLastEventID: uint(len(expectedEvents)),
expectedEvents: expectedEvents,
},
{
name: "Half of the Events",
greaterThanEventID: 2,
expectedFirstEventID: 3,
expectedEvents: expectedEvents[2:],
name: "Greater than half of the Events",
greaterThanEventID: uint(len(expectedEvents) / 2),
expectedFirstEventID: uint(len(expectedEvents)/2) + 1,
expectedLastEventID: uint(len(expectedEvents)),
expectedEvents: expectedEvents[len(expectedEvents)/2:],
},
{
name: "Less than half of the Events",
lessThanEventID: uint(len(expectedEvents) / 2),
expectedFirstEventID: 1,
expectedLastEventID: uint(len(expectedEvents)/2) - 1,
expectedEvents: expectedEvents[:len(expectedEvents)/2-1],
},
{
name: "Greater than largest Event ID",
greaterThanEventID: 4,
expectedEvents: []datastore.RegistrationEntryEvent{},
},
{
name: "None of the Events",
greaterThanEventID: 4,
expectedFirstEventID: 0,
expectedEvents: []datastore.RegistrationEntryEvent{},
name: "Setting both greater and less than",
greaterThanEventID: 1,
lessThanEventID: 1,
expectedErr: "rpc error: code = Unknown desc = datastore-sql: can't set both greater and less than event id",
},
}
for _, test := range tests {
s.T().Run(test.name, func(t *testing.T) {
resp, err = s.ds.ListRegistrationEntriesEvents(ctx, &datastore.ListRegistrationEntriesEventsRequest{
GreaterThanEventID: test.greaterThanEventID,
LessThanEventID: test.lessThanEventID,
})
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
return
}
s.Require().NoError(err)
s.Require().Equal(test.expectedFirstEventID, resp.FirstEventID)

s.Require().Equal(test.expectedEvents, resp.Events)
if len(resp.Events) > 0 {
s.Require().Equal(test.expectedFirstEventID, resp.Events[0].EventID)
s.Require().Equal(test.expectedLastEventID, resp.Events[len(resp.Events)-1].EventID)
}
})
}
}
Expand Down
36 changes: 17 additions & 19 deletions pkg/server/endpoints/authorized_entryfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,35 @@ type AuthorizedEntryFetcherWithEventsBasedCache struct {
log logrus.FieldLogger
ds datastore.DataStore

cacheReloadInterval time.Duration
pruneEventsOlderThan time.Duration
sqlTransactionTimeout time.Duration
cacheReloadInterval time.Duration
pruneEventsOlderThan time.Duration

registrationEntries eventsBasedCache
attestedNodes eventsBasedCache
}

type eventsBasedCache interface {
updateCache(ctx context.Context) error
pruneMissedEvents(sqlTransactionTimeout time.Duration)
pruneMissedEvents()
}

func NewAuthorizedEntryFetcherWithEventsBasedCache(ctx context.Context, log logrus.FieldLogger, clk clock.Clock, ds datastore.DataStore, cacheReloadInterval, pruneEventsOlderThan, sqlTransactionTimeout time.Duration) (*AuthorizedEntryFetcherWithEventsBasedCache, error) {
log.Info("Building event-based in-memory entry cache")
cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk)
cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk, sqlTransactionTimeout)
if err != nil {
return nil, err
}
log.Info("Completed building event-based in-memory entry cache")

return &AuthorizedEntryFetcherWithEventsBasedCache{
cache: cache,
clk: clk,
log: log,
ds: ds,
cacheReloadInterval: cacheReloadInterval,
pruneEventsOlderThan: pruneEventsOlderThan,
sqlTransactionTimeout: sqlTransactionTimeout,
registrationEntries: registrationEntries,
attestedNodes: attestedNodes,
cache: cache,
clk: clk,
log: log,
ds: ds,
cacheReloadInterval: cacheReloadInterval,
pruneEventsOlderThan: pruneEventsOlderThan,
registrationEntries: registrationEntries,
attestedNodes: attestedNodes,
}, nil
}

Expand Down Expand Up @@ -100,8 +98,8 @@ func (a *AuthorizedEntryFetcherWithEventsBasedCache) pruneEvents(ctx context.Con
pruneRegistrationEntriesEventsErr := a.ds.PruneRegistrationEntriesEvents(ctx, olderThan)
pruneAttestedNodesEventsErr := a.ds.PruneAttestedNodesEvents(ctx, olderThan)

a.registrationEntries.pruneMissedEvents(a.sqlTransactionTimeout)
a.attestedNodes.pruneMissedEvents(a.sqlTransactionTimeout)
a.registrationEntries.pruneMissedEvents()
a.attestedNodes.pruneMissedEvents()

return errors.Join(pruneRegistrationEntriesEventsErr, pruneAttestedNodesEventsErr)
}
Expand All @@ -113,15 +111,15 @@ func (a *AuthorizedEntryFetcherWithEventsBasedCache) updateCache(ctx context.Con
return errors.Join(updateRegistrationEntriesCacheErr, updateAttestedNodesCacheErr)
}

func buildCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) {
func buildCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, sqlTransactionTimeout time.Duration) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) {
cache := authorizedentries.NewCache(clk)

registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize)
registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, sqlTransactionTimeout)
if err != nil {
return nil, nil, nil, err
}

attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache)
attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, sqlTransactionTimeout)
if err != nil {
return nil, nil, nil, err
}
Expand Down
Loading

0 comments on commit 01bedb8

Please sign in to comment.