From 51f40c5265fc32bfced692fdf6cc8b9c7fb3e25d Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 24 Oct 2018 16:58:00 +0200 Subject: [PATCH] storage: fix Raft log size accounting We were accounting for sideloaded payloads (SSTs) when adding them to the log, but were omitting them during truncations. As a result, the tracked raft log size would permanently skyrocket, which in turn would lead to extremely aggressive truncations and result in pathological amounts of Raft snapshots. I'm still concerned about this logic as we're now relying on numbers obtained from the file system to match exactly a prior in-mem computation, and there may be other bugs that cause a divergence. But this fixes the blatantly obvious one, so it's a step in the right direction. The added tests highlight a likely omission in the sideloaded storage code which doesn't access the file system through the RocksDB env as it seems like it should, filed as #31913. At this point it's unclear whether it fixes the issues below, but at the very least it seems to address a major problem they encountered: Touches #31732. Touches #31740. Touches #30261. Touches #31768. Touches #31745. Release note (bug fix): avoid a performance degradation related to overly aggressive Raft log truncations that could occur during RESTORE or IMPORT operations. 
--- pkg/cmd/roachtest/tpcc.go | 11 +++ pkg/storage/raft_log_queue.go | 7 +- pkg/storage/raft_log_queue_test.go | 44 ++++----- pkg/storage/replica.go | 6 +- pkg/storage/replica_proposal.go | 4 +- pkg/storage/replica_sideload.go | 2 +- pkg/storage/replica_sideload_disk.go | 18 ++-- pkg/storage/replica_sideload_inmem.go | 8 +- pkg/storage/replica_sideload_test.go | 130 ++++++++++++++++++++------ 9 files changed, 162 insertions(+), 68 deletions(-) diff --git a/pkg/cmd/roachtest/tpcc.go b/pkg/cmd/roachtest/tpcc.go index 55b277894f42..3aa088c0e754 100644 --- a/pkg/cmd/roachtest/tpcc.go +++ b/pkg/cmd/roachtest/tpcc.go @@ -357,6 +357,10 @@ func loadTPCCBench( db := c.Conn(ctx, 1) defer db.Close() + if _, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.range_merge.queue_enabled = false`); err != nil { + return err + } + // Check if the dataset already exists and is already large enough to // accommodate this benchmarking. If so, we can skip the fixture RESTORE. if _, err := db.ExecContext(ctx, `USE tpcc`); err == nil { @@ -686,6 +690,13 @@ func registerTPCCBench(r *registry) { LoadWarehouses: 2000, EstimatedMax: 1300, }, + { + Nodes: 9, + CPUs: 64, + + LoadWarehouses: 10000, + EstimatedMax: 8000, + }, // objective 1, key result 1. 
{ Nodes: 30, diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go index 4a361fbdadd6..7e2622c068ba 100644 --- a/pkg/storage/raft_log_queue.go +++ b/pkg/storage/raft_log_queue.go @@ -19,6 +19,8 @@ import ( "sort" "time" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/pkg/errors" "go.etcd.io/etcd/raft" @@ -249,11 +251,12 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ *config.Syst if shouldTruncate(truncatableIndexes, raftLogSize) { r.mu.Lock() raftLogSize := r.mu.raftLogSize + lastIndex := r.mu.lastIndex r.mu.Unlock() if log.V(1) { - log.Infof(ctx, "truncating raft log %d-%d: size=%d", - oldestIndex-truncatableIndexes, oldestIndex, raftLogSize) + log.Infof(ctx, "truncating raft log entries [%d-%d], resulting in log [%d,%d], reclaiming ~%s", + oldestIndex-truncatableIndexes, oldestIndex-1, oldestIndex, lastIndex, humanizeutil.IBytes(raftLogSize)) } b := &client.Batch{} b.AddRawRequest(&roachpb.TruncateLogRequest{ diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go index 38b502fbcbd5..5c7750e094a1 100644 --- a/pkg/storage/raft_log_queue_test.go +++ b/pkg/storage/raft_log_queue_test.go @@ -143,6 +143,28 @@ func TestComputeTruncatableIndex(t *testing.T) { } } +func verifyLogSizeInSync(t *testing.T, r *Replica) { + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + raftLogSize := r.mu.raftLogSize + r.mu.Unlock() + start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1)) + end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64)) + + var ms enginepb.MVCCStats + iter := r.store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key}) + defer iter.Close() + ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */) + if err != nil { + t.Fatal(err) + } + actualRaftLogSize := ms.SysBytes + if actualRaftLogSize != raftLogSize { + t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize) + } +} + // 
TestGetTruncatableIndexes verifies that old raft log entries are correctly // removed. func TestGetTruncatableIndexes(t *testing.T) { @@ -228,27 +250,7 @@ func TestGetTruncatableIndexes(t *testing.T) { t.Errorf("expected oldestIndex to increase, instead it changed from %d -> %d", bOldest, cOldest) } - func() { - r.raftMu.Lock() - defer r.raftMu.Unlock() - r.mu.Lock() - raftLogSize := r.mu.raftLogSize - r.mu.Unlock() - start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1)) - end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64)) - - var ms enginepb.MVCCStats - iter := store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key}) - defer iter.Close() - ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */) - if err != nil { - t.Fatal(err) - } - actualRaftLogSize := ms.SysBytes - if actualRaftLogSize != raftLogSize { - t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize) - } - }() + verifyLogSizeInSync(t, r) // Again, enable the raft log scanner and and force a truncation. This time // we expect no truncation to occur. diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index d1d41114b87b..01e04686f0ed 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -6581,7 +6581,7 @@ func (r *Replica) maybeGossipFirstRange(ctx context.Context) *roachpb.Error { // Gossip the cluster ID from all replicas of the first range; there // is no expiration on the cluster ID. 
- if log.V(1) { + if false && log.V(1) { log.Infof(ctx, "gossiping cluster id %q from store %d, r%d", r.store.ClusterID(), r.store.StoreID(), r.RangeID) } @@ -6614,14 +6614,14 @@ func (r *Replica) gossipFirstRange(ctx context.Context) { } log.Event(ctx, "gossiping sentinel and first range") if log.V(1) { - log.Infof(ctx, "gossiping sentinel from store %d, r%d", r.store.StoreID(), r.RangeID) + //log.Infof(ctx, "gossiping sentinel from store %d, r%d", r.store.StoreID(), r.RangeID) } if err := r.store.Gossip().AddInfo( gossip.KeySentinel, r.store.ClusterID().GetBytes(), r.store.cfg.SentinelGossipTTL()); err != nil { log.Errorf(ctx, "failed to gossip sentinel: %s", err) } - if log.V(1) { + if false && log.V(1) { log.Infof(ctx, "gossiping first range from store %d, r%d: %s", r.store.StoreID(), r.RangeID, r.mu.state.Desc.Replicas) } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index f5a5804b3f76..0c86b4f49379 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -565,10 +565,12 @@ func (r *Replica) handleReplicatedEvalResult( // could rot. { log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", newTruncState.Index) - if err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil { + if size, err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil { // We don't *have* to remove these entries for correctness. Log a // loud error, but keep humming along. log.Errorf(ctx, "while removing sideloaded files during log truncation: %s", err) + } else { + rResult.RaftLogDelta -= size } } } diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go index 43383ab4ca12..23fd3e0f20a3 100644 --- a/pkg/storage/replica_sideload.go +++ b/pkg/storage/replica_sideload.go @@ -49,7 +49,7 @@ type sideloadStorage interface { Clear(context.Context) error // TruncateTo removes all files belonging to an index strictly smaller than // the given one. 
- TruncateTo(_ context.Context, index uint64) error + TruncateTo(_ context.Context, index uint64) (int64, error) // Returns an absolute path to the file that Get() would return the contents // of. Does not check whether the file actually exists. Filename(_ context.Context, index, term uint64) (string, error) diff --git a/pkg/storage/replica_sideload_disk.go b/pkg/storage/replica_sideload_disk.go index d86005f06144..31793bb6b90e 100644 --- a/pkg/storage/replica_sideload_disk.go +++ b/pkg/storage/replica_sideload_disk.go @@ -129,12 +129,13 @@ func (ss *diskSideloadStorage) Clear(_ context.Context) error { return err } -func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) error { +func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) (int64, error) { matches, err := filepath.Glob(filepath.Join(ss.dir, "i*.t*")) if err != nil { - return err + return 0, err } var deleted int + var size int64 for _, match := range matches { base := filepath.Base(match) if len(base) < 1 || base[0] != 'i' { @@ -144,22 +145,27 @@ func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) err upToDot := strings.SplitN(base, ".", 2) i, err := strconv.ParseUint(upToDot[0], 10, 64) if err != nil { - return errors.Wrapf(err, "while parsing %q during TruncateTo", match) + return size, errors.Wrapf(err, "while parsing %q during TruncateTo", match) } if i >= index { continue } + var fi os.FileInfo + if fi, err = os.Stat(match); err != nil { + return size, errors.Wrapf(err, "while purging %q", match) + } if err := ss.purgeFile(ctx, match); err != nil { - return errors.Wrapf(err, "while purging %q", match) + return size, errors.Wrapf(err, "while purging %q", match) } deleted++ + size += fi.Size() } if deleted == len(matches) { err = os.Remove(ss.dir) if !os.IsNotExist(err) { - return errors.Wrapf(err, "while purging %q", ss.dir) + return size, errors.Wrapf(err, "while purging %q", ss.dir) } } - return nil + return size, nil } diff --git 
a/pkg/storage/replica_sideload_inmem.go b/pkg/storage/replica_sideload_inmem.go index e6627786adeb..2a194862835a 100644 --- a/pkg/storage/replica_sideload_inmem.go +++ b/pkg/storage/replica_sideload_inmem.go @@ -99,12 +99,14 @@ func (ss *inMemSideloadStorage) Clear(_ context.Context) error { return nil } -func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) error { +func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) (int64, error) { // Not efficient, but this storage is for testing purposes only anyway. - for k := range ss.m { + var size int64 + for k, v := range ss.m { if k.index < index { + size += int64(len(v)) delete(ss.m, k) } } - return nil + return size, nil } diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go index b31d5bc9483c..f521f20aba25 100644 --- a/pkg/storage/replica_sideload_test.go +++ b/pkg/storage/replica_sideload_test.go @@ -190,7 +190,8 @@ func testSideloadingSideloadedStorage( { err: nil, fun: func() error { - return ss.TruncateTo(ctx, 123) + _, err := ss.TruncateTo(ctx, 123) + return err }, }, { @@ -261,7 +262,7 @@ func testSideloadingSideloadedStorage( for n := range payloads { // Truncate indexes <= payloads[n] (payloads is sorted in increasing order). - if err := ss.TruncateTo(ctx, payloads[n]); err != nil { + if _, err := ss.TruncateTo(ctx, payloads[n]); err != nil { t.Fatalf("%d: %s", n, err) } // Index payloads[n] and above are still there (truncation is exclusive) @@ -281,16 +282,20 @@ func testSideloadingSideloadedStorage( } } - if !isInMem { + func() { + if isInMem { + return + } // First add a file that shouldn't be in the sideloaded storage to ensure // sane behavior when directory can't be removed after full truncate. 
nonRemovableFile := filepath.Join(ss.(*diskSideloadStorage).dir, "cantremove.xx") - _, err := os.Create(nonRemovableFile) + f, err := os.Create(nonRemovableFile) if err != nil { t.Fatalf("could not create non i*.t* file in sideloaded storage: %v", err) } + defer f.Close() - err = ss.TruncateTo(ctx, math.MaxUint64) + _, err = ss.TruncateTo(ctx, math.MaxUint64) if err == nil { t.Fatalf("sideloaded directory should not have been removable due to extra file %s", nonRemovableFile) } @@ -305,7 +310,7 @@ func testSideloadingSideloadedStorage( } // Test that directory is removed when filepath.Glob returns 0 matches. - if err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { + if _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { t.Fatal(err) } // Ensure directory is removed, now that all files should be gone. @@ -329,7 +334,7 @@ func testSideloadingSideloadedStorage( } } assertCreated(true) - if err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { + if _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { t.Fatal(err) } // Ensure directory is removed when all records are removed. @@ -342,7 +347,7 @@ func testSideloadingSideloadedStorage( t.Fatalf("expected %q to be removed: %v", ss.(*diskSideloadStorage).dir, err) } } - } + }() if err := ss.Clear(ctx); err != nil { t.Fatal(err) @@ -351,7 +356,7 @@ func testSideloadingSideloadedStorage( assertCreated(false) // Sanity check that we can call TruncateTo without the directory existing. - if err := ss.TruncateTo(ctx, 1); err != nil { + if _, err := ss.TruncateTo(ctx, 1); err != nil { t.Fatal(err) } @@ -620,11 +625,40 @@ func makeInMemSideloaded(repl *Replica) { // TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command. 
func TestRaftSSTableSideloadingProposal(t *testing.T) { + // engineInMem=true mockSideloaded=false + testutils.RunTrueAndFalse(t, "engineInMem", func(t *testing.T, engineInMem bool) { + testutils.RunTrueAndFalse(t, "mockSideloaded", func(t *testing.T, mockSideloaded bool) { + if engineInMem && !mockSideloaded { + t.Skip("https://github.com/cockroachdb/cockroach/issues/31913") + } + testRaftSSTableSideloadingProposal(t, engineInMem, mockSideloaded) + }) + }) +} + +// TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command. +func testRaftSSTableSideloadingProposal(t *testing.T, engineInMem, mockSideloaded bool) { defer leaktest.AfterTest(t)() defer SetMockAddSSTable()() - tc := testContext{} + dir, cleanup := testutils.TempDir(t) + defer cleanup() stopper := stop.NewStopper() + tc := testContext{} + if !engineInMem { + cfg := engine.RocksDBConfig{ + Dir: dir, + Settings: cluster.MakeTestingClusterSettings(), + } + var err error + cache := engine.NewRocksDBCache(1 << 20) + defer cache.Release() + tc.engine, err = engine.NewRocksDB(cfg, cache) + if err != nil { + t.Fatal(err) + } + stopper.AddCloser(tc.engine) + } defer stopper.Stop(context.TODO()) tc.Start(t, stopper) @@ -632,11 +666,14 @@ func TestRaftSSTableSideloadingProposal(t *testing.T) { defer cancel() const ( - key = "foo" - val = "bar" + key = "foo" + minEntrySize = 128 ) + val := strings.Repeat("x", minEntrySize) - makeInMemSideloaded(tc.repl) + if mockSideloaded { + makeInMemSideloaded(tc.repl) + } ts := hlc.Timestamp{Logical: 1} @@ -665,27 +702,58 @@ func TestRaftSSTableSideloadingProposal(t *testing.T) { } } - tc.repl.raftMu.Lock() - defer tc.repl.raftMu.Unlock() - if ss := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); len(ss.m) < 1 { - t.Fatal("sideloaded storage is empty") - } + func() { + tc.repl.raftMu.Lock() + defer tc.repl.raftMu.Unlock() + if ss, ok := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); ok && len(ss.m) < 1 { + t.Fatal("sideloaded storage 
is empty") + } - if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "sideloadable proposal detected", "ingested SSTable"); err != nil { - t.Fatal(err) - } + if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "sideloadable proposal detected", "ingested SSTable"); err != nil { + t.Fatal(err) + } - if n := tc.store.metrics.AddSSTableProposals.Count(); n == 0 { - t.Fatalf("expected metric to show at least one AddSSTable proposal, but got %d", n) - } + if n := tc.store.metrics.AddSSTableProposals.Count(); n == 0 { + t.Fatalf("expected metric to show at least one AddSSTable proposal, but got %d", n) + } - if n := tc.store.metrics.AddSSTableApplications.Count(); n == 0 { - t.Fatalf("expected metric to show at least one AddSSTable application, but got %d", n) - } - // We don't count in-memory env SST writes as copies. - if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n != 0 { - t.Fatalf("expected metric to show 0 AddSSTable copy, but got %d", n) - } + if n := tc.store.metrics.AddSSTableApplications.Count(); n == 0 { + t.Fatalf("expected metric to show at least one AddSSTable application, but got %d", n) + } + // We usually don't see copies because we hardlink and ingest the original SST. However, this + // depends on luck and the file system, so don't try to assert it. We should, however, see + // no more than one. + expMaxCopies := int64(1) + if engineInMem { + // We don't count in-memory env SST writes as copies. + expMaxCopies = 0 + } + if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n > expMaxCopies { + t.Fatalf("expected metric to show <= %d AddSSTable copies, but got %d", expMaxCopies, n) + } + }() + // Force a log truncation followed by verification of the tracked raft log size. This exercises a + // former bug in which the raft log size took the sideloaded payload into account when adding + // to the log, but not when truncating. 
+ testutils.SucceedsSoon(t, func() error { + // Write enough keys to the range to make sure that a truncation will happen. + for i := 0; i < RaftLogQueueStaleThreshold+1; i++ { + key := roachpb.Key(fmt.Sprintf("key%02d", i)) + args := putArgs(key, []byte(fmt.Sprintf("value%02d", i))) + if _, err := client.SendWrapped(context.Background(), tc.store.TestSender(), &args); err != nil { + t.Fatal(err) + } + } + + if _, err := tc.store.raftLogQueue.Add(tc.repl, 99.99 /* priority */); err != nil { + t.Fatal(err) + } + tc.store.ForceRaftLogScanAndProcess() + // SST is definitely truncated now, so recomputing the Raft log keys should match up with + // the tracked size. + verifyLogSizeInSync(t, tc.repl) + return nil + }) } type mockSender struct {