Skip to content

Commit

Permalink
add keyrange support for vtorc clusters_to_watch
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <pbibra@slack-corp.com>
  • Loading branch information
pbibra committed Aug 6, 2024
1 parent bebddba commit a4f4141
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 28 deletions.
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Flags:
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config string config file name
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
Expand Down
106 changes: 79 additions & 27 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"maps"
"slices"
"strings"
"sync"
Expand All @@ -31,6 +32,7 @@ import (
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand All @@ -57,7 +59,7 @@ var (

// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards or keyrange values that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}

Expand All @@ -81,6 +83,76 @@ func refreshAllTablets() {
}, false /* forceRefresh */)
}

// getKeyspaceShardsToWatch converts the input clustersToWatch into a list of individual keyspace/shards.
// This handles both individual shards or key ranges using TabletFilter from the discovery package.
func getKeyspaceShardsToWatch() ([]*topo.KeyspaceShard, error) {
// Parse input and build list of keyspaces / shards
var keyspaceShards []*topo.KeyspaceShard

keyspaces := make(map[string]map[string]string)
filters := make(map[string][]string)

keyspaces["ranged"] = map[string]string{}
keyspaces["full"] = map[string]string{}

for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") {
// This is a keyspace/shard specification
input := strings.Split(ks, "/")
keyspaces["ranged"][input[0]] = "ranged"
// filter creation expects a pipe separator between keyspace and shard
filters[input[0]] = append(filters[input[0]], fmt.Sprintf("%s|%s", input[0], input[1]))

} else {
keyspaces["full"][ks] = "full"
}
}

// Copy function will combine the two maps. It will override any keyspaces in ranged that also exist in full with the
// full designation because we assume that the full keyspace will take precedence over a keyspace/shard specification within the same input.
// e.g. If the clustersToWatch is `ks1,...ks1/10-20`, all tablets in ks1 should be watched.
maps.Copy(keyspaces["ranged"], keyspaces["full"])

if len(keyspaces["ranged"]) > 0 {
for ks, filterType := range keyspaces["ranged"] {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()

shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
// Log the errr and continue
log.Errorf("Error fetching shards for keyspace: %v", ks)
continue
}

if len(shards) == 0 {
log.Errorf("Topo has no shards for ks: %v", ks)
continue
}

if filterType == "ranged" {
shardFilter, err := discovery.NewFilterByShard(filters[ks])
if err != nil {
log.Error(err)
return keyspaceShards, err
}

for _, s := range shards {
if shardFilter.IsIncluded(&topodatapb.Tablet{Keyspace: ks, Shard: s}) {
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s})
}
}
} else {
for _, s := range shards {
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s})
}
}
}
}

return keyspaceShards, nil
}

func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
if !IsLeaderOrActive() {
return
Expand All @@ -106,33 +178,13 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}
wg.Wait()
} else {
// Parse input and build list of keyspaces / shards
var keyspaceShards []*topo.KeyspaceShard
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") {
// This is a keyspace/shard specification
input := strings.Split(ks, "/")
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
} else {
// Assume this is a keyspace and find all shards in keyspace
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
// Log the errr and continue
log.Errorf("Error fetching shards for keyspace: %v", ks)
continue
}
if len(shards) == 0 {
log.Errorf("Topo has no shards for ks: %v", ks)
continue
}
for _, s := range shards {
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s})
}
}
keyspaceShards, err := getKeyspaceShardsToWatch()
if err != nil {
log.Error(err)
return
}
if len(keyspaceShards) == 0 {

if len(keyspaceShards) == 0 || keyspaceShards == nil {
log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch)
return
}
Expand Down
126 changes: 126 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/external/golib/sqlutils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtorc/db"
Expand Down Expand Up @@ -274,6 +275,131 @@ func TestShardPrimary(t *testing.T) {
}
}

func TestGetKeyspaceShardsToWatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ts = memorytopo.NewServer(ctx, "test_cell")

keyspaces := []string{"test_keyspace", "test_keyspace2", "test_keyspace3", "test_keyspace4"}
for _, k := range keyspaces {
if err := ts.CreateKeyspace(ctx, k, &topodatapb.Keyspace{}); err != nil {
t.Fatalf("cannot create keyspace: %v", err)
}
}

shards1 := []string{"-40", "40-50", "50-60", "60-70", "70-80", "80-"}
shards2 := []string{"-1000", "1000-1100", "1100-1200", "1200-1300", "1300-"}

