Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete discovery gateway #9500

Merged
merged 12 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ func main() {
vtgate.QueryLogHandler = "/debug/vtgate/querylog"
vtgate.QueryLogzHandler = "/debug/vtgate/querylogz"
vtgate.QueryzHandler = "/debug/vtgate/queryz"
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait)
// pass nil for healthcheck, it will get created
vtg := vtgate.Init(context.Background(), nil, resilientServer, tpb.Cells[0], tabletTypesToWait)

// vtctld configuration and init
err = vtctld.InitVtctld(ts)
Expand Down
12 changes: 3 additions & 9 deletions go/cmd/vtgate/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ func addStatusParts(vtg *vtgate.VTGate) {
servenv.AddStatusPart("Gateway Status", vtgate.StatusTemplate, func() interface{} {
return vtg.GetGatewayCacheStatus()
})
if vtgate.UsingLegacyGateway() {
servenv.AddStatusPart("Health Check Cache", discovery.LegacyHealthCheckTemplate, func() interface{} {
return legacyHealthCheck.CacheStatus()
})
} else {
servenv.AddStatusPart("Health Check Cache", discovery.HealthCheckTemplate, func() interface{} {
return vtg.Gateway().TabletsCacheStatus()
})
}
servenv.AddStatusPart("Health Check Cache", discovery.HealthCheckTemplate, func() interface{} {
return vtg.Gateway().TabletsCacheStatus()
})
}
17 changes: 2 additions & 15 deletions go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var (
)

var resilientServer *srvtopo.ResilientServer
var legacyHealthCheck discovery.LegacyHealthCheck

func init() {
rand.Seed(time.Now().UnixNano())
Expand Down Expand Up @@ -143,17 +142,8 @@ func main() {
log.Exitf("cells_to_watch validation failed: %v", err)
}

var vtg *vtgate.VTGate
if *vtgate.GatewayImplementation == vtgate.GatewayImplementationDiscovery {
// default value
legacyHealthCheck = discovery.NewLegacyHealthCheck(*vtgate.HealthCheckRetryDelay, *vtgate.HealthCheckTimeout)
legacyHealthCheck.RegisterStats()

vtg = vtgate.LegacyInit(context.Background(), legacyHealthCheck, resilientServer, *cell, *vtgate.RetryCount, tabletTypes)
} else {
// use new Init otherwise
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes)
}
// pass nil for HealthCheck and it will be created
vtg := vtgate.Init(context.Background(), nil, resilientServer, *cell, tabletTypes)

servenv.OnRun(func() {
// Flags are parsed now. Parse the template using the actual flag value and overwrite the current template.
Expand All @@ -162,9 +152,6 @@ func main() {
})
servenv.OnClose(func() {
_ = vtg.Gateway().Close(context.Background())
if legacyHealthCheck != nil {
_ = legacyHealthCheck.Close()
}
})
servenv.RunDefault()
}
21 changes: 9 additions & 12 deletions go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
Expand All @@ -35,10 +34,6 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var (
currentTabletUID sync2.AtomicInt32
)

