Skip to content

Commit

Permalink
Random selection of keyspace based on available tablet (#13359)
Browse files Browse the repository at this point in the history
Co-authored-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
harshit-gangal and GuptaManan100 authored Jul 13, 2023
1 parent 6d4d00a commit 6e77777
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 19 deletions.
12 changes: 12 additions & 0 deletions go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,18 @@ func (fhc *FakeHealthCheck) GetAllTablets() map[string]*topodatapb.Tablet {
return res
}

// BroadcastAll broadcasts all the tablets' healthchecks
func (fhc *FakeHealthCheck) BroadcastAll() {
if fhc.ch == nil {
return
}
fhc.mu.Lock()
defer fhc.mu.Unlock()
for _, item := range fhc.items {
fhc.ch <- simpleCopy(item.ts)
}
}

func simpleCopy(th *TabletHealth) *TabletHealth {
return &TabletHealth{
Conn: th.Conn,
Expand Down
26 changes: 26 additions & 0 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,18 @@ func (kss *keyspaceState) onSrvKeyspace(newKeyspace *topodatapb.SrvKeyspace, new
return true
}

// isServing returns whether a keyspace has at least one serving shard or not.
func (kss *keyspaceState) isServing() bool {
kss.mu.Lock()
defer kss.mu.Unlock()
for _, state := range kss.shards {
if state.serving {
return true
}
}
return false
}

// newKeyspaceState allocates the internal state required to keep track of availability incidents
// in this keyspace, and starts up a SrvKeyspace watcher on our topology server which will update
// our keyspaceState with any topology changes in real time.
Expand Down Expand Up @@ -471,3 +483,17 @@ func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(target *query.Target) (*top
}
return nil, false
}

// GetServingKeyspaces gets the serving keyspaces from the keyspace event watcher.
func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
kew.mu.Lock()
defer kew.mu.Unlock()

var servingKeyspaces []string
for ksName, state := range kew.keyspaces {
if state.isServing() {
servingKeyspaces = append(servingKeyspaces, ksName)
}
}
return servingKeyspaces
}
6 changes: 4 additions & 2 deletions go/vt/srvtopo/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package srvtopo

import (
"context"
"sort"

"vitess.io/vitess/go/sqltypes"

"context"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/key"
Expand All @@ -43,6 +42,9 @@ type Gateway interface {

// QueryServiceByAlias returns a QueryService
QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error)

// GetServingKeyspaces returns list of serving keyspaces.
GetServingKeyspaces() []string
}

// A Resolver can resolve keyspace ids and key ranges into ResolvedShard*
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func init() {

func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn) {
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
hc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth))
s := createSandbox(KsTestSharded)
s.VSchema = executorVSchema
serv := newSandboxForCells([]string{cell})
Expand Down Expand Up @@ -162,7 +162,6 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
_ = hc.AddTestTablet(cell, "c0-e0", 1, "TestExecutor", "c0-e0", topodatapb.TabletType_PRIMARY, true, 1, nil)
_ = hc.AddTestTablet(cell, "e0-", 1, "TestExecutor", "e0-", topodatapb.TabletType_PRIMARY, true, 1, nil)
// Below is needed so that SendAnyWherePlan doesn't fail
_ = hc.AddTestTablet(cell, "random", 1, "TestXBadVSchema", "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)

createSandbox(KsTestUnsharded)
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down
46 changes: 45 additions & 1 deletion go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
Expand Down Expand Up @@ -832,7 +833,7 @@ func TestExecutorShow(t *testing.T) {
Fields: buildVarCharFields("Cell", "Keyspace", "Shard", "TabletType", "State", "Alias", "Hostname", "PrimaryTermStartTime"),
Rows: [][]sqltypes.Value{
buildVarCharRow("aa", "TestExecutor", "-20", "PRIMARY", "SERVING", "aa-0000000001", "-20", "1970-01-01T00:00:01Z"),
buildVarCharRow("aa", "TestXBadVSchema", "-20", "PRIMARY", "SERVING", "aa-0000000009", "random", "1970-01-01T00:00:01Z"),
buildVarCharRow("aa", "TestUnsharded", "0", "REPLICA", "SERVING", "aa-0000000010", "2", "1970-01-01T00:00:01Z"),
},
}
utils.MustMatch(t, wantqr, qr, query)
Expand Down Expand Up @@ -2063,6 +2064,49 @@ func TestExecutorClearsWarnings(t *testing.T) {
require.Empty(t, session.Warnings)
}

// TestServingKeyspaces tests that the dual queries are routed to the correct keyspaces from the list of serving keyspaces.
func TestServingKeyspaces(t *testing.T) {
executor, sbc1, _, sbclookup := createExecutorEnv()
executor.pv = querypb.ExecuteOptions_Gen4
gw, ok := executor.resolver.resolver.GetGateway().(*TabletGateway)
require.True(t, ok)
hc := gw.hc.(*discovery.FakeHealthCheck)

// We broadcast twice because we want to ensure the keyspace event watcher has processed all the healthcheck updates
// from the first broadcast. Since we use a channel for broadcasting, it is blocking and hence the second call ensures
// all the updates (specifically the last one) has been processed by the keyspace-event-watcher.
hc.BroadcastAll()
hc.BroadcastAll()

sbc1.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace", "varchar"), "TestExecutor"),
})
sbclookup.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("keyspace", "varchar"), "TestUnsharded"),
})

