Skip to content

Commit

Permalink
[v14.0.5]: Backport upstream 13856 - reshard state check fix (#321)
Browse files Browse the repository at this point in the history
* Backport upstream 13856 - reshard state check fix

* revert comment change

* fix null check
  • Loading branch information
tanjinx authored May 6, 2024
1 parent b878269 commit f4025ed
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ require (

require (
github.com/bndr/gotabulate v1.1.2
go.uber.org/goleak v1.2.1
modernc.org/sqlite v1.20.3
)

Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,8 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
93 changes: 93 additions & 0 deletions go/test/utils/noleak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"context"
"testing"
"time"

"go.uber.org/goleak"
)

// LeakCheckContext creates a cancellable Context tied to the lifetime of the
// test t. A cleanup hook registered on t cancels the Context when the test
// ends and then calls EnsureNoLeaks, which performs its leak check only if the
// test has not already failed.
func LeakCheckContext(t testing.TB) context.Context {
	ctx, stop := context.WithCancel(context.Background())
	cleanup := func() {
		// Cancel before checking: the leak check runs strictly after the
		// Context has been cancelled.
		stop()
		EnsureNoLeaks(t)
	}
	t.Cleanup(cleanup)
	return ctx
}

// LeakCheckContextTimeout is the deadline-bound variant of LeakCheckContext:
// the returned Context is cancelled once `timeout` has elapsed or when the
// test t ends, whichever comes first. The cleanup hook registered on t cancels
// the Context and then calls EnsureNoLeaks.
func LeakCheckContextTimeout(t testing.TB, timeout time.Duration) context.Context {
	ctx, stop := context.WithTimeout(context.Background(), timeout)
	cleanup := func() {
		// Cancel before checking: the leak check runs strictly after the
		// Context has been cancelled.
		stop()
		EnsureNoLeaks(t)
	}
	t.Cleanup(cleanup)
	return ctx
}

// EnsureNoLeaks runs the package's leak detection (ensureNoLeaks) and fails
// the test t with t.Fatal if it reports an error. If t has already failed the
// check is skipped entirely.
func EnsureNoLeaks(t testing.TB) {
	// Skip the check for tests that already failed.
	// NOTE(review): presumably because a failing test may not have cleaned up
	// after itself and a second failure would only add noise — confirm.
	if t.Failed() {
		return
	}

	leakErr := ensureNoLeaks()
	if leakErr != nil {
		t.Fatal(leakErr)
	}
}

// GetLeaks runs the package's leak detection (ensureNoLeaks) and returns its
// error, or nil when nothing was found. Unlike EnsureNoLeaks it needs no
// testing.TB, so it can be called from a TestMain() to verify that all tests
// cleaned up after themselves.
func GetLeaks() error {
	if err := ensureNoLeaks(); err != nil {
		return err
	}
	return nil
}

// ensureNoLeaks is the single entry point for all leak checks in this file.
// It currently delegates to ensureNoGoroutines and returns its result.
func ensureNoLeaks() error {
	return ensureNoGoroutines()
}

// ensureNoGoroutines looks for leaked goroutines with goleak.Find, ignoring a
// fixed list of goroutines identified by their top function.
//
// The check is attempted up to `attempts` times, pausing `backoff` between
// consecutive attempts. It returns nil as soon as one attempt finds no leaks,
// otherwise the error from the last attempt.
func ensureNoGoroutines() error {
	const (
		attempts = 5
		backoff  = 100 * time.Millisecond
	)

	// Goroutines excluded from the leak check, matched by top function.
	ignored := []goleak.Option{
		goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
		goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/dbconfigs.init.0.func1"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.resetAggregators"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vtgate.processQueryInfo"),
		goleak.IgnoreTopFunction("github.com/patrickmn/go-cache.(*janitor).Run"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/logutil.(*ThrottledLogger).log.func1"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.initThrottleTicker.func1.1"),
		goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.NewBackgroundClient.initThrottleTicker.func1.1"),
		goleak.IgnoreTopFunction("testing.tRunner.func1"),
	}

	var err error
	for i := 0; i < attempts; i++ {
		// Sleep only between attempts: pausing after the final failure would
		// delay the error report without triggering another check.
		if i > 0 {
			time.Sleep(backoff)
		}
		if err = goleak.Find(ignored...); err == nil {
			return nil
		}
	}
	return err
}
31 changes: 21 additions & 10 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/query"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -65,7 +66,7 @@ type KeyspaceEvent struct {

type ShardEvent struct {
Tablet *topodatapb.TabletAlias
Target *query.Target
Target *querypb.Target
Serving bool
}

Expand Down Expand Up @@ -125,16 +126,26 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
defer kss.mu.Unlock()

// if the keyspace is gone, or if it has no known availability events, the keyspace
// cannot be in the middle of a resharding operation
// cannot be in the middle of a resharding operation.
if kss.deleted || kss.consistent {
return false
}

// for all the known shards, try to find a primary shard besides the one we're trying to access
// and which is currently healthy. if there are other healthy primaries in the keyspace, it means
// we're in the middle of a resharding operation
// If there are unequal and overlapping shards in the keyspace and any of them are
// currently serving then we assume that we are in the middle of a Reshard.
_, ckr, err := topo.ValidateShardName(currentShard)
if err != nil || ckr == nil { // Assume not and avoid potential panic
return false
}
for shard, sstate := range kss.shards {
if shard != currentShard && sstate.serving {
if !sstate.serving || shard == currentShard {
continue
}
_, skr, err := topo.ValidateShardName(shard)
if err != nil || skr == nil { // Assume not and avoid potential panic
return false
}
if key.KeyRangesIntersect(ckr, skr) {
return true
}
}
Expand All @@ -143,7 +154,7 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
}

type shardState struct {
target *query.Target
target *querypb.Target
serving bool
externallyReparented int64
currentPrimary *topodatapb.TabletAlias
Expand Down Expand Up @@ -426,7 +437,7 @@ func (kew *KeyspaceEventWatcher) getKeyspaceStatus(keyspace string) *keyspaceSta
// This is not a fully accurate heuristic, but it's good enough that we'd want to buffer the
// request for the given target under the assumption that the reason why it cannot be completed
// right now is transitory.
func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *query.Target) bool {
func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *querypb.Target) bool {
if target.TabletType != topodatapb.TabletType_PRIMARY {
return false
}
Expand All @@ -446,7 +457,7 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(target *query.Target) bo
// The shard state keeps track of the current primary and the last externally reparented time, which we can use
// to determine that there was a serving primary which now became non serving. This is only possible in a DemotePrimary
// RPC which are only called from ERS and PRS. So buffering will stop when these operations succeed.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) bool {
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *querypb.Target) bool {
if target.TabletType != topodatapb.TabletType_PRIMARY {
return false
}
Expand Down
Loading

0 comments on commit f4025ed

Please sign in to comment.