// This file contains the definitions for a FakeHealthCheck class to
// simulate a HealthCheck module. Note it is not in a sub-package because
// otherwise it couldn't be used in this package's tests because of
Expand All @@ -55,9 +50,9 @@ func NewFakeHealthCheck(ch chan *TabletHealth) *FakeHealthCheck {
// FakeHealthCheck implements discovery.HealthCheck.
type FakeHealthCheck struct {
// mu protects the items map
mu sync.RWMutex
items map[string]*fhcItem

mu sync.RWMutex
items map[string]*fhcItem
currentTabletUID int
// channel to return on subscribe. Pass nil if no subscribe should not return a channel
ch chan *TabletHealth
}
Expand Down Expand Up @@ -251,25 +246,27 @@ func (fhc *FakeHealthCheck) Reset() {
defer fhc.mu.Unlock()

fhc.items = make(map[string]*fhcItem)
fhc.currentTabletUID = 0
}

// AddFakeTablet inserts a fake entry into FakeHealthCheck.
// The Tablet can be talked to using the provided connection.
// The Listener is called, as if AddTablet had been called.
// For flexibility the connection is created via a connFactory callback
func (fhc *FakeHealthCheck) AddFakeTablet(cell, host string, port int32, keyspace, shard string, tabletType topodatapb.TabletType, serving bool, reparentTS int64, err error, connFactory func(*topodatapb.Tablet) queryservice.QueryService) queryservice.QueryService {
fhc.mu.Lock()
defer fhc.mu.Unlock()

// tabletUID must be unique
currentTabletUID.Add(1)
uid := currentTabletUID.Get()
fhc.currentTabletUID++
uid := fhc.currentTabletUID
t := topo.NewTablet(uint32(uid), cell, host)
t.Keyspace = keyspace
t.Shard = shard
t.Type = tabletType
t.PortMap["vt"] = port
key := TabletToMapKey(t)

fhc.mu.Lock()
defer fhc.mu.Unlock()
item := fhc.items[key]
if item == nil {
item = &fhcItem{
Expand Down
33 changes: 16 additions & 17 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,28 @@ var (
TabletURLTemplateString = flag.String("tablet_url_template", "http://{{.GetTabletHostPort}}", "format string describing debug tablet url formatting. See the Go code for getTabletDebugURL() how to customize this.")
tabletURLTemplate *template.Template

//TODO(deepthi): change these vars back to unexported when discoveryGateway is removed

// AllowedTabletTypes is the list of allowed tablet types. e.g. {PRIMARY, REPLICA}
AllowedTabletTypes []topodata.TabletType
// TabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets
TabletFilters flagutil.StringListValue
// KeyspacesToWatch - if provided this specifies which keyspaces should be
// visible to the healthcheck. By default the healthcheck will watch all keyspaces.
KeyspacesToWatch flagutil.StringListValue
// RefreshInterval is the interval at which healthcheck refreshes its list of tablets from topo
RefreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval")
// RefreshKnownTablets tells us whether to process all tablets or only new tablets
RefreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
// TopoReadConcurrency tells us how many topo reads are allowed in parallel
TopoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")

// tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets
tabletFilters flagutil.StringListValue
// refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo
refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval")
// refreshKnownTablets tells us whether to process all tablets or only new tablets
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
// topoReadConcurrency tells us how many topo reads are allowed in parallel
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
const (
DefaultHealthCheckRetryDelay = 5 * time.Second
DefaultHealthCheckTimeout = 1 * time.Minute
defaultHealthCheckRetryDelay = 5 * time.Second
defaultHealthCheckTimeout = 1 * time.Minute

// DefaultTopoReadConcurrency is used as the default value for the TopoReadConcurrency parameter of a TopologyWatcher.
// DefaultTopoReadConcurrency is used as the default value for the topoReadConcurrency parameter of a TopologyWatcher.
DefaultTopoReadConcurrency int = 5
// DefaultTopologyWatcherRefreshInterval is used as the default value for
// the refresh interval of a topology watcher.
Expand Down Expand Up @@ -141,7 +140,7 @@ func ParseTabletURLTemplateFromFlag() {
func init() {
// Flags are not parsed at this point and the default value of the flag (just the hostname) will be used.
ParseTabletURLTemplateFromFlag()
flag.Var(&TabletFilters, "tablet_filters", "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch")
flag.Var(&tabletFilters, "tablet_filters", "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch")
topoproto.TabletTypeListVar(&AllowedTabletTypes, "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to")
flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema")
}
Expand Down Expand Up @@ -283,20 +282,20 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
if len(TabletFilters) > 0 {
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}

fbs, err := NewFilterByShard(TabletFilters)
fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *RefreshInterval, *RefreshKnownTablets, *TopoReadConcurrency))
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/legacy_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ type legacyTabletHealth struct {

// NewLegacyDefaultHealthCheck creates a new LegacyHealthCheck object with a default configuration.
func NewLegacyDefaultHealthCheck() LegacyHealthCheck {
return NewLegacyHealthCheck(DefaultHealthCheckRetryDelay, DefaultHealthCheckTimeout)
return NewLegacyHealthCheck(defaultHealthCheckRetryDelay, defaultHealthCheckTimeout)
}

// NewLegacyHealthCheck creates a new LegacyHealthCheck object.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/legacy_topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewLegacyCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, t
// NewLegacyShardReplicationWatcher returns a LegacyTopologyWatcher that
// monitors the tablets in a cell/keyspace/shard, and starts refreshing.
func NewLegacyShardReplicationWatcher(ctx context.Context, topoServer *topo.Server, tr LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) *LegacyTopologyWatcher {
return NewLegacyTopologyWatcher(ctx, topoServer, tr, cell, refreshInterval, true /* RefreshKnownTablets */, topoReadConcurrency, func(tw *LegacyTopologyWatcher) ([]*topodatapb.TabletAlias, error) {
return NewLegacyTopologyWatcher(ctx, topoServer, tr, cell, refreshInterval, true /* refreshKnownTablets */, topoReadConcurrency, func(tw *LegacyTopologyWatcher) ([]*topodatapb.TabletAlias, error) {
sri, err := tw.topoServer.GetShardReplication(ctx, tw.cell, keyspace, shard)
switch {
case err == nil:
Expand Down
8 changes: 4 additions & 4 deletions go/vt/discovery/legacy_topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func checkLegacyWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
}
tw.loadTablets()

// If RefreshKnownTablets is disabled, only the new tablet is read
// If refreshKnownTablets is disabled, only the new tablet is read
// from the topo
if refreshKnownTablets {
counts = checkLegacyOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
Expand All @@ -154,7 +154,7 @@ func checkLegacyWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2)
}

// Load the tablets again to show that when RefreshKnownTablets is disabled,
// Load the tablets again to show that when refreshKnownTablets is disabled,
// only the list is read from the topo and the checksum doesn't change
tw.loadTablets()
if refreshKnownTablets {
Expand All @@ -167,7 +167,7 @@ func checkLegacyWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
// same tablet, different port, should update (previous
// one should go away, new one be added)
//
// if RefreshKnownTablets is disabled, this case is *not*
// if refreshKnownTablets is disabled, this case is *not*
// detected and the tablet remains in the topo using the
// old key
origTablet := proto.Clone(tablet).(*topodatapb.Tablet)
Expand Down Expand Up @@ -209,7 +209,7 @@ func checkLegacyWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
// trigger a ReplaceTablet in loadTablets because the uid does not
// match.
//
// This case *is* detected even if RefreshKnownTablets is false
// This case *is* detected even if refreshKnownTablets is false
// because the delete tablet / create tablet sequence causes the
// list of tablets to change and therefore the change is detected.
if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
}
tw.loadTablets()

// If RefreshKnownTablets is disabled, only the new tablet is read
// If refreshKnownTablets is disabled, only the new tablet is read
// from the topo
if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
Expand All @@ -185,7 +185,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2)
}

// Load the tablets again to show that when RefreshKnownTablets is disabled,
// Load the tablets again to show that when refreshKnownTablets is disabled,
// only the list is read from the topo and the checksum doesn't change
tw.loadTablets()
if refreshKnownTablets {
Expand All @@ -198,7 +198,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
// same tablet, different port, should update (previous
// one should go away, new one be added)
//
// if RefreshKnownTablets is disabled, this case is *not*
// if refreshKnownTablets is disabled, this case is *not*
// detected and the tablet remains in the topo using the
// old key
origTablet := proto.Clone(tablet).(*topodatapb.Tablet)
Expand Down
56 changes: 0 additions & 56 deletions go/vt/vtgate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,59 +142,3 @@ func initAPI(hc discovery.HealthCheck) {
return nil, fmt.Errorf("cannot find health for: %s", itemPath)
})
}

func legacyInitAPI(hc discovery.LegacyHealthCheck) {
// Healthcheck real time status per (cell, keyspace, tablet type, metric).
handleCollection("health-check", func(r *http.Request) (interface{}, error) {
cacheStatus := hc.CacheStatus()

itemPath := getItemPath(r.URL.Path)
if itemPath == "" {
return cacheStatus, nil
}
parts := strings.SplitN(itemPath, "/", 2)
collectionFilter := parts[0]
if collectionFilter == "" {
return cacheStatus, nil
}
if len(parts) != 2 {
return nil, fmt.Errorf("invalid health-check path: %q expected path: / or /cell/<cell> or /keyspace/<keyspace> or /tablet/<tablet|mysql_hostname>", itemPath)
}
value := parts[1]

switch collectionFilter {
case "cell":
{
filteredStatus := make(discovery.LegacyTabletsCacheStatusList, 0)
for _, tabletCacheStatus := range cacheStatus {
if tabletCacheStatus.Cell == value {
filteredStatus = append(filteredStatus, tabletCacheStatus)
}
}
return filteredStatus, nil
}
case "keyspace":
{
filteredStatus := make(discovery.LegacyTabletsCacheStatusList, 0)
for _, tabletCacheStatus := range cacheStatus {
if tabletCacheStatus.Target.Keyspace == value {
filteredStatus = append(filteredStatus, tabletCacheStatus)
}
}
return filteredStatus, nil
}
case "tablet":
{
// Return a _specific tablet_
for _, tabletCacheStatus := range cacheStatus {
for _, tabletStats := range tabletCacheStatus.TabletsStats {
if tabletStats.Name == value || tabletStats.Tablet.MysqlHostname == value {
return tabletStats, nil
}
}
}
}
}
return nil, fmt.Errorf("cannot find health for: %s", itemPath)
})
}
Loading