Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VAULT-14735: write mock activity log entity files #20702

Merged
merged 2 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 64 additions & 49 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,67 +409,82 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for

// saveCurrentSegmentInternal persists the in-memory current segment by
// writing its entity (client) data first and its token counts second.
//
// force forces a save of tokens/entities even if the in-memory log is empty.
// The first error encountered is returned; when the entity write fails the
// token write is not attempted.
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
	// The storage paths are computed (and returned) by the helpers; they are
	// not needed here, so no path is built locally.
	if _, err := a.saveSegmentEntitiesInternal(ctx, a.currentSegment, force); err != nil {
		return err
	}
	_, err := a.saveSegmentTokensInternal(ctx, a.currentSegment, force)
	return err
}

// saveSegmentTokensInternal writes the token count of currentSegment to
// storage and returns the path it was written to.
//
// When the segment has no per-namespace token counts and force is false,
// nothing is written and an empty path is returned with a nil error. On any
// failure the returned path is empty.
func (a *ActivityLog) saveSegmentTokensInternal(ctx context.Context, currentSegment segmentInfo, force bool) (string, error) {
	// Always inspect the segment that was passed in, not a.currentSegment, so
	// the function behaves correctly for callers that supply another segment.
	hasTokens := len(currentSegment.tokenCount.CountByNamespaceID) > 0
	if !hasTokens && !force {
		return "", nil
	}

	// RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed
	tokenPath := fmt.Sprintf("%s%d/0", activityTokenBasePath, currentSegment.startTimestamp)

	// We must still allow for the tokenCount of the current segment to
	// be written to storage, since if we remove this code we will incur
	// data loss for one segment's worth of TWEs.
	// We can get away with simply using the oldest version stored because
	// the storing of versions was introduced at the same time as this code.
	oldestVersion, oldestUpgradeTime, err := a.core.FindOldestVersionTimestamp()
	switch {
	case err != nil:
		// A lookup failure is only logged; the token count is still written.
		a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error()))
	case hasTokens &&
		oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod*time.Hour)).Before(a.clock.Now()):
		a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion))
	case hasTokens:
		a.logger.Info("storing nonzero token count")
	}

	tokenCount, err := proto.Marshal(currentSegment.tokenCount)
	if err != nil {
		return "", err
	}

	a.logger.Trace("writing segment", "path", tokenPath)
	err = a.view.Put(ctx, &logical.StorageEntry{
		Key:   tokenPath,
		Value: tokenCount,
	})
	if err != nil {
		return "", err
	}

	return tokenPath, nil
}

func (a *ActivityLog) saveSegmentEntitiesInternal(ctx context.Context, currentSegment segmentInfo, force bool) (string, error) {
mpalmi marked this conversation as resolved.
Show resolved Hide resolved
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber)

for _, client := range a.currentSegment.currentClients.Clients {
// Explicitly catch and throw clear error message if client ID creation and storage
// results in a []byte that doesn't assert into a valid string.
if !utf8.ValidString(client.ClientID) {
return fmt.Errorf("client ID %q is not a valid string:", client.ClientID)
return "", fmt.Errorf("client ID %q is not a valid string:", client.ClientID)
}
}

if len(a.currentSegment.currentClients.Clients) > 0 || force {
clients, err := proto.Marshal(a.currentSegment.currentClients)
if err != nil {
return err
}

a.logger.Trace("writing segment", "path", entityPath)
err = a.view.Put(ctx, &logical.StorageEntry{
Key: entityPath,
Value: clients,
})
if err != nil {
return err
}
if len(currentSegment.currentClients.Clients) == 0 && !force {
return "", nil
}
clients, err := proto.Marshal(currentSegment.currentClients)
if err != nil {
return entityPath, err
}

// We must still allow for the tokenCount of the current segment to
// be written to storage, since if we remove this code we will incur
// data loss for one segment's worth of TWEs.
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 || force {
// We can get away with simply using the oldest version stored because
// the storing of versions was introduced at the same time as this code.
oldestVersion, oldestUpgradeTime, err := a.core.FindOldestVersionTimestamp()
switch {
case err != nil:
a.logger.Error(fmt.Sprintf("unable to retrieve oldest version timestamp: %s", err.Error()))
case len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 &&
(oldestUpgradeTime.Add(time.Duration(trackedTWESegmentPeriod * time.Hour)).Before(a.clock.Now())):
a.logger.Error(fmt.Sprintf("storing nonzero token count over a month after vault was upgraded to %s", oldestVersion))
default:
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
a.logger.Info("storing nonzero token count")
}
}
tokenCount, err := proto.Marshal(a.currentSegment.tokenCount)
if err != nil {
return err
}

a.logger.Trace("writing segment", "path", tokenPath)
err = a.view.Put(ctx, &logical.StorageEntry{
Key: tokenPath,
Value: tokenCount,
})
if err != nil {
return err
}
a.logger.Trace("writing segment", "path", entityPath)
err = a.view.Put(ctx, &logical.StorageEntry{
Key: entityPath,
Value: clients,
})
if err != nil {
return "", err
}
return nil
return entityPath, err
}

