Merge #76902
76902: kvserver: enable loosely coupled raft log truncation r=tbg a=sumeerbhola

The constant "&& false" override that disabled loosely coupled truncation
is removed. Additional tests cover both the end-to-end path and the
truncator.

Informs #36262

Release note: None

Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
craig[bot] and sumeerbhola committed Feb 25, 2022
2 parents bc8cbe6 + 1bb58e5 commit 978744d
Showing 13 changed files with 755 additions and 390 deletions.
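
The mechanism enabled here decouples the decision to truncate the raft log from the physical removal of entries: a truncation is queued when the TruncateLog command applies, and is only carried out once the durably applied state has advanced past the truncated index. That is why the updated tests below flush the engine: the flush advances guaranteed durability, which in turn lets the truncator act. A minimal, self-contained sketch of the idea (toy code with hypothetical names, not the actual raftLogTruncator):

package main

import "fmt"

// pendingTruncation records the intent to drop raft log entries up to and
// including index. Nothing is discarded at enqueue time.
type pendingTruncation struct {
    index uint64
}

// truncator is a toy stand-in for the raft log truncator: it holds pending
// truncations and applies them only once durability has advanced far enough.
type truncator struct {
    pending    []pendingTruncation
    firstIndex uint64 // first index still present in the raft log
}

// enqueue is the "loosely coupled" half: the truncation decision is made
// now, but no log entries are removed yet.
func (tr *truncator) enqueue(index uint64) {
    tr.pending = append(tr.pending, pendingTruncation{index: index})
}

// durabilityAdvanced is invoked when the durable applied index advances
// (e.g. after an engine flush). Every pending truncation whose index is now
// durably applied can be carried out safely.
func (tr *truncator) durabilityAdvanced(durableAppliedIndex uint64) {
    remaining := tr.pending[:0]
    for _, p := range tr.pending {
        if p.index <= durableAppliedIndex {
            tr.firstIndex = p.index + 1 // discard entries <= p.index
        } else {
            remaining = append(remaining, p)
        }
    }
    tr.pending = remaining
}

func main() {
    tr := &truncator{firstIndex: 1}
    tr.enqueue(5)
    tr.durabilityAdvanced(3) // durability has not reached the target: stays pending
    fmt.Println(tr.firstIndex, len(tr.pending)) // 1 1
    tr.durabilityAdvanced(8) // durability caught up: truncation applies
    fmt.Println(tr.firstIndex, len(tr.pending)) // 6 0
}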
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
@@ -741,6 +741,9 @@ func mergeCheckingTimestampCaches(
   // the result to apply on the majority quorum.
   testutils.SucceedsSoon(t, func() error {
     for _, r := range lhsRepls[1:] {
+      // Loosely-coupled truncation requires an engine flush to advance
+      // guaranteed durability.
+      require.NoError(t, r.Engine().Flush())
       firstIndex, err := r.GetFirstIndex()
       require.NoError(t, err)
       if firstIndex < truncIndex {
@@ -3980,6 +3983,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
   if _, err := kv.SendWrapped(ctx, distSender, truncArgs); err != nil {
     t.Fatal(err)
   }
+  waitForTruncationForTesting(t, repl, index)
   return index
 }()

2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_raft_log_queue_test.go
@@ -96,6 +96,8 @@ func TestRaftLogQueue(t *testing.T) {
   for i := range tc.Servers {
     tc.GetFirstStoreFromServer(t, i).MustForceRaftLogScanAndProcess()
   }
+  // Flush the engine to advance durability, which triggers truncation.
+  require.NoError(t, raftLeaderRepl.Engine().Flush())
   // Ensure that firstIndex has increased indicating that the log
   // truncation has occurred.
   afterTruncationIndex, err = raftLeaderRepl.GetFirstIndex()

15 changes: 15 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
@@ -831,6 +831,20 @@ func TestSnapshotAfterTruncation(t *testing.T) {
   }
 }
 
+func waitForTruncationForTesting(t *testing.T, r *kvserver.Replica, newFirstIndex uint64) {
+  testutils.SucceedsSoon(t, func() error {
+    // Flush the engine to advance durability, which triggers truncation.
+    require.NoError(t, r.Engine().Flush())
+    // FirstIndex has changed.
+    firstIndex, err := r.GetFirstIndex()
+    require.NoError(t, err)
+    if firstIndex != newFirstIndex {
+      return errors.Errorf("expected firstIndex == %d, got %d", newFirstIndex, firstIndex)
+    }
+    return nil
+  })
+}
+
 // TestSnapshotAfterTruncationWithUncommittedTail is similar in spirit to
 // TestSnapshotAfterTruncation/differentTerm. However, it differs in that we
 // take care to ensure that the partitioned Replica has a long uncommitted tail
@@ -1009,6 +1023,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
     }
     return nil
   })
+  waitForTruncationForTesting(t, newLeaderRepl, index+1)
 
   snapsMetric := tc.GetFirstStoreFromServer(t, partStore).Metrics().RangeSnapshotsAppliedByVoters
   snapsBefore := snapsMetric.Count()

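The new helper leans on testutils.SucceedsSoon, which retries its closure until the closure returns nil, failing the test if that does not happen within a timeout. Because the Engine().Flush() call sits inside the closure, every retry advances durability again instead of racing a single flush against the truncator. A self-contained sketch of that retry shape (succeedsSoon here is a simplified stand-in, not the real testutils implementation):

package main

import (
    "errors"
    "fmt"
    "time"
)

// succeedsSoon retries fn until it returns nil or timeout elapses, returning
// the last error on timeout. The real helper fails the test instead.
func succeedsSoon(timeout time.Duration, fn func() error) error {
    deadline := time.Now().Add(timeout)
    for {
        err := fn()
        if err == nil {
            return nil
        }
        if time.Now().After(deadline) {
            return err
        }
        time.Sleep(10 * time.Millisecond)
    }
}

func main() {
    attempts := 0
    err := succeedsSoon(time.Second, func() error {
        attempts++
        if attempts < 3 {
            return errors.New("truncation not visible yet")
        }
        return nil
    })
    fmt.Println(attempts, err) // 3 <nil>
}
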
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/raft_log_queue.go
@@ -744,10 +744,7 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time {
 func isLooselyCoupledRaftLogTruncationEnabled(
   ctx context.Context, settings *cluster.Settings,
 ) bool {
-  // TODO(sumeer): remove the false when hooking up the
-  // raftLogTruncator.durabilityAdvanced and fixing that method to do a
-  // durable read of RaftAppliedIndex.
   return settings.Version.IsActive(
     ctx, clusterversion.LooselyCoupledRaftLogTruncation) &&
-    looselyCoupledTruncationEnabled.Get(&settings.SV) && false
+    looselyCoupledTruncationEnabled.Get(&settings.SV)
 }
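
With the trailing "&& false" removed, the gate reduces to the conjunction of the cluster-version check and the bool cluster setting. A standalone sketch of that two-gate shape (plain booleans stand in for the real clusterversion and settings plumbing, which is not reproduced here):

package main

import "fmt"

// looselyCoupledEnabled mirrors the shape of the simplified predicate: the
// feature is active only when both the version gate and the cluster setting
// allow it.
func looselyCoupledEnabled(versionGateActive, settingEnabled bool) bool {
    return versionGateActive && settingEnabled
}

func main() {
    // Print the truth table for the two gates.
    for _, version := range []bool{false, true} {
        for _, setting := range []bool{false, true} {
            fmt.Printf("versionGateActive=%v settingEnabled=%v -> %v\n",
                version, setting, looselyCoupledEnabled(version, setting))
        }
    }
}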
203 changes: 112 additions & 91 deletions pkg/kv/kvserver/raft_log_queue_test.go
@@ -32,6 +32,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/uuid"
   "github.com/cockroachdb/errors"
   "github.com/stretchr/testify/assert"
+  "github.com/stretchr/testify/require"
   raft "go.etcd.io/etcd/raft/v3"
   "go.etcd.io/etcd/raft/v3/tracker"
 )
@@ -580,7 +581,7 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
     {1, RaftLogQueueStaleSize},
   }
   for _, c := range testCases {
-    t.Run("", func(t *testing.T) {
+    testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) {
       stopper := stop.NewStopper()
       defer stopper.Stop(ctx)
       store, _ := createTestStore(ctx, t,
@@ -590,7 +591,8 @@
         createSystemRanges: false,
       },
       stopper)
-
+      st := store.ClusterSettings()
+      looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled)
       // Note that turning off the replica scanner does not prevent the queues
       // from processing entries (in this case specifically the raftLogQueue),
       // just that the scanner will not try to push all replicas onto the queues.
@@ -618,6 +620,10 @@
       // fairly quickly, there is a slight race between this check and the
       // truncation, especially when under stress.
       testutils.SucceedsSoon(t, func() error {
+        if looselyCoupled {
+          // Flush the engine to advance durability, which triggers truncation.
+          require.NoError(t, store.engine.Flush())
+        }
         newFirstIndex, err := r.GetFirstIndex()
         if err != nil {
           t.Fatal(err)
@@ -702,108 +708,105 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
 func TestTruncateLog(t *testing.T) {
   defer leaktest.AfterTest(t)()
   defer log.Scope(t).Close(t)
-  tc := testContext{}
-  ctx := context.Background()
-  cfg := TestStoreConfig(nil)
-  cfg.TestingKnobs.DisableRaftLogQueue = true
-  stopper := stop.NewStopper()
-  defer stopper.Stop(ctx)
-  tc.StartWithStoreConfig(ctx, t, stopper, cfg)
-
-  // Populate the log with 10 entries. Save the LastIndex after each write.
-  var indexes []uint64
-  for i := 0; i < 10; i++ {
-    args := incrementArgs([]byte("a"), int64(i))
-
-    if _, pErr := tc.SendWrapped(args); pErr != nil {
-      t.Fatal(pErr)
-    }
-    idx, err := tc.repl.GetLastIndex()
-    if err != nil {
-      t.Fatal(err)
-    }
-    indexes = append(indexes, idx)
-  }
-
-  rangeID := tc.repl.RangeID
-
-  // Discard the first half of the log.
-  truncateArgs := truncateLogArgs(indexes[5], rangeID)
-  if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil {
-    t.Fatal(pErr)
-  }
-
-  // FirstIndex has changed.
-  firstIndex, err := tc.repl.GetFirstIndex()
-  if err != nil {
-    t.Fatal(err)
-  }
-  if firstIndex != indexes[5] {
-    t.Errorf("expected firstIndex == %d, got %d", indexes[5], firstIndex)
-  }
-
-  // We can still get what remains of the log.
-  tc.repl.mu.Lock()
-  entries, err := tc.repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64)
-  tc.repl.mu.Unlock()
-  if err != nil {
-    t.Fatal(err)
-  }
-  if len(entries) != int(indexes[9]-indexes[5]) {
-    t.Errorf("expected %d entries, got %d", indexes[9]-indexes[5], len(entries))
-  }
-
-  // But any range that includes the truncated entries returns an error.
-  tc.repl.mu.Lock()
-  _, err = tc.repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64)
-  tc.repl.mu.Unlock()
-  if !errors.Is(err, raft.ErrCompacted) {
-    t.Errorf("expected ErrCompacted, got %s", err)
-  }
-
-  // The term of the last truncated entry is still available.
-  tc.repl.mu.Lock()
-  term, err := tc.repl.raftTermRLocked(indexes[4])
-  tc.repl.mu.Unlock()
-  if err != nil {
-    t.Fatal(err)
-  }
-  if term == 0 {
-    t.Errorf("invalid term 0 for truncated entry")
-  }
-
-  // The terms of older entries are gone.
-  tc.repl.mu.Lock()
-  _, err = tc.repl.raftTermRLocked(indexes[3])
-  tc.repl.mu.Unlock()
-  if !errors.Is(err, raft.ErrCompacted) {
-    t.Errorf("expected ErrCompacted, got %s", err)
-  }
-
-  // Truncating logs that have already been truncated should not return an
-  // error.
-  truncateArgs = truncateLogArgs(indexes[3], rangeID)
-  if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
-    t.Fatal(pErr)
-  }
-
-  // Truncating logs that have the wrong rangeID included should not return
-  // an error but should not truncate any logs.
-  truncateArgs = truncateLogArgs(indexes[9], rangeID+1)
-  if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
-    t.Fatal(pErr)
-  }
-
-  tc.repl.mu.Lock()
-  // The term of the last truncated entry is still available.
-  term, err = tc.repl.raftTermRLocked(indexes[4])
-  tc.repl.mu.Unlock()
-  if err != nil {
-    t.Fatal(err)
-  }
-  if term == 0 {
-    t.Errorf("invalid term 0 for truncated entry")
-  }
+  testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) {
+    tc := testContext{}
+    ctx := context.Background()
+    cfg := TestStoreConfig(nil)
+    cfg.TestingKnobs.DisableRaftLogQueue = true
+    stopper := stop.NewStopper()
+    defer stopper.Stop(ctx)
+    tc.StartWithStoreConfig(ctx, t, stopper, cfg)
+    st := tc.store.ClusterSettings()
+    looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled)
+
+    // Populate the log with 10 entries. Save the LastIndex after each write.
+    var indexes []uint64
+    for i := 0; i < 10; i++ {
+      args := incrementArgs([]byte("a"), int64(i))
+
+      if _, pErr := tc.SendWrapped(args); pErr != nil {
+        t.Fatal(pErr)
+      }
+      idx, err := tc.repl.GetLastIndex()
+      if err != nil {
+        t.Fatal(err)
+      }
+      indexes = append(indexes, idx)
+    }
+
+    rangeID := tc.repl.RangeID
+
+    // Discard the first half of the log.
+    truncateArgs := truncateLogArgs(indexes[5], rangeID)
+    if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil {
+      t.Fatal(pErr)
+    }
+
+    waitForTruncationForTesting(t, tc.repl, indexes[5], looselyCoupled)
+
+    // We can still get what remains of the log.
+    tc.repl.mu.Lock()
+    entries, err := tc.repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64)
+    tc.repl.mu.Unlock()
+    if err != nil {
+      t.Fatal(err)
+    }
+    if len(entries) != int(indexes[9]-indexes[5]) {
+      t.Errorf("expected %d entries, got %d", indexes[9]-indexes[5], len(entries))
+    }
+
+    // But any range that includes the truncated entries returns an error.
+    tc.repl.mu.Lock()
+    _, err = tc.repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64)
+    tc.repl.mu.Unlock()
+    if !errors.Is(err, raft.ErrCompacted) {
+      t.Errorf("expected ErrCompacted, got %s", err)
+    }
+
+    // The term of the last truncated entry is still available.
+    tc.repl.mu.Lock()
+    term, err := tc.repl.raftTermRLocked(indexes[4])
+    tc.repl.mu.Unlock()
+    if err != nil {
+      t.Fatal(err)
+    }
+    if term == 0 {
+      t.Errorf("invalid term 0 for truncated entry")
+    }
+
+    // The terms of older entries are gone.
+    tc.repl.mu.Lock()
+    _, err = tc.repl.raftTermRLocked(indexes[3])
+    tc.repl.mu.Unlock()
+    if !errors.Is(err, raft.ErrCompacted) {
+      t.Errorf("expected ErrCompacted, got %s", err)
+    }
+
+    // Truncating logs that have already been truncated should not return an
+    // error.
+    truncateArgs = truncateLogArgs(indexes[3], rangeID)
+    if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
+      t.Fatal(pErr)
+    }
+
+    // Truncating logs that have the wrong rangeID included should not return
+    // an error but should not truncate any logs.
+    truncateArgs = truncateLogArgs(indexes[9], rangeID+1)
+    if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
+      t.Fatal(pErr)
+    }
+
+    tc.repl.mu.Lock()
+    // The term of the last truncated entry is still available.
+    term, err = tc.repl.raftTermRLocked(indexes[4])
+    tc.repl.mu.Unlock()
+    if err != nil {
+      t.Fatal(err)
+    }
+    if term == 0 {
+      t.Errorf("invalid term 0 for truncated entry")
+    }
+  })
 }

 func TestRaftLogQueueShouldQueueRecompute(t *testing.T) {
@@ -913,3 +916,21 @@ func TestTruncateLogRecompute(t *testing.T) {
     put() // make sure we remain trusted and in sync
   }
 }
+
+func waitForTruncationForTesting(
+  t *testing.T, r *Replica, newFirstIndex uint64, looselyCoupled bool,
+) {
+  testutils.SucceedsSoon(t, func() error {
+    if looselyCoupled {
+      // Flush the engine to advance durability, which triggers truncation.
+      require.NoError(t, r.Engine().Flush())
+    }
+    // FirstIndex has changed.
+    firstIndex, err := r.GetFirstIndex()
+    require.NoError(t, err)
+    if firstIndex != newFirstIndex {
+      return errors.Errorf("expected firstIndex == %d, got %d", newFirstIndex, firstIndex)
+    }
+    return nil
+  })
+}