Skip to content

Commit

Permalink
tests for writing entity segments
Browse files Browse the repository at this point in the history
  • Loading branch information
miagilepner committed May 24, 2023
1 parent 561517a commit 2355aed
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 62 deletions.
122 changes: 67 additions & 55 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
}
a.currentSegment.currentClients.Clients = segmentClients

_, err := a.saveSegmentInternal(ctx, a.currentSegment, force)
err := a.saveCurrentSegmentInternal(ctx, force)
if err != nil {
// The current fragment(s) have already been placed into the in-memory
// segment, but we may lose any excess (in excessClients).
Expand All @@ -399,7 +399,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
excessClients = excessClients[:activitySegmentClientCapacity]
}
a.currentSegment.currentClients.Clients = excessClients
_, err := a.saveSegmentInternal(ctx, a.currentSegment, force)
err := a.saveCurrentSegmentInternal(ctx, force)
if err != nil {
return err
}
Expand All @@ -408,71 +408,83 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
}

// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveSegmentInternal(ctx context.Context, currentSegment segmentInfo, force bool) ([]string, error) {
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber)
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
_, err := a.saveSegmentEntitiesInternal(ctx, a.currentSegment, force)
if err != nil {
return err
}
_, err = a.saveSegmentTokensInternal(ctx, a.currentSegment, force)
return err
}

func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegment segmentInfo, force bool) (string, error) {
if len(currentSegment.tokenCount.CountByNamespaceID) == 0 && !force {
return "", nil
}
// RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed
tokenPath := fmt.Sprintf("%s%d/0", activityTokenBasePath, currentSegment.startTimestamp)
paths := make([]string, 0, 2)
// We must still allow for the tokenCount of the current segment to
// be written to storage, since if we remove this code we will incur
// data loss for one segment's worth of TWEs.
// We can get away with simply using the oldest version stored because
// the storing of versions was introduced at the same time as this code.
oldestVersion, oldestUpgradeTime, err := a.core.FindOldestVersionTimestamp()
switch {
case err != nil:
a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error()))
case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 &&
(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(time.Now())):
a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion))
default:
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
a.logger.Info("storing nonzero token count")
}
}
tokenCount, err := proto.Marshal(a.currentSegment.tokenCount)
if err != nil {
return "", err
}

a.logger.Trace("writing segment", "path", tokenPath)
err = a.view.Put(ctx, &logical.StorageEntry{
Key: tokenPath,
Value: tokenCount,
})
if err != nil {
return "", err
}

return tokenPath, nil
}

func (a *ActivityLog) saveSegmentEntitiesInternal(ctx context.Context, currentSegment segmentInfo, force bool) (string, error) {
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber)

for _, client := range a.currentSegment.currentClients.Clients {
// Explicitly catch and throw clear error message if client ID creation and storage
// results in a []byte that doesn't assert into a valid string.
if !utf8.ValidString(client.ClientID) {
return nil, fmt.Errorf("client ID %q is not a valid string:", client.ClientID)
return "", fmt.Errorf("client ID %q is not a valid string:", client.ClientID)
}
}

if len(currentSegment.currentClients.Clients) > 0 || force {
clients, err := proto.Marshal(currentSegment.currentClients)
if err != nil {
return nil, err
}

a.logger.Trace("writing segment", "path", entityPath)
err = a.view.Put(ctx, &logical.StorageEntry{
Key: entityPath,
Value: clients,
})
if err != nil {
return nil, err
}
paths = append(paths, entityPath)
if len(currentSegment.currentClients.Clients) == 0 && !force {
return "", nil
}
clients, err := proto.Marshal(currentSegment.currentClients)
if err != nil {
return entityPath, err
}

// We must still allow for the tokenCount of the current segment to
// be written to storage, since if we remove this code we will incur
// data loss for one segment's worth of TWEs.
if len(currentSegment.tokenCount.CountByNamespaceID) > 0 || force {
// We can get away with simply using the oldest version stored because
// the storing of versions was introduced at the same time as this code.
oldestVersion, oldestUpgradeTime, err := a.core.FindOldestVersionTimestamp()
switch {
case err != nil:
a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error()))
case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 &&
(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(a.clock.Now())):
a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion))
default:
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
a.logger.Info("storing nonzero token count")
}
}
tokenCount, err := proto.Marshal(a.currentSegment.tokenCount)
if err != nil {
return nil, err
}

a.logger.Trace("writing segment", "path", tokenPath)
err = a.view.Put(ctx, &logical.StorageEntry{
Key: tokenPath,
Value: tokenCount,
})
if err != nil {
return nil, err
}
paths = append(paths, tokenPath)
a.logger.Trace("writing segment", "path", entityPath)
err = a.view.Put(ctx, &logical.StorageEntry{
Key: entityPath,
Value: clients,
})
if err != nil {
return "", err
}
return paths, nil
return entityPath, err
}

// parseSegmentNumberFromPath returns the segment number from a path
Expand Down Expand Up @@ -1039,7 +1051,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {

if forceSave {
// l is still held here
a.saveSegmentInternal(ctx, a.currentSegment, true)
a.saveCurrentSegmentInternal(ctx, true)
}

a.defaultReportMonths = config.DefaultReportMonths
Expand Down
32 changes: 25 additions & 7 deletions vault/logical_system_activity_write_testonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ package vault
import (
"context"
"fmt"
"sync"
"time"

"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/framework"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
Expand Down Expand Up @@ -60,7 +63,7 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo
numMonths = int(month.GetMonthsAgo())
}
}
generated := newMultipleMonthsActivityClients(numMonths)
generated := newMultipleMonthsActivityClients(numMonths + 1)
for _, month := range input.Data {
err := generated.processMonth(ctx, b.Core, month)
if err != nil {
Expand Down Expand Up @@ -315,28 +318,43 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g
}

func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) {
now := timeutil.StartOfMonth(time.Now().UTC())
paths := []string{}
for _, month := range m.months {
segments := month.populateSegments()
timestamp := 0
for i, month := range m.months {
var timestamp time.Time
if i > 0 {
timestamp = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
} else {
timestamp = now
}
segments, err := month.populateSegments()
if err != nil {
return nil, err
}
for segmentIndex, segment := range segments {
if _, ok := opts[generation.WriteOptions_WRITE_ENTITIES]; ok {
if segment == nil {
// skip the index
continue
}
entityPaths, err := activityLog.saveSegmentInternal(ctx, segmentInfo{
startTimestamp: int64(timestamp),
entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, segmentInfo{
startTimestamp: timestamp.Unix(),
currentClients: &activity.EntityActivityLog{Clients: segment},
clientSequenceNumber: uint64(segmentIndex),
tokenCount: &activity.TokenCount{},
}, true)
if err != nil {
return nil, err
}
paths = append(paths, entityPaths...)
paths = append(paths, entityPath)
}
}
}
wg := sync.WaitGroup{}
err := activityLog.refreshFromStoredLog(ctx, &wg, now)
if err != nil {
return nil, err
}
return paths, nil
}

Expand Down
Loading

0 comments on commit 2355aed

Please sign in to comment.