Skip to content

Commit e909258

Browse files
committed
Fix segments fragments loss (#23781)
* add ent changes * add changelog * make fmt
1 parent c07fb7e commit e909258

File tree

3 files changed

+42
-22
lines changed

3 files changed

+42
-22
lines changed

changelog/23781.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
core/activity: Fixes segments fragment loss due to exceeding entry record size limit
3+
```

vault/activity_log.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,18 @@ const (
6060
activitySegmentWriteTimeout = 1 * time.Minute
6161

6262
// Number of client records to store per segment. Each ClientRecord may
63-
// consume upto 99 bytes; rounding it to 100bytes. Considering the storage
64-
// limit of 512KB per storage entry, we can roughly store 512KB/100bytes =
65-
// 5241 clients; rounding down to 5000 clients.
66-
activitySegmentClientCapacity = 5000
63+
// consume upto 99 bytes; rounding it to 100bytes. This []byte undergo JSON marshalling
64+
// before adding them in storage increasing the size by approximately 4/3 times. Considering the storage
65+
// limit of 512KB per storage entry, we can roughly store 512KB/(100bytes * 4/3) yielding approximately 3820 records.
66+
ActivitySegmentClientCapacity = 3820
6767

6868
// Maximum number of segments per month. This allows for 700K entities per
69-
// month; 700K/5K. These limits are geared towards controlling the storage
69+
// month; 700K/3820 (ActivitySegmentClientCapacity). These limits are geared towards controlling the storage
7070
// implications of persisting activity logs. If we hit a scenario where the
7171
// storage consequences are less important in comparison to the accuracy of
7272
// the client activity, these limits can be further relaxed or even be
7373
// removed.
74-
activityLogMaxSegmentPerMonth = 140
74+
activityLogMaxSegmentPerMonth = 184
7575

7676
// trackedTWESegmentPeriod is a time period of a little over a month, and represents
7777
// the amount of time that needs to pass after a 1.9 or later upgrade to result in
@@ -351,7 +351,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
351351
}
352352

353353
// Will all new entities fit? If not, roll over to a new segment.
354-
available := activitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
354+
available := ActivitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
355355
remaining := available - len(newEntities)
356356
excess := 0
357357
if remaining < 0 {
@@ -389,9 +389,9 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
389389

390390
// Rotate to next segment
391391
a.currentSegment.clientSequenceNumber += 1
392-
if len(excessClients) > activitySegmentClientCapacity {
392+
if len(excessClients) > ActivitySegmentClientCapacity {
393393
a.logger.Warn("too many new active clients, dropping tail", "clients", len(excessClients))
394-
excessClients = excessClients[:activitySegmentClientCapacity]
394+
excessClients = excessClients[:ActivitySegmentClientCapacity]
395395
}
396396
a.currentSegment.currentClients.Clients = excessClients
397397
err := a.saveCurrentSegmentInternal(ctx, force)

vault/activity_log_test.go

+30-13
Original file line numberDiff line numberDiff line change
@@ -663,9 +663,11 @@ func TestActivityLog_availableLogs(t *testing.T) {
663663
}
664664
}
665665

666-
// TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a fragment and saves it and reads it. The test then
667-
// adds 4000 more clients and calls receivedFragment with 200 more entities. The current segment is saved to storage and
668-
// read back. The test verifies that there are 5000 clients in the first segment index, then the rest in the second index.
666+
// TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a fragment
667+
// and saves it and reads it. The test then adds 4000 more clients and calls
668+
// receivedFragment with 200 more entities. The current segment is saved to
669+
// storage and read back. The test verifies that there are ActivitySegmentClientCapacity clients in the
670+
// first and second segment index, then the rest in the third index.
669671
func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
670672
core, _, _ := TestCoreUnsealed(t)
671673
a := core.activityLog
@@ -685,14 +687,15 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
685687
startTimestamp := a.GetStartTimestamp()
686688
path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
687689
path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp)
690+
path2 := fmt.Sprintf("sys/counters/activity/log/entity/%d/2", startTimestamp)
688691
tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", startTimestamp)
689692

690693
genID := func(i int) string {
691694
return fmt.Sprintf("11111111-1111-1111-1111-%012d", i)
692695
}
693696
ts := time.Now().Unix()
694697

695-
// First 4000 should fit in one segment
698+
// First ActivitySegmentClientCapacity should fit in one segment
696699
for i := 0; i < 4000; i++ {
697700
a.AddEntityToFragment(genID(i), "root", ts)
698701
}
@@ -705,7 +708,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
705708
default:
706709
}
707710

708-
// Save incomplete segment
711+
// Save segment
709712
err := a.saveCurrentSegmentToStorage(context.Background(), false)
710713
if err != nil {
711714
t.Fatalf("got error writing entities to storage: %v", err)
@@ -717,8 +720,8 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
717720
if err != nil {
718721
t.Fatalf("could not unmarshal protobuf: %v", err)
719722
}
720-
if len(entityLog0.Clients) != 4000 {
721-
t.Fatalf("unexpected entity length. Expected %d, got %d", 4000, len(entityLog0.Clients))
723+
if len(entityLog0.Clients) != ActivitySegmentClientCapacity {
724+
t.Fatalf("unexpected entity length. Expected %d, got %d", ActivitySegmentClientCapacity, len(entityLog0.Clients))
722725
}
723726

724727
// 4000 more local entities
@@ -778,17 +781,17 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
778781
}
779782

780783
seqNum := a.GetEntitySequenceNumber()
781-
if seqNum != 1 {
782-
t.Fatalf("expected sequence number 1, got %v", seqNum)
784+
if seqNum != 2 {
785+
t.Fatalf("expected sequence number 2, got %v", seqNum)
783786
}
784787

785788
protoSegment0 = readSegmentFromStorage(t, core, path0)
786789
err = proto.Unmarshal(protoSegment0.Value, &entityLog0)
787790
if err != nil {
788791
t.Fatalf("could not unmarshal protobuf: %v", err)
789792
}
790-
if len(entityLog0.Clients) != activitySegmentClientCapacity {
791-
t.Fatalf("unexpected client length. Expected %d, got %d", activitySegmentClientCapacity,
793+
if len(entityLog0.Clients) != ActivitySegmentClientCapacity {
794+
t.Fatalf("unexpected client length. Expected %d, got %d", ActivitySegmentClientCapacity,
792795
len(entityLog0.Clients))
793796
}
794797

@@ -798,8 +801,19 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
798801
if err != nil {
799802
t.Fatalf("could not unmarshal protobuf: %v", err)
800803
}
801-
expectedCount := 8100 - activitySegmentClientCapacity
802-
if len(entityLog1.Clients) != expectedCount {
804+
if len(entityLog1.Clients) != ActivitySegmentClientCapacity {
805+
t.Fatalf("unexpected entity length. Expected %d, got %d", ActivitySegmentClientCapacity,
806+
len(entityLog1.Clients))
807+
}
808+
809+
protoSegment2 := readSegmentFromStorage(t, core, path2)
810+
entityLog2 := activity.EntityActivityLog{}
811+
err = proto.Unmarshal(protoSegment2.Value, &entityLog2)
812+
if err != nil {
813+
t.Fatalf("could not unmarshal protobuf: %v", err)
814+
}
815+
expectedCount := 8100 - (ActivitySegmentClientCapacity * 2)
816+
if len(entityLog2.Clients) != expectedCount {
803817
t.Fatalf("unexpected entity length. Expected %d, got %d", expectedCount,
804818
len(entityLog1.Clients))
805819
}
@@ -811,6 +825,9 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
811825
for _, e := range entityLog1.Clients {
812826
entityPresent[e.ClientID] = struct{}{}
813827
}
828+
for _, e := range entityLog2.Clients {
829+
entityPresent[e.ClientID] = struct{}{}
830+
}
814831
for i := 0; i < 8100; i++ {
815832
expectedID := genID(i)
816833
if _, present := entityPresent[expectedID]; !present {

0 commit comments

Comments
 (0)