Commit c0095c5
kvserver: enable loosely coupled raft log truncation
The const override that disabled loosely coupled truncation is removed. There
is additional testing of both the end-to-end path and the truncator.

Informs #36262

Release note: None
1 parent e2cd026 commit c0095c5

13 files changed: +744 −387 lines
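Note on the test pattern used throughout this diff: with loosely coupled truncation, a queued truncation is only applied once the storage engine's durability has advanced past the truncated entries, so the tests flush the engine and then poll the replica's first index. A minimal sketch of that pattern, modeled on the waitForTruncationForTesting helpers added in this commit (the function name and package layout here are illustrative, not part of the change):

package kvserver_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/require"
)

// waitForDurableTruncation polls until the raft log's first index reaches
// truncIndex, flushing the engine on each attempt because a loosely coupled
// truncation only applies after durability advances.
func waitForDurableTruncation(t *testing.T, r *kvserver.Replica, truncIndex uint64) {
	testutils.SucceedsSoon(t, func() error {
		// Flushing the engine advances guaranteed durability.
		require.NoError(t, r.Engine().Flush())
		firstIndex, err := r.GetFirstIndex()
		require.NoError(t, err)
		if firstIndex < truncIndex {
			return errors.Errorf("expected firstIndex >= %d, got %d", truncIndex, firstIndex)
		}
		return nil
	})
}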

pkg/kv/kvserver/client_merge_test.go

Lines changed: 4 additions & 0 deletions
@@ -741,6 +741,9 @@ func mergeCheckingTimestampCaches(
 	// the result to apply on the majority quorum.
 	testutils.SucceedsSoon(t, func() error {
 		for _, r := range lhsRepls[1:] {
+			// Loosely-coupled truncation requires an engine flush to advance
+			// guaranteed durability.
+			require.NoError(t, r.Engine().Flush())
 			firstIndex, err := r.GetFirstIndex()
 			require.NoError(t, err)
 			if firstIndex < truncIndex {
@@ -3980,6 +3983,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		if _, err := kv.SendWrapped(ctx, distSender, truncArgs); err != nil {
 			t.Fatal(err)
 		}
+		waitForTruncationForTesting(t, repl, index)
 		return index
 	}()

pkg/kv/kvserver/client_raft_log_queue_test.go

Lines changed: 2 additions & 0 deletions
@@ -96,6 +96,8 @@ func TestRaftLogQueue(t *testing.T) {
 	for i := range tc.Servers {
 		tc.GetFirstStoreFromServer(t, i).MustForceRaftLogScanAndProcess()
 	}
+	// Flush the engine to advance durability, which triggers truncation.
+	require.NoError(t, raftLeaderRepl.Engine().Flush())
 	// Ensure that firstIndex has increased indicating that the log
 	// truncation has occurred.
 	afterTruncationIndex, err = raftLeaderRepl.GetFirstIndex()

pkg/kv/kvserver/client_raft_test.go

Lines changed: 15 additions & 0 deletions
@@ -831,6 +831,20 @@ func TestSnapshotAfterTruncation(t *testing.T) {
 	}
 }
 
+func waitForTruncationForTesting(t *testing.T, r *kvserver.Replica, newFirstIndex uint64) {
+	testutils.SucceedsSoon(t, func() error {
+		// Flush the engine to advance durability, which triggers truncation.
+		require.NoError(t, r.Engine().Flush())
+		// FirstIndex has changed.
+		firstIndex, err := r.GetFirstIndex()
+		require.NoError(t, err)
+		if firstIndex != newFirstIndex {
+			return errors.Errorf("expected firstIndex == %d, got %d", newFirstIndex, firstIndex)
+		}
+		return nil
+	})
+}
+
 // TestSnapshotAfterTruncationWithUncommittedTail is similar in spirit to
 // TestSnapshotAfterTruncation/differentTerm. However, it differs in that we
 // take care to ensure that the partitioned Replica has a long uncommitted tail
@@ -1009,6 +1023,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) {
 		}
 		return nil
 	})
+	waitForTruncationForTesting(t, newLeaderRepl, index+1)
 
 	snapsMetric := tc.GetFirstStoreFromServer(t, partStore).Metrics().RangeSnapshotsAppliedByVoters
 	snapsBefore := snapsMetric.Count()

pkg/kv/kvserver/raft_log_queue.go

Lines changed: 1 addition & 4 deletions
@@ -744,10 +744,7 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time {
 func isLooselyCoupledRaftLogTruncationEnabled(
 	ctx context.Context, settings *cluster.Settings,
 ) bool {
-	// TODO(sumeer): remove the false when hooking up the
-	// raftLogTruncator.durabilityAdvanced and fixing that method to do a
-	// durable read of RaftAppliedIndex.
 	return settings.Version.IsActive(
 		ctx, clusterversion.LooselyCoupledRaftLogTruncation) &&
-		looselyCoupledTruncationEnabled.Get(&settings.SV) && false
+		looselyCoupledTruncationEnabled.Get(&settings.SV)
 }
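With the `&& false` short-circuit gone, the gate is purely the LooselyCoupledRaftLogTruncation cluster version plus the looselyCoupledTruncationEnabled cluster setting. The tests below flip the setting per subtest; a small fragment of that pattern (assumes a kvserver-package test with ctx, t, and store in scope, and a cluster version at or above LooselyCoupledRaftLogTruncation):

	// Toggle the setting, then confirm the gate reflects it directly.
	st := store.ClusterSettings()
	looselyCoupledTruncationEnabled.Override(ctx, &st.SV, true)
	require.True(t, isLooselyCoupledRaftLogTruncationEnabled(ctx, st))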

pkg/kv/kvserver/raft_log_queue_test.go

Lines changed: 112 additions & 91 deletions
@@ -32,6 +32,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	raft "go.etcd.io/etcd/raft/v3"
 	"go.etcd.io/etcd/raft/v3/tracker"
 )
@@ -580,7 +581,7 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
 		{1, RaftLogQueueStaleSize},
 	}
 	for _, c := range testCases {
-		t.Run("", func(t *testing.T) {
+		testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) {
 			stopper := stop.NewStopper()
 			defer stopper.Stop(ctx)
 			store, _ := createTestStore(ctx, t,
@@ -590,7 +591,8 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
 					createSystemRanges: false,
 				},
 				stopper)
-
+			st := store.ClusterSettings()
+			looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled)
 			// Note that turning off the replica scanner does not prevent the queues
 			// from processing entries (in this case specifically the raftLogQueue),
 			// just that the scanner will not try to push all replicas onto the queues.
@@ -618,6 +620,10 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
 			// fairly quickly, there is a slight race between this check and the
 			// truncation, especially when under stress.
 			testutils.SucceedsSoon(t, func() error {
+				if looselyCoupled {
+					// Flush the engine to advance durability, which triggers truncation.
+					require.NoError(t, store.engine.Flush())
+				}
 				newFirstIndex, err := r.GetFirstIndex()
 				if err != nil {
 					t.Fatal(err)
@@ -702,108 +708,105 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
 func TestTruncateLog(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	tc := testContext{}
-	ctx := context.Background()
-	cfg := TestStoreConfig(nil)
-	cfg.TestingKnobs.DisableRaftLogQueue = true
-	stopper := stop.NewStopper()
-	defer stopper.Stop(ctx)
-	tc.StartWithStoreConfig(ctx, t, stopper, cfg)
+	testutils.RunTrueAndFalse(t, "loosely-coupled", func(t *testing.T, looselyCoupled bool) {
+		tc := testContext{}
+		ctx := context.Background()
+		cfg := TestStoreConfig(nil)
+		cfg.TestingKnobs.DisableRaftLogQueue = true
+		stopper := stop.NewStopper()
+		defer stopper.Stop(ctx)
+		tc.StartWithStoreConfig(ctx, t, stopper, cfg)
+		st := tc.store.ClusterSettings()
+		looselyCoupledTruncationEnabled.Override(ctx, &st.SV, looselyCoupled)
+
+		// Populate the log with 10 entries. Save the LastIndex after each write.
+		var indexes []uint64
+		for i := 0; i < 10; i++ {
+			args := incrementArgs([]byte("a"), int64(i))
+
+			if _, pErr := tc.SendWrapped(args); pErr != nil {
+				t.Fatal(pErr)
+			}
+			idx, err := tc.repl.GetLastIndex()
+			if err != nil {
+				t.Fatal(err)
+			}
+			indexes = append(indexes, idx)
+		}
 
-	// Populate the log with 10 entries. Save the LastIndex after each write.
-	var indexes []uint64
-	for i := 0; i < 10; i++ {
-		args := incrementArgs([]byte("a"), int64(i))
+		rangeID := tc.repl.RangeID
 
-		if _, pErr := tc.SendWrapped(args); pErr != nil {
+		// Discard the first half of the log.
+		truncateArgs := truncateLogArgs(indexes[5], rangeID)
+		if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil {
 			t.Fatal(pErr)
 		}
-		idx, err := tc.repl.GetLastIndex()
+
+		waitForTruncationForTesting(t, tc.repl, indexes[5], looselyCoupled)
+
+		// We can still get what remains of the log.
+		tc.repl.mu.Lock()
+		entries, err := tc.repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64)
+		tc.repl.mu.Unlock()
 		if err != nil {
 			t.Fatal(err)
 		}
-		indexes = append(indexes, idx)
-	}
-
-	rangeID := tc.repl.RangeID
-
-	// Discard the first half of the log.
-	truncateArgs := truncateLogArgs(indexes[5], rangeID)
-	if _, pErr := tc.SendWrappedWith(roachpb.Header{RangeID: 1}, &truncateArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
-
-	// FirstIndex has changed.
-	firstIndex, err := tc.repl.GetFirstIndex()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if firstIndex != indexes[5] {
-		t.Errorf("expected firstIndex == %d, got %d", indexes[5], firstIndex)
-	}
-
-	// We can still get what remains of the log.
-	tc.repl.mu.Lock()
-	entries, err := tc.repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64)
-	tc.repl.mu.Unlock()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(entries) != int(indexes[9]-indexes[5]) {
-		t.Errorf("expected %d entries, got %d", indexes[9]-indexes[5], len(entries))
-	}
+		if len(entries) != int(indexes[9]-indexes[5]) {
+			t.Errorf("expected %d entries, got %d", indexes[9]-indexes[5], len(entries))
+		}
 
-	// But any range that includes the truncated entries returns an error.
-	tc.repl.mu.Lock()
-	_, err = tc.repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64)
-	tc.repl.mu.Unlock()
-	if !errors.Is(err, raft.ErrCompacted) {
-		t.Errorf("expected ErrCompacted, got %s", err)
-	}
+		// But any range that includes the truncated entries returns an error.
+		tc.repl.mu.Lock()
+		_, err = tc.repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64)
+		tc.repl.mu.Unlock()
+		if !errors.Is(err, raft.ErrCompacted) {
+			t.Errorf("expected ErrCompacted, got %s", err)
+		}
 
-	// The term of the last truncated entry is still available.
-	tc.repl.mu.Lock()
-	term, err := tc.repl.raftTermRLocked(indexes[4])
-	tc.repl.mu.Unlock()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if term == 0 {
-		t.Errorf("invalid term 0 for truncated entry")
-	}
+		// The term of the last truncated entry is still available.
+		tc.repl.mu.Lock()
+		term, err := tc.repl.raftTermRLocked(indexes[4])
+		tc.repl.mu.Unlock()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if term == 0 {
+			t.Errorf("invalid term 0 for truncated entry")
+		}
 
-	// The terms of older entries are gone.
-	tc.repl.mu.Lock()
-	_, err = tc.repl.raftTermRLocked(indexes[3])
-	tc.repl.mu.Unlock()
-	if !errors.Is(err, raft.ErrCompacted) {
-		t.Errorf("expected ErrCompacted, got %s", err)
-	}
+		// The terms of older entries are gone.
+		tc.repl.mu.Lock()
+		_, err = tc.repl.raftTermRLocked(indexes[3])
+		tc.repl.mu.Unlock()
+		if !errors.Is(err, raft.ErrCompacted) {
+			t.Errorf("expected ErrCompacted, got %s", err)
+		}
 
-	// Truncating logs that have already been truncated should not return an
-	// error.
-	truncateArgs = truncateLogArgs(indexes[3], rangeID)
-	if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
+		// Truncating logs that have already been truncated should not return an
+		// error.
+		truncateArgs = truncateLogArgs(indexes[3], rangeID)
+		if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
+			t.Fatal(pErr)
+		}
 
-	// Truncating logs that have the wrong rangeID included should not return
-	// an error but should not truncate any logs.
-	truncateArgs = truncateLogArgs(indexes[9], rangeID+1)
-	if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
+		// Truncating logs that have the wrong rangeID included should not return
+		// an error but should not truncate any logs.
+		truncateArgs = truncateLogArgs(indexes[9], rangeID+1)
+		if _, pErr := tc.SendWrapped(&truncateArgs); pErr != nil {
+			t.Fatal(pErr)
+		}
 
-	tc.repl.mu.Lock()
-	// The term of the last truncated entry is still available.
-	term, err = tc.repl.raftTermRLocked(indexes[4])
-	tc.repl.mu.Unlock()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if term == 0 {
-		t.Errorf("invalid term 0 for truncated entry")
-	}
+		tc.repl.mu.Lock()
+		// The term of the last truncated entry is still available.
+		term, err = tc.repl.raftTermRLocked(indexes[4])
+		tc.repl.mu.Unlock()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if term == 0 {
+			t.Errorf("invalid term 0 for truncated entry")
+		}
+	})
 }
 
 func TestRaftLogQueueShouldQueueRecompute(t *testing.T) {
@@ -913,3 +916,21 @@ func TestTruncateLogRecompute(t *testing.T) {
 		put() // make sure we remain trusted and in sync
 	}
 }
+
+func waitForTruncationForTesting(
+	t *testing.T, r *Replica, newFirstIndex uint64, looselyCoupled bool,
+) {
+	testutils.SucceedsSoon(t, func() error {
+		if looselyCoupled {
+			// Flush the engine to advance durability, which triggers truncation.
+			require.NoError(t, r.Engine().Flush())
+		}
+		// FirstIndex has changed.
+		firstIndex, err := r.GetFirstIndex()
+		require.NoError(t, err)
+		if firstIndex != newFirstIndex {
+			return errors.Errorf("expected firstIndex == %d, got %d", newFirstIndex, firstIndex)
+		}
+		return nil
+	})
+}
