storage: fix Raft log size accounting
We were accounting for sideloaded payloads (SSTs) when adding them to
the log, but were omitting them during truncations. As a result, the
tracked Raft log size would permanently skyrocket, which in turn would
lead to extremely aggressive truncations and result in pathological
amounts of Raft snapshots.
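The failure mode can be sketched in a few lines of standalone Go
(illustrative only; the names and numbers are made up and this is not
CockroachDB code). Sideloaded bytes are counted on append but not on
truncation, so the tracked size only ever grows:

package main

import "fmt"

func main() {
	var trackedSize int64
	const inline, sideloaded int64 = 100, 1 << 20 // 100 B entry, 1 MiB SST payload

	for i := 0; i < 3; i++ {
		// Append: both the inline entry and its sideloaded SST are counted.
		trackedSize += inline + sideloaded
		// Truncate: only the inline bytes are subtracted back out (the bug).
		trackedSize -= inline
	}
	// The log is empty, yet 3 MiB of "log size" has leaked; the truncation
	// heuristics see an ever-growing log and fire constantly.
	fmt.Println(trackedSize) // 3145728
}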

I'm still concerned about this logic, as we're now relying on numbers
obtained from the file system to exactly match a prior in-memory
computation, and there may be other bugs that cause a divergence. But
this fixes the blatantly obvious one, so it's a step in the right
direction.
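To make that concern concrete, the invariant being relied upon is
roughly the following (a minimal sketch with made-up names; the commit's
new verifyLogSizeInSync test below checks the real thing against the
engine):

package main

import "fmt"

// checkLogSizeInvariant compares the incrementally maintained counter
// against a from-scratch recomputation from storage; any divergence means
// the truncation heuristics are operating on garbage.
func checkLogSizeInvariant(tracked, recomputed int64) error {
	if tracked != recomputed {
		return fmt.Errorf("tracked raft log size %d != recomputed %d", tracked, recomputed)
	}
	return nil
}

func main() {
	fmt.Println(checkLogSizeInvariant(1024, 1024))  // <nil>
	fmt.Println(checkLogSizeInvariant(1<<30, 1024)) // the pre-fix symptom
}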

The added tests highlight a likely omission in the sideloaded storage
code, which doesn't access the file system through the RocksDB env even
though it seems like it should; filed as cockroachdb#31913.

At this point it's unclear whether this fixes the issues below, but at
the very least it seems to address a major problem reported in them:

Touches cockroachdb#31732.
Touches cockroachdb#31740.
Touches cockroachdb#30261.
Touches cockroachdb#31768.
Touches cockroachdb#31745.

Release note (bug fix): avoid a performance degradation related to
overly aggressive Raft log truncations that could occur during RESTORE
or IMPORT operations.
tbg committed Oct 26, 2018
1 parent fa7fb35 commit 51f40c5
Showing 9 changed files with 162 additions and 68 deletions.
11 changes: 11 additions & 0 deletions pkg/cmd/roachtest/tpcc.go
@@ -357,6 +357,10 @@ func loadTPCCBench(
db := c.Conn(ctx, 1)
defer db.Close()

if _, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.range_merge.queue_enabled = false`); err != nil {
return err
}

// Check if the dataset already exists and is already large enough to
// accommodate this benchmarking. If so, we can skip the fixture RESTORE.
if _, err := db.ExecContext(ctx, `USE tpcc`); err == nil {
@@ -686,6 +690,13 @@ func registerTPCCBench(r *registry) {
LoadWarehouses: 2000,
EstimatedMax: 1300,
},
{
Nodes: 9,
CPUs: 64,

LoadWarehouses: 10000,
EstimatedMax: 8000,
},
// objective 1, key result 1.
{
Nodes: 30,
7 changes: 5 additions & 2 deletions pkg/storage/raft_log_queue.go
@@ -19,6 +19,8 @@ import (
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"

"github.com/pkg/errors"
"go.etcd.io/etcd/raft"

@@ -249,11 +251,12 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ *config.Syst
if shouldTruncate(truncatableIndexes, raftLogSize) {
r.mu.Lock()
raftLogSize := r.mu.raftLogSize
lastIndex := r.mu.lastIndex
r.mu.Unlock()

if log.V(1) {
log.Infof(ctx, "truncating raft log %d-%d: size=%d",
oldestIndex-truncatableIndexes, oldestIndex, raftLogSize)
log.Infof(ctx, "truncating raft log entries [%d-%d], resulting in log [%d,%d], reclaiming ~%s",
oldestIndex-truncatableIndexes, oldestIndex-1, oldestIndex, lastIndex, humanizeutil.IBytes(raftLogSize))
}
b := &client.Batch{}
b.AddRawRequest(&roachpb.TruncateLogRequest{
44 changes: 23 additions & 21 deletions pkg/storage/raft_log_queue_test.go
@@ -143,6 +143,28 @@ func TestComputeTruncatableIndex(t *testing.T) {
}
}

func verifyLogSizeInSync(t *testing.T, r *Replica) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
raftLogSize := r.mu.raftLogSize
r.mu.Unlock()
start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1))
end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64))

var ms enginepb.MVCCStats
iter := r.store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key})
defer iter.Close()
ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */)
if err != nil {
t.Fatal(err)
}
actualRaftLogSize := ms.SysBytes
if actualRaftLogSize != raftLogSize {
t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize)
}
}

// TestGetTruncatableIndexes verifies that old raft log entries are correctly
// removed.
func TestGetTruncatableIndexes(t *testing.T) {
@@ -228,27 +250,7 @@ func TestGetTruncatableIndexes(t *testing.T) {
t.Errorf("expected oldestIndex to increase, instead it changed from %d -> %d", bOldest, cOldest)
}

func() {
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
raftLogSize := r.mu.raftLogSize
r.mu.Unlock()
start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1))
end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64))

var ms enginepb.MVCCStats
iter := store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key})
defer iter.Close()
ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */)
if err != nil {
t.Fatal(err)
}
actualRaftLogSize := ms.SysBytes
if actualRaftLogSize != raftLogSize {
t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize)
}
}()
verifyLogSizeInSync(t, r)

// Again, enable the raft log scanner and force a truncation. This time
// we expect no truncation to occur.
6 changes: 3 additions & 3 deletions pkg/storage/replica.go
@@ -6581,7 +6581,7 @@ func (r *Replica) maybeGossipFirstRange(ctx context.Context) *roachpb.Error {

// Gossip the cluster ID from all replicas of the first range; there
// is no expiration on the cluster ID.
if log.V(1) {
if false && log.V(1) {
log.Infof(ctx, "gossiping cluster id %q from store %d, r%d", r.store.ClusterID(),
r.store.StoreID(), r.RangeID)
}
@@ -6614,14 +6614,14 @@ func (r *Replica) gossipFirstRange(ctx context.Context) {
}
log.Event(ctx, "gossiping sentinel and first range")
if log.V(1) {
log.Infof(ctx, "gossiping sentinel from store %d, r%d", r.store.StoreID(), r.RangeID)
//log.Infof(ctx, "gossiping sentinel from store %d, r%d", r.store.StoreID(), r.RangeID)
}
if err := r.store.Gossip().AddInfo(
gossip.KeySentinel, r.store.ClusterID().GetBytes(),
r.store.cfg.SentinelGossipTTL()); err != nil {
log.Errorf(ctx, "failed to gossip sentinel: %s", err)
}
if log.V(1) {
if false && log.V(1) {
log.Infof(ctx, "gossiping first range from store %d, r%d: %s",
r.store.StoreID(), r.RangeID, r.mu.state.Desc.Replicas)
}
4 changes: 3 additions & 1 deletion pkg/storage/replica_proposal.go
@@ -565,10 +565,12 @@ func (r *Replica) handleReplicatedEvalResult(
// could rot.
{
log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", newTruncState.Index)
if err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil {
if size, err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil {
// We don't *have* to remove these entries for correctness. Log a
// loud error, but keep humming along.
log.Errorf(ctx, "while removing sideloaded files during log truncation: %s", err)
} else {
rResult.RaftLogDelta -= size
}
}
}
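For context, here is roughly how the returned byte count closes the leak
(an illustrative standalone sketch with made-up names; in the real code
the size flows through rResult.RaftLogDelta, which is later applied to
the replica's tracked log size):

package main

import "fmt"

// truncateSideloaded stands in for sideloadStorage.TruncateTo: it drops
// every payload with index strictly below the given one and reports how
// many bytes that freed.
func truncateSideloaded(payloads map[uint64]int64, index uint64) int64 {
	var freed int64
	for i, sz := range payloads {
		if i < index {
			freed += sz
			delete(payloads, i)
		}
	}
	return freed
}

func main() {
	payloads := map[uint64]int64{10: 1 << 20, 11: 1 << 20, 12: 1 << 20}
	var raftLogDelta int64
	raftLogDelta -= truncateSideloaded(payloads, 12) // indexes 10 and 11 go
	fmt.Println(raftLogDelta) // -2097152, credited against the tracked size
}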
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload.go
@@ -49,7 +49,7 @@ type sideloadStorage interface {
Clear(context.Context) error
// TruncateTo removes all files belonging to an index strictly smaller than
// the given one.
TruncateTo(_ context.Context, index uint64) error
TruncateTo(_ context.Context, index uint64) (int64, error)
// Returns an absolute path to the file that Get() would return the contents
// of. Does not check whether the file actually exists.
Filename(_ context.Context, index, term uint64) (string, error)
18 changes: 12 additions & 6 deletions pkg/storage/replica_sideload_disk.go
@@ -129,12 +129,13 @@ func (ss *diskSideloadStorage) Clear(_ context.Context) error {
return err
}

func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) error {
func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) (int64, error) {
matches, err := filepath.Glob(filepath.Join(ss.dir, "i*.t*"))
if err != nil {
return err
return 0, err
}
var deleted int
var size int64
for _, match := range matches {
base := filepath.Base(match)
if len(base) < 1 || base[0] != 'i' {
@@ -144,22 +145,27 @@ func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) err
upToDot := strings.SplitN(base, ".", 2)
i, err := strconv.ParseUint(upToDot[0], 10, 64)
if err != nil {
return errors.Wrapf(err, "while parsing %q during TruncateTo", match)
return size, errors.Wrapf(err, "while parsing %q during TruncateTo", match)
}
if i >= index {
continue
}
var fi os.FileInfo
if fi, err = os.Stat(match); err != nil {
return size, errors.Wrapf(err, "while purging %q", match)
}
if err := ss.purgeFile(ctx, match); err != nil {
return errors.Wrapf(err, "while purging %q", match)
return size, errors.Wrapf(err, "while purging %q", match)
}
deleted++
size += fi.Size()
}

if deleted == len(matches) {
err = os.Remove(ss.dir)
if !os.IsNotExist(err) {
return errors.Wrapf(err, "while purging %q", ss.dir)
return size, errors.Wrapf(err, "while purging %q", ss.dir)
}
}
return nil
return size, nil
}
8 changes: 5 additions & 3 deletions pkg/storage/replica_sideload_inmem.go
@@ -99,12 +99,14 @@ func (ss *inMemSideloadStorage) Clear(_ context.Context) error {
return nil
}

func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) error {
func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) (int64, error) {
// Not efficient, but this storage is for testing purposes only anyway.
for k := range ss.m {
var size int64
for k, v := range ss.m {
if k.index < index {
size += int64(len(v))
delete(ss.m, k)
}
}
return nil
return size, nil
}