Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Revert "Merge pull request vitessio#6721 from tinyspeck/fixes-long-wa…
Browse files Browse the repository at this point in the history
…it-filter-keyspace-upstream"

This reverts commit 6b9229e.
  • Loading branch information
rafael committed Oct 1, 2020
1 parent fea82c1 commit 6dfdfdc
Show file tree
Hide file tree
Showing 6 changed files with 1 addition and 210 deletions.
27 changes: 0 additions & 27 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,30 +612,8 @@ func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets
return hc.waitForTablets(ctx, targets, true)
}

// FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces
func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target {
filteredTargets := make([]*query.Target, 0)

// Keep them all if there are no keyspaces to watch
if len(KeyspacesToWatch) == 0 {
return append(filteredTargets, targets...)
}

// Let's remove from the target shards that are not in the keyspaceToWatch list.
for _, target := range targets {
for _, keyspaceToWatch := range keyspaces {
if target.Keyspace == keyspaceToWatch {
filteredTargets = append(filteredTargets, target)
}
}
}
return filteredTargets
}

// waitForTablets is the internal method that polls for tablets.
func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error {
targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets)

for {
// We nil targets as we find them.
allPresent := true
Expand Down Expand Up @@ -667,11 +645,6 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.
select {
case <-ctx.Done():
timer.Stop()
for _, target := range targets {
if target != nil {
log.Infof("couldn't find tablets for target: %v", target)
}
}
return ctx.Err()
case <-timer.C:
}
Expand Down
89 changes: 0 additions & 89 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/vt/proto/query"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand Down Expand Up @@ -426,94 +425,6 @@ func TestHealthCheckTimeout(t *testing.T) {
mustMatch(t, want, result, "Wrong TabletHealth data")
}

func TestWaitForAllServingTablets(t *testing.T) {
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
defer hc.Close()
tablet := createTestTablet(0, "cell", "a")
tablet.Type = topodatapb.TabletType_REPLICA
targets := []*query.Target{
{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}
input := make(chan *querypb.StreamHealthResponse)
createFakeConn(tablet, input)

// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// there will be a first result, get and discard it
<-resultChan
// empty
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := hc.WaitForAllServingTablets(ctx, targets)
assert.NotNil(t, err, "error should not be nil")

shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
}

input <- shr
<-resultChan
// // check it's there

targets = []*query.Target{
{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.Nil(t, err, "error should be nil. Targets are found")

targets = []*query.Target{
{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
{
Keyspace: "newkeyspace",
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.NotNil(t, err, "error should not be nil (there are no tablets on this keyspace")

targets = []*query.Target{
{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
{
Keyspace: "newkeyspace",
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

KeyspacesToWatch = []string{tablet.Keyspace}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.Nil(t, err, "error should be nil. Keyspace with no tablets is filtered")

KeyspacesToWatch = []string{}
}

// TestGetHealthyTablets tests the functionality of GetHealthyTabletStats.
func TestGetHealthyTablets(t *testing.T) {
ts := memorytopo.NewServer("cell")
Expand Down
6 changes: 0 additions & 6 deletions go/vt/discovery/legacy_tablet_stats_cache_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"golang.org/x/net/context"

"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand Down Expand Up @@ -85,11 +84,6 @@ func (tc *LegacyTabletStatsCache) waitForTablets(ctx context.Context, targets []
timer := time.NewTimer(waitAvailableTabletInterval)
select {
case <-ctx.Done():
for _, target := range targets {
if target != nil {
log.Infof("couldn't find tablets for target: %v", target)
}
}
timer.Stop()
return ctx.Err()
case <-timer.C:
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ func (dg *DiscoveryGateway) WaitForTablets(ctx context.Context, tabletTypesToWai
return err
}

filteredTargets := discovery.FilterTargetsByKeyspaces(discovery.KeyspacesToWatch, targets)
return dg.tsc.WaitForAllServingTablets(ctx, filteredTargets)
return dg.tsc.WaitForAllServingTablets(ctx, targets)
}

// Close shuts down underlying connections.
Expand Down
78 changes: 0 additions & 78 deletions go/vt/vtgate/discoverygateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"strings"
"testing"
"time"

"vitess.io/vitess/go/vt/log"

Expand All @@ -35,7 +34,6 @@ import (
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand Down Expand Up @@ -136,82 +134,6 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) {
}
}

func TestDiscoveryGatewayWaitForTablets(t *testing.T) {
keyspace := "ks"
shard := "0"
cell := "local"
hc := discovery.NewFakeLegacyHealthCheck()
ts := memorytopo.NewServer("local")
srvTopo := srvtopotest.NewPassthroughSrvTopoServer()
srvTopo.TopoServer = ts
srvTopo.SrvKeyspaceNames = []string{keyspace}
srvTopo.SrvKeyspace = &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodata.TabletType_MASTER,
ShardReferences: []*topodatapb.ShardReference{
{
Name: shard,
},
},
},
{
ServedType: topodata.TabletType_REPLICA,
ShardReferences: []*topodatapb.ShardReference{
{
Name: shard,
},
},
},
{
ServedType: topodata.TabletType_RDONLY,
ShardReferences: []*topodatapb.ShardReference{
{
Name: shard,
},
},
},
},
}

dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2)

// replica should only use local ones
hc.Reset()
dg.tsc.ResetForTesting()
hc.AddTestTablet(cell, "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil)
hc.AddTestTablet(cell, "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_MASTER, true, 5, nil)
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
err := dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_MASTER})
if err != nil {
t.Errorf("want %+v, got %+v", nil, err)
}

// fails if there are no available tablets for the desired TabletType
err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_RDONLY})
if err == nil {
t.Errorf("expected error, got nil")
}

// errors because there is no primary on ks2
ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
srvTopo.SrvKeyspaceNames = []string{keyspace, "ks2"}
err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER})
if err == nil {
t.Errorf("expected error, got nil")
}

discovery.KeyspacesToWatch = []string{keyspace}
// does not wait for ks2 if it's not part of the filter
ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER})
if err != nil {
t.Errorf("want %+v, got %+v", nil, err)
}

discovery.KeyspacesToWatch = []string{}
}

func TestShuffleTablets(t *testing.T) {
ts1 := discovery.LegacyTabletStats{
Key: "t1",
Expand Down
8 changes: 0 additions & 8 deletions go/vt/vttablet/tabletconn/tablet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package tabletconn

import (
"flag"
"sync"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -51,14 +50,9 @@ type TabletDialer func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast)

var dialers = make(map[string]TabletDialer)

// mu This mutex helps us prevent data races when registering / getting dialers
var mu sync.Mutex

// RegisterDialer is meant to be used by TabletDialer implementations
// to self register.
func RegisterDialer(name string, dialer TabletDialer) {
mu.Lock()
defer mu.Unlock()
if _, ok := dialers[name]; ok {
log.Fatalf("Dialer %s already exists", name)
}
Expand All @@ -67,8 +61,6 @@ func RegisterDialer(name string, dialer TabletDialer) {

// GetDialer returns the dialer to use, described by the command line flag
func GetDialer() TabletDialer {
mu.Lock()
defer mu.Unlock()
td, ok := dialers[*TabletProtocol]
if !ok {
log.Exitf("No dialer registered for tablet protocol %s", *TabletProtocol)
Expand Down

0 comments on commit 6dfdfdc

Please sign in to comment.