diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go
index 4a361fbdadd6..7e2622c068ba 100644
--- a/pkg/storage/raft_log_queue.go
+++ b/pkg/storage/raft_log_queue.go
@@ -19,6 +19,8 @@ import (
 	"sort"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
+
 	"github.com/pkg/errors"
 	"go.etcd.io/etcd/raft"
 
@@ -249,11 +251,12 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ *config.Syst
 	if shouldTruncate(truncatableIndexes, raftLogSize) {
 		r.mu.Lock()
 		raftLogSize := r.mu.raftLogSize
+		lastIndex := r.mu.lastIndex
 		r.mu.Unlock()
 
 		if log.V(1) {
-			log.Infof(ctx, "truncating raft log %d-%d: size=%d",
-				oldestIndex-truncatableIndexes, oldestIndex, raftLogSize)
+			log.Infof(ctx, "truncating raft log entries [%d-%d], resulting in log [%d,%d], reclaiming ~%s",
+				oldestIndex-truncatableIndexes, oldestIndex-1, oldestIndex, lastIndex, humanizeutil.IBytes(raftLogSize))
 		}
 		b := &client.Batch{}
 		b.AddRawRequest(&roachpb.TruncateLogRequest{
diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go
index 38b502fbcbd5..5c7750e094a1 100644
--- a/pkg/storage/raft_log_queue_test.go
+++ b/pkg/storage/raft_log_queue_test.go
@@ -143,6 +143,28 @@ func TestComputeTruncatableIndex(t *testing.T) {
 	}
 }
 
+func verifyLogSizeInSync(t *testing.T, r *Replica) {
+	r.raftMu.Lock()
+	defer r.raftMu.Unlock()
+	r.mu.Lock()
+	raftLogSize := r.mu.raftLogSize
+	r.mu.Unlock()
+	start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1))
+	end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64))
+
+	var ms enginepb.MVCCStats
+	iter := r.store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key})
+	defer iter.Close()
+	ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */)
+	if err != nil {
+		t.Fatal(err)
+	}
+	actualRaftLogSize := ms.SysBytes
+	if actualRaftLogSize != raftLogSize {
+		t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize)
+	}
+}
+
 // TestGetTruncatableIndexes verifies that old raft log entries are correctly
 // removed.
 func TestGetTruncatableIndexes(t *testing.T) {
@@ -228,27 +250,7 @@ func TestGetTruncatableIndexes(t *testing.T) {
 		t.Errorf("expected oldestIndex to increase, instead it changed from %d -> %d", bOldest, cOldest)
 	}
 
-	func() {
-		r.raftMu.Lock()
-		defer r.raftMu.Unlock()
-		r.mu.Lock()
-		raftLogSize := r.mu.raftLogSize
-		r.mu.Unlock()
-		start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1))
-		end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64))
-
-		var ms enginepb.MVCCStats
-		iter := store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key})
-		defer iter.Close()
-		ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */)
-		if err != nil {
-			t.Fatal(err)
-		}
-		actualRaftLogSize := ms.SysBytes
-		if actualRaftLogSize != raftLogSize {
-			t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize)
-		}
-	}()
+	verifyLogSizeInSync(t, r)
 
 	// Again, enable the raft log scanner and and force a truncation. This time
 	// we expect no truncation to occur.
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index f5a5804b3f76..0c86b4f49379 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -565,10 +565,12 @@ func (r *Replica) handleReplicatedEvalResult(
 			// could rot.
 			{
 				log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", newTruncState.Index)
-				if err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil {
+				if size, err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil {
 					// We don't *have* to remove these entries for correctness. Log a
 					// loud error, but keep humming along.
 					log.Errorf(ctx, "while removing sideloaded files during log truncation: %s", err)
+				} else {
+					rResult.RaftLogDelta -= size
 				}
 			}
 		}
diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go
index 43383ab4ca12..a6ce2287a7b6 100644
--- a/pkg/storage/replica_sideload.go
+++ b/pkg/storage/replica_sideload.go
@@ -48,8 +48,8 @@ type sideloadStorage interface {
 	// Clear files that may have been written by this sideloadStorage.
 	Clear(context.Context) error
 	// TruncateTo removes all files belonging to an index strictly smaller than
-	// the given one.
-	TruncateTo(_ context.Context, index uint64) error
+	// the given one. Returns the number of bytes freed.
+	TruncateTo(_ context.Context, index uint64) (int64, error)
 	// Returns an absolute path to the file that Get() would return the contents
 	// of. Does not check whether the file actually exists.
 	Filename(_ context.Context, index, term uint64) (string, error)
diff --git a/pkg/storage/replica_sideload_disk.go b/pkg/storage/replica_sideload_disk.go
index d86005f06144..31793bb6b90e 100644
--- a/pkg/storage/replica_sideload_disk.go
+++ b/pkg/storage/replica_sideload_disk.go
@@ -129,12 +129,13 @@ func (ss *diskSideloadStorage) Clear(_ context.Context) error {
 	return err
 }
 
-func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) error {
+func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) (int64, error) {
 	matches, err := filepath.Glob(filepath.Join(ss.dir, "i*.t*"))
 	if err != nil {
-		return err
+		return 0, err
 	}
 	var deleted int
+	var size int64
 	for _, match := range matches {
 		base := filepath.Base(match)
 		if len(base) < 1 || base[0] != 'i' {
@@ -144,22 +145,27 @@ func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) err
 		upToDot := strings.SplitN(base, ".", 2)
 		i, err := strconv.ParseUint(upToDot[0], 10, 64)
 		if err != nil {
-			return errors.Wrapf(err, "while parsing %q during TruncateTo", match)
+			return size, errors.Wrapf(err, "while parsing %q during TruncateTo", match)
 		}
 		if i >= index {
 			continue
 		}
+		var fi os.FileInfo
+		if fi, err = os.Stat(match); err != nil {
+			return size, errors.Wrapf(err, "while purging %q", match)
+		}
 		if err := ss.purgeFile(ctx, match); err != nil {
-			return errors.Wrapf(err, "while purging %q", match)
+			return size, errors.Wrapf(err, "while purging %q", match)
 		}
 		deleted++
+		size += fi.Size()
 	}
 
 	if deleted == len(matches) {
 		err = os.Remove(ss.dir)
 		if !os.IsNotExist(err) {
-			return errors.Wrapf(err, "while purging %q", ss.dir)
+			return size, errors.Wrapf(err, "while purging %q", ss.dir)
 		}
 	}
-	return nil
+	return size, nil
 }
diff --git a/pkg/storage/replica_sideload_inmem.go b/pkg/storage/replica_sideload_inmem.go
index e6627786adeb..2a194862835a 100644
--- a/pkg/storage/replica_sideload_inmem.go
+++ b/pkg/storage/replica_sideload_inmem.go
@@ -99,12 +99,14 @@ func (ss *inMemSideloadStorage) Clear(_ context.Context) error {
 	return nil
 }
 
-func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) error {
+func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) (int64, error) {
 	// Not efficient, but this storage is for testing purposes only anyway.
-	for k := range ss.m {
+	var size int64
+	for k, v := range ss.m {
 		if k.index < index {
+			size += int64(len(v))
 			delete(ss.m, k)
 		}
 	}
-	return nil
+	return size, nil
 }
diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go
index b31d5bc9483c..bd441c864d7d 100644
--- a/pkg/storage/replica_sideload_test.go
+++ b/pkg/storage/replica_sideload_test.go
@@ -190,7 +190,8 @@ func testSideloadingSideloadedStorage(
 		{
 			err: nil,
 			fun: func() error {
-				return ss.TruncateTo(ctx, 123)
+				_, err := ss.TruncateTo(ctx, 123)
+				return err
 			},
 		},
 		{
@@ -261,7 +262,7 @@ func testSideloadingSideloadedStorage(
 
 	for n := range payloads {
 		// Truncate indexes <= payloads[n] (payloads is sorted in increasing order).
-		if err := ss.TruncateTo(ctx, payloads[n]); err != nil {
+		if _, err := ss.TruncateTo(ctx, payloads[n]); err != nil {
 			t.Fatalf("%d: %s", n, err)
 		}
 		// Index payloads[n] and above are still there (truncation is exclusive)
@@ -281,16 +282,20 @@ func testSideloadingSideloadedStorage(
 		}
 	}
 
-	if !isInMem {
+	func() {
+		if isInMem {
+			return
+		}
 		// First add a file that shouldn't be in the sideloaded storage to ensure
 		// sane behavior when directory can't be removed after full truncate.
 		nonRemovableFile := filepath.Join(ss.(*diskSideloadStorage).dir, "cantremove.xx")
-		_, err := os.Create(nonRemovableFile)
+		f, err := os.Create(nonRemovableFile)
 		if err != nil {
 			t.Fatalf("could not create non i*.t* file in sideloaded storage: %v", err)
 		}
+		defer f.Close()
 
-		err = ss.TruncateTo(ctx, math.MaxUint64)
+		_, err = ss.TruncateTo(ctx, math.MaxUint64)
 		if err == nil {
 			t.Fatalf("sideloaded directory should not have been removable due to extra file %s", nonRemovableFile)
 		}
@@ -305,7 +310,7 @@ func testSideloadingSideloadedStorage(
 		}
 
 		// Test that directory is removed when filepath.Glob returns 0 matches.
-		if err := ss.TruncateTo(ctx, math.MaxUint64); err != nil {
+		if _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil {
 			t.Fatal(err)
 		}
 		// Ensure directory is removed, now that all files should be gone.
@@ -329,7 +334,7 @@ func testSideloadingSideloadedStorage(
 			}
 		}
 		assertCreated(true)
-		if err := ss.TruncateTo(ctx, math.MaxUint64); err != nil {
+		if _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil {
 			t.Fatal(err)
 		}
 		// Ensure directory is removed when all records are removed.
@@ -342,7 +347,7 @@ func testSideloadingSideloadedStorage(
 				t.Fatalf("expected %q to be removed: %v", ss.(*diskSideloadStorage).dir, err)
 			}
 		}
-	}
+	}()
 
 	if err := ss.Clear(ctx); err != nil {
 		t.Fatal(err)
@@ -351,7 +356,7 @@ func testSideloadingSideloadedStorage(
 	assertCreated(false)
 
 	// Sanity check that we can call TruncateTo without the directory existing.
-	if err := ss.TruncateTo(ctx, 1); err != nil {
+	if _, err := ss.TruncateTo(ctx, 1); err != nil {
 		t.Fatal(err)
 	}
 
@@ -621,10 +626,40 @@ func makeInMemSideloaded(repl *Replica) {
 // TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command.
 func TestRaftSSTableSideloadingProposal(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+
+	testutils.RunTrueAndFalse(t, "engineInMem", func(t *testing.T, engineInMem bool) {
+		testutils.RunTrueAndFalse(t, "mockSideloaded", func(t *testing.T, mockSideloaded bool) {
+			if engineInMem && !mockSideloaded {
+				t.Skip("https://github.com/cockroachdb/cockroach/issues/31913")
+			}
+			testRaftSSTableSideloadingProposal(t, engineInMem, mockSideloaded)
+		})
+	})
+}
+
+// TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command.
+func testRaftSSTableSideloadingProposal(t *testing.T, engineInMem, mockSideloaded bool) {
+	defer leaktest.AfterTest(t)()
 	defer SetMockAddSSTable()()
 
-	tc := testContext{}
+	dir, cleanup := testutils.TempDir(t)
+	defer cleanup()
 	stopper := stop.NewStopper()
+	tc := testContext{}
+	if !engineInMem {
+		cfg := engine.RocksDBConfig{
+			Dir:      dir,
+			Settings: cluster.MakeTestingClusterSettings(),
+		}
+		var err error
+		cache := engine.NewRocksDBCache(1 << 20)
+		defer cache.Release()
+		tc.engine, err = engine.NewRocksDB(cfg, cache)
+		if err != nil {
+			t.Fatal(err)
+		}
+		stopper.AddCloser(tc.engine)
+	}
 	defer stopper.Stop(context.TODO())
 	tc.Start(t, stopper)
 
@@ -632,11 +667,14 @@ func TestRaftSSTableSideloadingProposal(t *testing.T) {
 	defer cancel()
 
 	const (
-		key = "foo"
-		val = "bar"
+		key       = "foo"
+		entrySize = 128
 	)
+	val := strings.Repeat("x", entrySize)
 
-	makeInMemSideloaded(tc.repl)
+	if mockSideloaded {
+		makeInMemSideloaded(tc.repl)
+	}
 
 	ts := hlc.Timestamp{Logical: 1}
 
@@ -665,27 +703,57 @@ func TestRaftSSTableSideloadingProposal(t *testing.T) {
 		}
 	}
 
-	tc.repl.raftMu.Lock()
-	defer tc.repl.raftMu.Unlock()
-	if ss := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); len(ss.m) < 1 {
-		t.Fatal("sideloaded storage is empty")
-	}
+	func() {
+		tc.repl.raftMu.Lock()
+		defer tc.repl.raftMu.Unlock()
+		if ss, ok := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); ok && len(ss.m) < 1 {
+			t.Fatal("sideloaded storage is empty")
+		}
 
-	if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "sideloadable proposal detected", "ingested SSTable"); err != nil {
-		t.Fatal(err)
-	}
+		if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "sideloadable proposal detected", "ingested SSTable"); err != nil {
+			t.Fatal(err)
+		}
 
-	if n := tc.store.metrics.AddSSTableProposals.Count(); n == 0 {
-		t.Fatalf("expected metric to show at least one AddSSTable proposal, but got %d", n)
-	}
+		if n := tc.store.metrics.AddSSTableProposals.Count(); n == 0 {
+			t.Fatalf("expected metric to show at least one AddSSTable proposal, but got %d", n)
+		}
+
+		if n := tc.store.metrics.AddSSTableApplications.Count(); n == 0 {
+			t.Fatalf("expected metric to show at least one AddSSTable application, but got %d", n)
+		}
+		// We usually don't see copies because we hardlink and ingest the original SST. However, this
+		// depends on luck and the file system, so don't try to assert it. We should, however, see
+		// no more than one.
+		expMaxCopies := int64(1)
+		if engineInMem {
+			// We don't count in-memory env SST writes as copies.
+			expMaxCopies = 0
+		}
+		if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n > expMaxCopies {
+			t.Fatalf("expected metric to show <= %d AddSSTable copies, but got %d", expMaxCopies, n)
+		}
+	}()
+
+	// Force a log truncation followed by verification of the tracked raft log size. This exercises a
+	// former bug in which the raft log size took the sideloaded payload into account when adding
+	// to the log, but not when truncating.
 
-	if n := tc.store.metrics.AddSSTableApplications.Count(); n == 0 {
-		t.Fatalf("expected metric to show at least one AddSSTable application, but got %d", n)
+	// Write enough keys to the range to make sure that a truncation will happen.
+	for i := 0; i < RaftLogQueueStaleThreshold+1; i++ {
+		key := roachpb.Key(fmt.Sprintf("key%02d", i))
+		args := putArgs(key, []byte(fmt.Sprintf("value%02d", i)))
+		if _, err := client.SendWrapped(context.Background(), tc.store.TestSender(), &args); err != nil {
+			t.Fatal(err)
+		}
 	}
-	// We don't count in-memory env SST writes as copies.
-	if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n != 0 {
-		t.Fatalf("expected metric to show 0 AddSSTable copy, but got %d", n)
+
+	if _, err := tc.store.raftLogQueue.Add(tc.repl, 99.99 /* priority */); err != nil {
+		t.Fatal(err)
 	}
+	tc.store.ForceRaftLogScanAndProcess()
+	// SST is definitely truncated now, so recomputing the Raft log keys should match up with
+	// the tracked size.
+	verifyLogSizeInSync(t, tc.repl)
 }
 
 type mockSender struct {