require.ElementsMatch(t, []string{"TestExecutor", "TestUnsharded"}, gw.GetServingKeyspaces())
result, err := executor.Execute(ctx, nil, "TestServingKeyspaces", NewSafeSession(&vtgatepb.Session{}), "select keyspace_name from dual", nil)
require.NoError(t, err)
require.Equal(t, `[[VARCHAR("TestExecutor")]]`, fmt.Sprintf("%v", result.Rows))

for _, tablet := range hc.GetAllTablets() {
if tablet.Keyspace == "TestExecutor" {
hc.SetServing(tablet, false)
}
}
// Two broadcast calls for the same reason as above.
hc.BroadcastAll()
hc.BroadcastAll()

// Clear plan cache, to force re-planning of the query.
executor.plans.Clear()
require.ElementsMatch(t, []string{"TestUnsharded"}, gw.GetServingKeyspaces())
result, err = executor.Execute(ctx, nil, "TestServingKeyspaces", NewSafeSession(&vtgatepb.Session{}), "select keyspace_name from dual", nil)
require.NoError(t, err)
require.Equal(t, `[[VARCHAR("TestUnsharded")]]`, fmt.Sprintf("%v", result.Rows))
}

func TestExecutorOtherRead(t *testing.T) {
executor, sbc1, sbc2, sbclookup := createExecutorEnv()

Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, targ
return queryservice.Wrap(qs, gw.withShardError), NewShardError(err, target)
}

// GetServingKeyspaces returns list of serving keyspaces.
func (gw *TabletGateway) GetServingKeyspaces() []string {
if gw.kev == nil {
return nil
}
return gw.kev.GetServingKeyspaces()
}

// RegisterStats registers the stats to export the lag since the last refresh
// and the checksum of the topology
func (gw *TabletGateway) RegisterStats() {
Expand Down
42 changes: 28 additions & 14 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,7 @@ func (vc *vcursorImpl) AnyKeyspace() (*vindexes.Keyspace, error) {
return nil, errNoDbAvailable
}

var keyspaces = make([]*vindexes.Keyspace, 0, len(vc.vschema.Keyspaces))
for _, ks := range vc.vschema.Keyspaces {
keyspaces = append(keyspaces, ks.Keyspace)
}
sort.Slice(keyspaces, func(i, j int) bool {
return keyspaces[i].Name < keyspaces[j].Name
})
keyspaces := vc.getSortedServingKeyspaces()

// Look for any sharded keyspace if present, otherwise take the first keyspace,
// sorted alphabetically
Expand All @@ -360,18 +354,38 @@ func (vc *vcursorImpl) AnyKeyspace() (*vindexes.Keyspace, error) {
return keyspaces[0], nil
}

// getSortedServingKeyspaces gets the sorted serving keyspaces
func (vc *vcursorImpl) getSortedServingKeyspaces() []*vindexes.Keyspace {
var keyspaces []*vindexes.Keyspace

if vc.resolver != nil && vc.resolver.GetGateway() != nil {
keyspaceNames := vc.resolver.GetGateway().GetServingKeyspaces()
for _, ksName := range keyspaceNames {
ks, exists := vc.vschema.Keyspaces[ksName]
if exists {
keyspaces = append(keyspaces, ks.Keyspace)
}
}
}

if len(keyspaces) == 0 {
for _, ks := range vc.vschema.Keyspaces {
keyspaces = append(keyspaces, ks.Keyspace)
}
}
sort.Slice(keyspaces, func(i, j int) bool {
return keyspaces[i].Name < keyspaces[j].Name
})
return keyspaces
}

func (vc *vcursorImpl) FirstSortedKeyspace() (*vindexes.Keyspace, error) {
if len(vc.vschema.Keyspaces) == 0 {
return nil, errNoDbAvailable
}
kss := vc.vschema.Keyspaces
keys := make([]string, 0, len(kss))
for ks := range kss {
keys = append(keys, ks)
}
sort.Strings(keys)
keyspaces := vc.getSortedServingKeyspaces()

return kss[keys[0]].Keyspace, nil
return keyspaces[0], nil
}

// SysVarSetEnabled implements the ContextVSchema interface
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type SandboxConn struct {
// These errors work for all functions.
MustFailCodes map[vtrpcpb.Code]int

// ServingKeyspaces is a list of serving keyspaces
ServingKeyspaces []string

// These errors are triggered only for specific functions.
// For now these are just for the 2PC functions.
MustFailPrepare int
Expand Down Expand Up @@ -509,6 +512,11 @@ func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *queryp
return sbc, nil
}

// GetServingKeyspaces returns list of serving keyspaces.
func (sbc *SandboxConn) GetServingKeyspaces() []string {
return sbc.ServingKeyspaces
}

// HandlePanic is part of the QueryService interface.
func (sbc *SandboxConn) HandlePanic(err *error) {
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,11 @@ func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *que
panic("not implemented")
}

// GetServingKeyspaces returns list of serving keyspaces.
func (f *FakeQueryService) GetServingKeyspaces() []string {
panic("not implemented")
}

// ReserveBeginExecute satisfies the Gateway interface
func (f *FakeQueryService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (queryservice.ReservedTransactionState, *sqltypes.Result, error) {
panic("implement me")
Expand Down

0 comments on commit 6e77777

Please sign in to comment.