// parseSegmentNumberFromPath returns the segment number from a path
Expand Down
73 changes: 72 additions & 1 deletion vault/logical_system_activity_write_testonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ package vault
import (
"context"
"fmt"
"sync"
"time"

"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/framework"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
Expand Down Expand Up @@ -53,7 +56,34 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo
if len(input.Data) == 0 {
return logical.ErrorResponse("Missing required \"data\" values"), logical.ErrInvalidRequest
}
return nil, nil

numMonths := 0
for _, month := range input.Data {
if int(month.GetMonthsAgo()) > numMonths {
numMonths = int(month.GetMonthsAgo())
}
}
generated := newMultipleMonthsActivityClients(numMonths + 1)
for _, month := range input.Data {
err := generated.processMonth(ctx, b.Core, month)
if err != nil {
return logical.ErrorResponse("failed to process data for month %d", month.GetMonthsAgo()), err
}
}

opts := make(map[generation.WriteOptions]struct{}, len(input.Write))
for _, opt := range input.Write {
opts[opt] = struct{}{}
}
paths, err := generated.write(ctx, opts, b.Core.activityLog)
if err != nil {
return logical.ErrorResponse("failed to write data"), err
}
return &logical.Response{
Data: map[string]interface{}{
"paths": paths,
},
}, nil
}

// singleMonthActivityClients holds a single month's client IDs, in the order they were seen
Expand Down Expand Up @@ -287,6 +317,47 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g
return nil
}

// write stores the generated client data held in m through activityLog and
// returns the storage paths that were written.
//
// Index 0 of m.months is treated as the current month (start of month, UTC);
// index i > 0 is mapped to the start of the month i months earlier. Segments
// are populated for every month, but entity segments are only saved when
// opts contains generation.WriteOptions_WRITE_ENTITIES; nil segments are
// skipped. After all months are handled the activity log is refreshed from
// storage. Any error aborts the operation and returns a nil path list.
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) {
	currentMonth := timeutil.StartOfMonth(time.Now().UTC())
	_, writeEntities := opts[generation.WriteOptions_WRITE_ENTITIES]

	// Kept as a non-nil empty slice so an empty result stays an empty list.
	paths := []string{}
	for monthsAgo, month := range m.months {
		monthStart := currentMonth
		if monthsAgo > 0 {
			monthStart = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(monthsAgo, currentMonth))
		}

		// Segments are populated even when nothing will be written, so a
		// population error is still reported.
		segments, err := month.populateSegments()
		if err != nil {
			return nil, err
		}
		if !writeEntities {
			continue
		}

		for segmentIndex, segment := range segments {
			if segment == nil {
				// skip the index
				continue
			}
			info := segmentInfo{
				startTimestamp:       monthStart.Unix(),
				currentClients:       &activity.EntityActivityLog{Clients: segment},
				clientSequenceNumber: uint64(segmentIndex),
				tokenCount:           &activity.TokenCount{},
			}
			// force=true: write the segment regardless of its client count.
			entityPath, err := activityLog.saveSegmentEntitiesInternal(ctx, info, true)
			if err != nil {
				return nil, err
			}
			paths = append(paths, entityPath)
		}
	}

	// NOTE(review): the WaitGroup handed to refreshFromStoredLog is never
	// waited on here — confirm that returning before any background work
	// completes is intended.
	var wg sync.WaitGroup
	if err := activityLog.refreshFromStoredLog(ctx, &wg, currentMonth); err != nil {
		return nil, err
	}
	return paths, nil
}

func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivityClients {
m := &multipleMonthsActivityClients{
months: make([]*singleMonthActivityClients, numberOfMonths),
Expand Down
Loading