[3.5] etcdserver: backport check scheduledCompactKeyName and finishedCompac… #16068

Merged · 1 commit · Jul 15, 2023
36 changes: 32 additions & 4 deletions server/mvcc/kvstore.go
@@ -227,7 +227,27 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
return nil, compactMainRev, nil
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
func (s *store) checkPrevCompactionCompleted() bool {
tx := s.b.ReadTx()
tx.Lock()
Member:
Just curious why not use tx.RLock()? But I understand this PR is a backport of #15985.

Member:
Lock() for ReadTx() should by default be an RLock().

Member:
For my understanding: do you mean that, for the following RWMutex, Lock is equivalent to RLock for protecting the read buffer?

func (rt *readTx) Lock() { rt.mu.Lock() }
func (rt *readTx) Unlock() { rt.mu.Unlock() }
func (rt *readTx) RLock() { rt.mu.RLock() }
func (rt *readTx) RUnlock() { rt.mu.RUnlock() }

Contributor Author @CaojiamingAlan (Jun 13, 2023):

After investigating a little, I think you are right; we should use RLock() instead. Lock() should only be used when the read buffer is updated by the writes committed by the BatchTxs. However, there are several places where read-only operations are performed but Lock() is used. I think it would be good to open a separate PR to correct them all.

Please correct me if I'm wrong.

Member:

Yep. That aligns with my understanding.

I also don't think it blocks this PR from being merged. This can be corrected separately.

Member:

> Old implementation uses stricter lock, which is sub-optimal, but is not a bug.

"Stricter lock" is true, but it's also the wrong lock. It's a bug to me.

It should also have an impact on performance for the non-K8s use case when auth is enabled. [Of course, it would be better if we had some performance comparison, but I will NOT force the contributor to do it, given it's a straightforward change.]

> To ensure release stability, we need to assume that the list you proposed can have a mistake.

That's why we need careful review. I wouldn't be afraid of backporting it, given that it's a simple change and we have confidence in it.

Member:

Note that I prefer backporting it, but I won't insist on it if there is strong objection from other maintainers.

Member @serathius (Jul 25, 2023):

> "Stricter lock" is true, but it's also the wrong lock. It's a bug to me.

Without the change the system behaves correctly; with the change, the only thing you get is a performance improvement. How is this a bug?

Member:

I would say it's a harmless (functionality-wise) bug; obviously the lock isn't being used correctly. Let alone that it has an impact on performance.

Member:

A bug is when locks don't protect the critical section; relaxing locks is a performance improvement. Nothing is harmless until proven, let alone a concurrency change.
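(Aside: a minimal, standalone sketch of the sync.RWMutex semantics under discussion — not etcd code, only standard-library names. Lock() is exclusive and blocks readers and writers alike; RLock() is shared, so concurrent readers do not serialize each other.)

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.RWMutex
	var wg sync.WaitGroup

	// Shared mode: all three readers can hold the lock at once.
	// A read-only transaction taking mu.Lock() instead would force
	// them to run one at a time.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mu.RLock()
			defer mu.RUnlock()
			fmt.Printf("reader %d inside the critical section\n", id)
		}(i)
	}
	wg.Wait()

	// Exclusive mode: this would have blocked until every RLock holder left.
	mu.Lock()
	mu.Unlock()
}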

defer tx.Unlock()
_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
}

_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
finishedCompact := int64(0)
if len(finishedCompactBytes) != 0 {
finishedCompact = bytesToRev(finishedCompactBytes[0]).main
}
Comment on lines +235 to +246
Member:

Please add an UnsafeReadScheduledCompact and an UnsafeReadFinishedCompact in server/mvcc/util.go for 3.5.

The two functions should be reused in multiple places (please search for scheduledCompactKeyName and finishedCompactKeyName in the code base).

It's acceptable to fix this in a separate PR. Please also raise a follow-up ticket.
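(For reference, a rough sketch of what those two helpers might look like in server/mvcc/util.go, mirroring the ranges in the diff above; the exact signatures and the found flag are assumptions, since the review only names the functions.)

// UnsafeReadScheduledCompact reads the last scheduled compact revision
// from the meta bucket; found is false if no compaction was ever scheduled.
func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledCompact int64, found bool) {
	_, vs := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0)
	if len(vs) == 0 {
		return 0, false
	}
	return bytesToRev(vs[0]).main, true
}

// UnsafeReadFinishedCompact reads the last finished compact revision;
// found is false if no compaction has ever finished.
func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedCompact int64, found bool) {
	_, vs := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
	if len(vs) == 0 {
		return 0, false
	}
	return bytesToRev(vs[0]).main, true
}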

return scheduledCompact == finishedCompact
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
@@ -240,7 +260,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
s.compactBarrier(context.TODO(), ch)
return
}
s.hashes.Store(hash)
// Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
s.hashes.Store(hash)
} else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
}
close(ch)
}

@@ -250,17 +276,19 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

return s.compact(traceutil.TODO(), rev, prevCompactRev)
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()

prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
@@ -269,7 +297,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev)
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Commit() {
4 changes: 4 additions & 0 deletions server/mvcc/kvstore_test.go
@@ -333,6 +333,8 @@ func TestStoreCompact(t *testing.T) {
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(revision{1, 0}, false)
key2 := newTestKeyBytes(revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}

s.Compact(traceutil.TODO(), 3)
@@ -344,6 +346,8 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
4 changes: 4 additions & 0 deletions tests/e2e/cluster_test.go
@@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct {
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
WatchProcessNotifyInterval time.Duration
CompactionBatchLimit int
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
@@ -339,6 +340,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*etcdServerProcessConfig {
if cfg.WatchProcessNotifyInterval != 0 {
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
}
if cfg.CompactionBatchLimit != 0 {
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
}

etcdCfgs[i] = &etcdServerProcessConfig{
lg: lg,
59 changes: 59 additions & 0 deletions tests/e2e/corrupt_test.go
@@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/datadir"
@@ -180,3 +181,61 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
}

func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
checkTime := time.Second
BeforeTest(t)

slowCompactionNodeIndex := 1

// Start a new cluster, with compact hash check enabled.
t.Log("creating a new cluster with 3 nodes...")

epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 3,
keepDataDir: true,
CompactHashCheckEnabled: true,
CompactHashCheckTime: checkTime,
logLevel: "info",
CompactionBatchLimit: 1,
})
require.NoError(t, err)
t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})

// Put 200 values to the same key, so that the compaction will drop some stale revisions.
// We need a relatively big number here so that the compaction takes a non-trivial amount of time, giving us a chance to interrupt it.
t.Log("putting 200 values to the identical key...")
cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false)

for i := 0; i < 200; i++ {
err = cc.Put("key", fmt.Sprint(i))
require.NoError(t, err, "error on put")
}

t.Log("compaction started...")
_, err = cc.Compact(200)

t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
err = epc.procs[slowCompactionNodeIndex].Restart()
require.NoError(t, err)

// Wait until the node finished compaction.
_, err = epc.procs[slowCompactionNodeIndex].Logs().Expect("finished scheduled compaction")
require.NoError(t, err, "can't get log indicating finished scheduled compaction")

// Wait for compaction hash check
time.Sleep(checkTime * 5)

alarmResponse, err := cc.AlarmList()
require.NoError(t, err, "error on alarm list")
for _, alarm := range alarmResponse.Alarms {
if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
t.Fatal("there should be no corruption after resuming the compaction, but corruption detected")
}
}
t.Log("no corruption detected.")
}