Skip to content

Commit

Permalink
add repeated, segmented, and writing
Browse files Browse the repository at this point in the history
  • Loading branch information
miagilepner committed May 23, 2023
1 parent c212cc3 commit b5c5450
Show file tree
Hide file tree
Showing 3 changed files with 422 additions and 60 deletions.
33 changes: 18 additions & 15 deletions vault/activity_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
}
a.currentSegment.currentClients.Clients = segmentClients

err := a.saveCurrentSegmentInternal(ctx, force)
_, err := a.saveSegmentInternal(ctx, a.currentSegment, force)
if err != nil {
// The current fragment(s) have already been placed into the in-memory
// segment, but we may lose any excess (in excessClients).
Expand All @@ -384,7 +384,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
excessClients = excessClients[:activitySegmentClientCapacity]
}
a.currentSegment.currentClients.Clients = excessClients
err := a.saveCurrentSegmentInternal(ctx, force)
_, err := a.saveSegmentInternal(ctx, a.currentSegment, force)
if err != nil {
return err
}
Expand All @@ -393,23 +393,24 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
}

// :force: forces a save of tokens/entities even if the in-memory log is empty
func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool) error {
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, a.currentSegment.startTimestamp, a.currentSegment.clientSequenceNumber)
func (a *ActivityLog) saveSegmentInternal(ctx context.Context, currentSegment segmentInfo, force bool) ([]string, error) {
entityPath := fmt.Sprintf("%s%d/%d", activityEntityBasePath, currentSegment.startTimestamp, currentSegment.clientSequenceNumber)
// RFC (VLT-120) defines this as 1-indexed, but it should be 0-indexed
tokenPath := fmt.Sprintf("%s%d/0", activityTokenBasePath, a.currentSegment.startTimestamp)
tokenPath := fmt.Sprintf("%s%d/0", activityTokenBasePath, currentSegment.startTimestamp)
paths := make([]string, 0, 2)

for _, client := range a.currentSegment.currentClients.Clients {
// Explicitly catch and throw clear error message if client ID creation and storage
// results in a []byte that doesn't assert into a valid string.
if !utf8.ValidString(client.ClientID) {
return fmt.Errorf("client ID %q is not a valid string:", client.ClientID)
return nil, fmt.Errorf("client ID %q is not a valid string:", client.ClientID)
}
}

if len(a.currentSegment.currentClients.Clients) > 0 || force {
clients, err := proto.Marshal(a.currentSegment.currentClients)
if len(currentSegment.currentClients.Clients) > 0 || force {
clients, err := proto.Marshal(currentSegment.currentClients)
if err != nil {
return err
return nil, err
}

a.logger.Trace("writing segment", "path", entityPath)
Expand All @@ -418,14 +419,15 @@ func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool
Value: clients,
})
if err != nil {
return err
return nil, err
}
paths = append(paths, entityPath)
}

// We must still allow for the tokenCount of the current segment to
// be written to storage, since if we remove this code we will incur
// data loss for one segment's worth of TWEs.
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 || force {
if len(currentSegment.tokenCount.CountByNamespaceID) > 0 || force {
// We can get away with simply using the oldest version stored because
// the storing of versions was introduced at the same time as this code.
oldestVersion, oldestUpgradeTime, err := a.core.FindOldestVersionTimestamp()
Expand All @@ -442,7 +444,7 @@ func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool
}
tokenCount, err := proto.Marshal(a.currentSegment.tokenCount)
if err != nil {
return err
return nil, err
}

a.logger.Trace("writing segment", "path", tokenPath)
Expand All @@ -451,10 +453,11 @@ func (a *ActivityLog) saveCurrentSegmentInternal(ctx context.Context, force bool
Value: tokenCount,
})
if err != nil {
return err
return nil, err
}
paths = append(paths, tokenPath)
}
return nil
return paths, nil
}

// parseSegmentNumberFromPath returns the segment number from a path
Expand Down Expand Up @@ -1021,7 +1024,7 @@ func (a *ActivityLog) SetConfig(ctx context.Context, config activityConfig) {

if forceSave {
// l is still held here
a.saveCurrentSegmentInternal(ctx, true)
a.saveSegmentInternal(ctx, a.currentSegment, true)
}

a.defaultReportMonths = config.DefaultReportMonths
Expand Down
Loading

0 comments on commit b5c5450

Please sign in to comment.