Skip to content

Commit

Permalink
Merge pull request #9500 from planetscale/delete-discovery-gateway
Browse files Browse the repository at this point in the history
Delete discovery gateway
  • Loading branch information
frouioui authored Mar 10, 2022
2 parents f6f0ae0 + 87fe76d commit 4162b2f
Show file tree
Hide file tree
Showing 36 changed files with 451 additions and 1,925 deletions.
3 changes: 2 additions & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ func main() {
vtgate.QueryLogHandler = "/debug/vtgate/querylog"
vtgate.QueryLogzHandler = "/debug/vtgate/querylogz"
vtgate.QueryzHandler = "/debug/vtgate/queryz"
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait)
// pass nil for healthcheck, it will get created
vtg := vtgate.Init(context.Background(), nil, resilientServer, tpb.Cells[0], tabletTypesToWait)

// vtctld configuration and init
err = vtctld.InitVtctld(ts)
Expand Down
12 changes: 3 additions & 9 deletions go/cmd/vtgate/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ func addStatusParts(vtg *vtgate.VTGate) {
servenv.AddStatusPart("Gateway Status", vtgate.StatusTemplate, func() interface{} {
return vtg.GetGatewayCacheStatus()
})
if vtgate.UsingLegacyGateway() {
servenv.AddStatusPart("Health Check Cache", discovery.LegacyHealthCheckTemplate, func() interface{} {
return legacyHealthCheck.CacheStatus()
})
} else {
servenv.AddStatusPart("Health Check Cache", discovery.HealthCheckTemplate, func() interface{} {
return vtg.Gateway().TabletsCacheStatus()
})
}
servenv.AddStatusPart("Health Check Cache", discovery.HealthCheckTemplate, func() interface{} {
return vtg.Gateway().TabletsCacheStatus()
})
}
17 changes: 2 additions & 15 deletions go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var (
)

var resilientServer *srvtopo.ResilientServer
var legacyHealthCheck discovery.LegacyHealthCheck

func init() {
rand.Seed(time.Now().UnixNano())
Expand Down Expand Up @@ -143,17 +142,8 @@ func main() {
log.Exitf("cells_to_watch validation failed: %v", err)
}

var vtg *vtgate.VTGate
if *vtgate.GatewayImplementation == vtgate.GatewayImplementationDiscovery {
// default value
legacyHealthCheck = discovery.NewLegacyHealthCheck(*vtgate.HealthCheckRetryDelay, *vtgate.HealthCheckTimeout)
legacyHealthCheck.RegisterStats()

vtg = vtgate.LegacyInit(context.Background(), legacyHealthCheck, resilientServer, *cell, *vtgate.RetryCount, tabletTypes)
} else {
// use new Init otherwise
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes)
}
// pass nil for HealthCheck and it will be created
vtg := vtgate.Init(context.Background(), nil, resilientServer, *cell, tabletTypes)

servenv.OnRun(func() {
// Flags are parsed now. Parse the template using the actual flag value and overwrite the current template.
Expand All @@ -162,9 +152,6 @@ func main() {
})
servenv.OnClose(func() {
_ = vtg.Gateway().Close(context.Background())
if legacyHealthCheck != nil {
_ = legacyHealthCheck.Close()
}
})
servenv.RunDefault()
}
21 changes: 9 additions & 12 deletions go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
Expand All @@ -35,10 +34,6 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var (
currentTabletUID sync2.AtomicInt32
)

