Skip to content

Commit

Permalink
Merge pull request #16068 from CaojiamingAlan/release-3.5
Browse files Browse the repository at this point in the history
[3.5] etcdserver: backport check scheduledCompactKeyName and finishedCompac…
  • Loading branch information
ahrtr authored Jul 15, 2023
2 parents 9ac1d73 + 6ac9d94 commit 8f4b6c9
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
36 changes: 32 additions & 4 deletions server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,27 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
return nil, compactMainRev, nil
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
// checkPrevCompactionCompleted reports whether the scheduled compaction
// revision recorded in the meta bucket equals the finished compaction
// revision. A key with no stored value is treated as main revision 0, so the
// result is also true when neither key has been written yet.
func (s *store) checkPrevCompactionCompleted() bool {
	tx := s.b.ReadTx()
	tx.Lock()
	defer tx.Unlock()

	// Both revisions default to zero and are only overwritten when the
	// corresponding meta key actually holds a value.
	var scheduled, finished int64
	if _, vals := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0); len(vals) > 0 {
		scheduled = bytesToRev(vals[0]).main
	}
	if _, vals := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0); len(vals) > 0 {
		finished = bytesToRev(vals[0]).main
	}
	return scheduled == finished
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
Expand All @@ -240,7 +260,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
s.compactBarrier(context.TODO(), ch)
return
}
s.hashes.Store(hash)
// Only store the hash value if the previous compaction is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
s.hashes.Store(hash)
} else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
}
close(ch)
}

Expand All @@ -250,17 +276,19 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
}

// compactLockfree updates the compact revision to rev and then hands off to
// s.compact, returning the channel and error that s.compact produces. If
// updateCompactRev fails, its channel and error are returned as-is. Unlike
// Compact, this function does not acquire s.mu itself.
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
// Capture the completion status before updateCompactRev is called.
// NOTE(review): presumably updateCompactRev records the new scheduled
// revision, which would make the check meaningless afterwards — confirm.
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

// NOTE(review): the next two lines are the removed and added sides of the
// rendered diff. The three-argument call is the pre-change line; the
// four-argument call matches the current signature of s.compact.
return s.compact(traceutil.TODO(), rev, prevCompactRev)
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()

prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
Expand All @@ -269,7 +297,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev)
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Commit() {
Expand Down
4 changes: 4 additions & 0 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ func TestStoreCompact(t *testing.T) {
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(revision{1, 0}, false)
key2 := newTestKeyBytes(revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}

s.Compact(traceutil.TODO(), 3)
Expand All @@ -344,6 +346,8 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
Expand Down
4 changes: 4 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct {
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
WatchProcessNotifyInterval time.Duration
CompactionBatchLimit int
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -339,6 +340,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
if cfg.WatchProcessNotifyInterval != 0 {
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
}
if cfg.CompactionBatchLimit != 0 {
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
}

etcdCfgs[i] = &etcdServerProcessConfig{
lg: lg,
Expand Down
59 changes: 59 additions & 0 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/datadir"
Expand Down Expand Up @@ -180,3 +181,61 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
}

// TestCompactHashCheckDetectCorruptionInterrupt starts a 3-node cluster with
// the compact hash check enabled, requests a compaction, and restarts one
// member right afterwards to interrupt its compaction. Once that member logs
// that the scheduled compaction finished, the test waits for several hash-check
// intervals and asserts that no CORRUPT alarm has been raised.
func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
	checkTime := time.Second
	BeforeTest(t)

	slowCompactionNodeIndex := 1

	// Start a new cluster, with compact hash check enabled.
	t.Log("creating a new cluster with 3 nodes...")

	epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
		clusterSize:             3,
		keepDataDir:             true,
		CompactHashCheckEnabled: true,
		CompactHashCheckTime:    checkTime,
		logLevel:                "info",
		// Forwarded as --experimental-compaction-batch-limit; presumably the
		// smallest batch size is chosen so the compaction runs long enough to
		// be interrupted by the restart below.
		CompactionBatchLimit: 1,
	})
	require.NoError(t, err)
	t.Cleanup(func() {
		if errC := epc.Close(); errC != nil {
			t.Fatalf("error closing etcd processes (%v)", errC)
		}
	})

	// Put 200 values to the same key, so that the compaction will drop the stale values.
	// We need a relatively big number here to make the compaction take a non-trivial time,
	// so that we can interrupt it.
	t.Log("putting 200 values to the identical key...")
	cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false)

	for i := 0; i < 200; i++ {
		err = cc.Put("key", fmt.Sprint(i))
		require.NoError(t, err, "error on put")
	}

	t.Log("compaction started...")
	_, err = cc.Compact(200)
	// The compaction request must succeed; otherwise there is nothing to
	// interrupt and the remainder of the test would be meaningless.
	require.NoError(t, err, "error on compact")

	t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
	err = epc.procs[slowCompactionNodeIndex].Restart()
	require.NoError(t, err)

	// Wait until the node finished compaction.
	_, err = epc.procs[slowCompactionNodeIndex].Logs().Expect("finished scheduled compaction")
	require.NoError(t, err, "can't get log indicating finished scheduled compaction")

	// Wait for compaction hash check
	time.Sleep(checkTime * 5)

	alarmResponse, err := cc.AlarmList()
	require.NoError(t, err, "error on alarm list")
	for _, alarm := range alarmResponse.Alarms {
		if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
			t.Fatal("there should be no corruption after resuming the compaction, but corruption detected")
		}
	}
	t.Log("no corruption detected.")
}

0 comments on commit 8f4b6c9

Please sign in to comment.