for _, shard := range shards1 {
if err := ts.CreateShard(ctx, keyspaces[0], shard); err != nil {
t.Fatalf("cannot create shard: %v", err)
}
}

for _, shard := range shards2 {
if err := ts.CreateShard(ctx, keyspaces[1], shard); err != nil {
t.Fatalf("cannot create shard: %v", err)
}
}

if err := ts.CreateShard(ctx, keyspaces[2], "-"); err != nil {
t.Fatalf("cannot create shard: %v", err)
}

if err := ts.CreateShard(ctx, keyspaces[3], "0"); err != nil {
t.Fatalf("cannot create shard: %v", err)
}

testcases := []*struct {
name string
clusters []string
expected []*topo.KeyspaceShard
}{
{
name: "single shard and range",
clusters: []string{fmt.Sprintf("%s/40-50", keyspaces[0]), fmt.Sprintf("%s/60-80", keyspaces[0])},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspaces[0], Shard: "40-50"},
{Keyspace: keyspaces[0], Shard: "60-70"},
{Keyspace: keyspaces[0], Shard: "70-80"},
},
}, {
name: "single shard",
clusters: []string{fmt.Sprintf("%s/40-50", keyspaces[0])},
expected: []*topo.KeyspaceShard{{Keyspace: keyspaces[0], Shard: "40-50"}},
}, {
name: "full keyspace",
clusters: []string{keyspaces[0]},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspaces[0], Shard: "-40"},
{Keyspace: keyspaces[0], Shard: "40-50"},
{Keyspace: keyspaces[0], Shard: "50-60"},
{Keyspace: keyspaces[0], Shard: "60-70"},
{Keyspace: keyspaces[0], Shard: "70-80"},
{Keyspace: keyspaces[0], Shard: "80-"},
},
}, {
name: "full keyspace with keyrange",
clusters: []string{keyspaces[0], fmt.Sprintf("%s/60-80", keyspaces[0])},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspaces[0], Shard: "-40"},
{Keyspace: keyspaces[0], Shard: "40-50"},
{Keyspace: keyspaces[0], Shard: "50-60"},
{Keyspace: keyspaces[0], Shard: "60-70"},
{Keyspace: keyspaces[0], Shard: "70-80"},
{Keyspace: keyspaces[0], Shard: "80-"},
},
}, {
name: "multi keyspace",
clusters: []string{keyspaces[0], fmt.Sprintf("%s/1100-1300", keyspaces[1])},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspaces[1], Shard: "1100-1200"},
{Keyspace: keyspaces[1], Shard: "1200-1300"},
{Keyspace: keyspaces[0], Shard: "-40"},
{Keyspace: keyspaces[0], Shard: "40-50"},
{Keyspace: keyspaces[0], Shard: "50-60"},
{Keyspace: keyspaces[0], Shard: "60-70"},
{Keyspace: keyspaces[0], Shard: "70-80"},
{Keyspace: keyspaces[0], Shard: "80-"},
},
}, {
name: "partial success with non-existent shard",
clusters: []string{"non-existent/10-20", fmt.Sprintf("%s/1100-1300", keyspaces[1])},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspaces[1], Shard: "1100-1200"},
{Keyspace: keyspaces[1], Shard: "1200-1300"},
},
}, {
name: "empty result",
clusters: []string{"non-existent/10-20"},
expected: nil,
}, {
name: "single keyspace -",
clusters: []string{keyspaces[2]},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspaces[2], Shard: "-"},
},
}, {
name: "single keyspace 0",
clusters: []string{keyspaces[3]},
expected: []*topo.KeyspaceShard{
{Keyspace: keyspaces[3], Shard: "0"},
},
},
}

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
clustersToWatch = testcase.clusters
res, err := getKeyspaceShardsToWatch()

assert.NoError(t, err)
assert.EqualValues(t, testcase.expected, res)
})
}
}

// verifyRefreshTabletsInKeyspaceShard calls refreshTabletsInKeyspaceShard with the forceRefresh parameter provided and verifies that
// the number of instances refreshed matches the parameter and all the tablets match the ones provided
func verifyRefreshTabletsInKeyspaceShard(t *testing.T, forceRefresh bool, instanceRefreshRequired int, tablets []*topodatapb.Tablet, tabletsToIgnore []string) {
Expand Down

0 comments on commit a4f4141

Please sign in to comment.