// This file contains the definitions for a FakeHealthCheck class to
// simulate a HealthCheck module. Note it is not in a sub-package because
// otherwise it couldn't be used in this package's tests because of
Expand All @@ -55,9 +50,9 @@ func NewFakeHealthCheck(ch chan *TabletHealth) *FakeHealthCheck {
// FakeHealthCheck implements discovery.HealthCheck.
type FakeHealthCheck struct {
// mu protects the items map
mu sync.RWMutex
items map[string]*fhcItem

mu sync.RWMutex
items map[string]*fhcItem
currentTabletUID int
// channel to return on subscribe. Pass nil if no subscribe should not return a channel
ch chan *TabletHealth
}
Expand Down Expand Up @@ -251,25 +246,27 @@ func (fhc *FakeHealthCheck) Reset() {
defer fhc.mu.Unlock()

fhc.items = make(map[string]*fhcItem)
fhc.currentTabletUID = 0
}

// AddFakeTablet inserts a fake entry into FakeHealthCheck.
// The Tablet can be talked to using the provided connection.
// The Listener is called, as if AddTablet had been called.
// For flexibility the connection is created via a connFactory callback
func (fhc *FakeHealthCheck) AddFakeTablet(cell, host string, port int32, keyspace, shard string, tabletType topodatapb.TabletType, serving bool, reparentTS int64, err error, connFactory func(*topodatapb.Tablet) queryservice.QueryService) queryservice.QueryService {
fhc.mu.Lock()
defer fhc.mu.Unlock()

// tabletUID must be unique
currentTabletUID.Add(1)
uid := currentTabletUID.Get()
fhc.currentTabletUID++
uid := fhc.currentTabletUID
t := topo.NewTablet(uint32(uid), cell, host)
t.Keyspace = keyspace
t.Shard = shard
t.Type = tabletType
t.PortMap["vt"] = port
key := TabletToMapKey(t)

fhc.mu.Lock()
defer fhc.mu.Unlock()
item := fhc.items[key]
if item == nil {
item = &fhcItem{
Expand Down
33 changes: 16 additions & 17 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,28 @@ var (
TabletURLTemplateString = flag.String("tablet_url_template", "http://{{.GetTabletHostPort}}", "format string describing debug tablet url formatting. See the Go code for getTabletDebugURL() how to customize this.")
tabletURLTemplate *template.Template

//TODO(deepthi): change these vars back to unexported when discoveryGateway is removed

// AllowedTabletTypes is the list of allowed tablet types. e.g. {PRIMARY, REPLICA}
AllowedTabletTypes []topodata.TabletType
// TabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets
TabletFilters flagutil.StringListValue
// KeyspacesToWatch - if provided this specifies which keyspaces should be
// visible to the healthcheck. By default the healthcheck will watch all keyspaces.
KeyspacesToWatch flagutil.StringListValue
// RefreshInterval is the interval at which healthcheck refreshes its list of tablets from topo
RefreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval")
// RefreshKnownTablets tells us whether to process all tablets or only new tablets
RefreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
// TopoReadConcurrency tells us how many topo reads are allowed in parallel
TopoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")

// tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets
tabletFilters flagutil.StringListValue
// refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo
refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval")
// refreshKnownTablets tells us whether to process all tablets or only new tablets
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
// topoReadConcurrency tells us how many topo reads are allowed in parallel
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
const (
DefaultHealthCheckRetryDelay = 5 * time.Second
DefaultHealthCheckTimeout = 1 * time.Minute
defaultHealthCheckRetryDelay = 5 * time.Second
defaultHealthCheckTimeout = 1 * time.Minute

// DefaultTopoReadConcurrency is used as the default value for the TopoReadConcurrency parameter of a TopologyWatcher.
// DefaultTopoReadConcurrency is used as the default value for the topoReadConcurrency parameter of a TopologyWatcher.
DefaultTopoReadConcurrency int = 5
// DefaultTopologyWatcherRefreshInterval is used as the default value for
// the refresh interval of a topology watcher.
Expand Down Expand Up @@ -141,7 +140,7 @@ func ParseTabletURLTemplateFromFlag() {
func init() {
// Flags are not parsed at this point and the default value of the flag (just the hostname) will be used.
ParseTabletURLTemplateFromFlag()
flag.Var(&TabletFilters, "tablet_filters", "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch")
flag.Var(&tabletFilters, "tablet_filters", "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch")
topoproto.TabletTypeListVar(&AllowedTabletTypes, "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to")
flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema")
}
Expand Down Expand Up @@ -283,20 +282,20 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
if len(TabletFilters) > 0 {
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}

fbs, err := NewFilterByShard(TabletFilters)
fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *RefreshInterval, *RefreshKnownTablets, *TopoReadConcurrency))
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/legacy_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ type legacyTabletHealth struct {

// NewLegacyDefaultHealthCheck creates a new LegacyHealthCheck object with a default configuration.
func NewLegacyDefaultHealthCheck() LegacyHealthCheck {
return NewLegacyHealthCheck(DefaultHealthCheckRetryDelay, DefaultHealthCheckTimeout)
return NewLegacyHealthCheck(defaultHealthCheckRetryDelay, defaultHealthCheckTimeout)
}

// NewLegacyHealthCheck creates a new LegacyHealthCheck object.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/legacy_topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewLegacyCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, t
// NewLegacyShardReplicationWatcher returns a LegacyTopologyWatcher that
// monitors the tablets in a cell/keyspace/shard, and starts refreshing.
func NewLegacyShardReplicationWatcher(ctx context.Context, topoServer *topo.Server, tr LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) *LegacyTopologyWatcher {
return NewLegacyTopologyWatcher(ctx, topoServer, tr, cell, refreshInterval, true /* RefreshKnownTablets */, topoReadConcurrency, func(tw *LegacyTopologyWatcher) ([]*topodatapb.TabletAlias, error) {
return NewLegacyTopologyWatcher(ctx, topoServer, tr, cell, refreshInterval, true /* refreshKnownTablets */, topoReadConcurrency, func(tw *LegacyTopologyWatcher) ([]*topodatapb.TabletAlias, error) {
sri, err := tw.topoServer.GetShardReplication(ctx, tw.cell, keyspace, shard)
switch {
case err == nil:
Expand Down
8 changes: 4 additions & 4 deletions go/vt/discovery/legacy_topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func checkLegacyWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
}
tw.loadTablets()

// If RefreshKnownTablets is disabled, only the new tablet is read
// If refreshKnownTablets is disabled, only the new tablet is read
// from the topo
if refreshKnownTablets {
counts = checkLegacyOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
Expand All @@ -154,7 +154,7 @@ func checkLegacyWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2)
}

// Load the tablets again to show that when RefreshKnownTablets is disabled,
// Load the tablets again to show that when refreshKnownTablets is disabled,
// only the list is read from the topo and the checksum doesn't change
tw.loadTablets()
if refreshKnownTablets {
Expand All @@ -167,7 +167,7 @@ func checkLegacyWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
// same tablet, different port, should update (previous
// one should go away, new one be added)
//
// if RefreshKnownTablets is disabled, this case is *not*
// if refreshKnownTablets is disabled, this case is *not*
// detected and the tablet remains in the topo using the
// old key
origTablet := proto.Clone(tablet).(*topodatapb.Tablet)
Expand Down Expand Up @@ -209,7 +209,7 @@ func checkLegacyWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) {
// trigger a ReplaceTablet in loadTablets because the uid does not
// match.
//
// This case *is* detected even if RefreshKnownTablets is false
// This case *is* detected even if refreshKnownTablets is false
// because the delete tablet / create tablet sequence causes the
// list of tablets to change and therefore the change is detected.
if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
}
tw.loadTablets()

// If RefreshKnownTablets is disabled, only the new tablet is read
// If refreshKnownTablets is disabled, only the new tablet is read
// from the topo
if refreshKnownTablets {
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1})
Expand All @@ -185,7 +185,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2)
}

// Load the tablets again to show that when RefreshKnownTablets is disabled,
// Load the tablets again to show that when refreshKnownTablets is disabled,
// only the list is read from the topo and the checksum doesn't change
tw.loadTablets()
if refreshKnownTablets {
Expand All @@ -198,7 +198,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
// same tablet, different port, should update (previous
// one should go away, new one be added)
//
// if RefreshKnownTablets is disabled, this case is *not*
// if refreshKnownTablets is disabled, this case is *not*
// detected and the tablet remains in the topo using the
// old key
origTablet := proto.Clone(tablet).(*topodatapb.Tablet)
Expand Down
56 changes: 0 additions & 56 deletions go/vt/vtgate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,59 +142,3 @@ func initAPI(hc discovery.HealthCheck) {
return nil, fmt.Errorf("cannot find health for: %s", itemPath)
})
}

func legacyInitAPI(hc discovery.LegacyHealthCheck) {
// Healthcheck real time status per (cell, keyspace, tablet type, metric).
handleCollection("health-check", func(r *http.Request) (interface{}, error) {
cacheStatus := hc.CacheStatus()

itemPath := getItemPath(r.URL.Path)
if itemPath == "" {
return cacheStatus, nil
}
parts := strings.SplitN(itemPath, "/", 2)
collectionFilter := parts[0]
if collectionFilter == "" {
return cacheStatus, nil
}
if len(parts) != 2 {
return nil, fmt.Errorf("invalid health-check path: %q expected path: / or /cell/<cell> or /keyspace/<keyspace> or /tablet/<tablet|mysql_hostname>", itemPath)
}
value := parts[1]

switch collectionFilter {
case "cell":
{
filteredStatus := make(discovery.LegacyTabletsCacheStatusList, 0)
for _, tabletCacheStatus := range cacheStatus {
if tabletCacheStatus.Cell == value {
filteredStatus = append(filteredStatus, tabletCacheStatus)
}
}
return filteredStatus, nil
}
case "keyspace":
{
filteredStatus := make(discovery.LegacyTabletsCacheStatusList, 0)
for _, tabletCacheStatus := range cacheStatus {
if tabletCacheStatus.Target.Keyspace == value {
filteredStatus = append(filteredStatus, tabletCacheStatus)
}
}
return filteredStatus, nil
}
case "tablet":
{
// Return a _specific tablet_
for _, tabletCacheStatus := range cacheStatus {
for _, tabletStats := range tabletCacheStatus.TabletsStats {
if tabletStats.Name == value || tabletStats.Tablet.MysqlHostname == value {
return tabletStats, nil
}
}
}
}
}
return nil, fmt.Errorf("cannot find health for: %s", itemPath)
})
}
Loading

0 comments on commit 4162b2f

Please sign in to comment.