Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete discovery gateway #9500

Merged
merged 12 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ func main() {
vtgate.QueryLogHandler = "/debug/vtgate/querylog"
vtgate.QueryLogzHandler = "/debug/vtgate/querylogz"
vtgate.QueryzHandler = "/debug/vtgate/queryz"
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait)
// pass nil for healthcheck, it will get created
vtg := vtgate.Init(context.Background(), nil, resilientServer, tpb.Cells[0], tabletTypesToWait)

// vtctld configuration and init
err = vtctld.InitVtctld(ts)
Expand Down
12 changes: 3 additions & 9 deletions go/cmd/vtgate/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ func addStatusParts(vtg *vtgate.VTGate) {
servenv.AddStatusPart("Gateway Status", vtgate.StatusTemplate, func() interface{} {
return vtg.GetGatewayCacheStatus()
})
if vtgate.UsingLegacyGateway() {
servenv.AddStatusPart("Health Check Cache", discovery.LegacyHealthCheckTemplate, func() interface{} {
return legacyHealthCheck.CacheStatus()
})
} else {
servenv.AddStatusPart("Health Check Cache", discovery.HealthCheckTemplate, func() interface{} {
return vtg.Gateway().TabletsCacheStatus()
})
}
servenv.AddStatusPart("Health Check Cache", discovery.HealthCheckTemplate, func() interface{} {
return vtg.Gateway().TabletsCacheStatus()
})
}
17 changes: 2 additions & 15 deletions go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var (
)

var resilientServer *srvtopo.ResilientServer
var legacyHealthCheck discovery.LegacyHealthCheck

func init() {
rand.Seed(time.Now().UnixNano())
Expand Down Expand Up @@ -143,17 +142,8 @@ func main() {
log.Exitf("cells_to_watch validation failed: %v", err)
}

var vtg *vtgate.VTGate
if *vtgate.GatewayImplementation == vtgate.GatewayImplementationDiscovery {
// default value
legacyHealthCheck = discovery.NewLegacyHealthCheck(*vtgate.HealthCheckRetryDelay, *vtgate.HealthCheckTimeout)
legacyHealthCheck.RegisterStats()

vtg = vtgate.LegacyInit(context.Background(), legacyHealthCheck, resilientServer, *cell, *vtgate.RetryCount, tabletTypes)
} else {
// use new Init otherwise
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes)
}
// pass nil for HealthCheck and it will be created
vtg := vtgate.Init(context.Background(), nil, resilientServer, *cell, tabletTypes)

servenv.OnRun(func() {
// Flags are parsed now. Parse the template using the actual flag value and overwrite the current template.
Expand All @@ -162,9 +152,6 @@ func main() {
})
servenv.OnClose(func() {
_ = vtg.Gateway().Close(context.Background())
if legacyHealthCheck != nil {
_ = legacyHealthCheck.Close()
}
})
servenv.RunDefault()
}
21 changes: 9 additions & 12 deletions go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
Expand All @@ -35,10 +34,6 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var (
currentTabletUID sync2.AtomicInt32
)

// This file contains the definitions for a FakeHealthCheck class to
// simulate a HealthCheck module. Note it is not in a sub-package because
// otherwise it couldn't be used in this package's tests because of
Expand All @@ -55,9 +50,9 @@ func NewFakeHealthCheck(ch chan *TabletHealth) *FakeHealthCheck {
// FakeHealthCheck implements discovery.HealthCheck.
type FakeHealthCheck struct {
// mu protects the items map
mu sync.RWMutex
items map[string]*fhcItem

mu sync.RWMutex
items map[string]*fhcItem
currentTabletUID int
// channel to return on subscribe. Pass nil if no subscribe should not return a channel
ch chan *TabletHealth
}
Expand Down Expand Up @@ -251,25 +246,27 @@ func (fhc *FakeHealthCheck) Reset() {
defer fhc.mu.Unlock()

fhc.items = make(map[string]*fhcItem)
fhc.currentTabletUID = 0
}

// AddFakeTablet inserts a fake entry into FakeHealthCheck.
// The Tablet can be talked to using the provided connection.
// The Listener is called, as if AddTablet had been called.
// For flexibility the connection is created via a connFactory callback
func (fhc *FakeHealthCheck) AddFakeTablet(cell, host string, port int32, keyspace, shard string, tabletType topodatapb.TabletType, serving bool, reparentTS int64, err error, connFactory func(*topodatapb.Tablet) queryservice.QueryService) queryservice.QueryService {
fhc.mu.Lock()
defer fhc.mu.Unlock()

// tabletUID must be unique
currentTabletUID.Add(1)
uid := currentTabletUID.Get()
fhc.currentTabletUID++
uid := fhc.currentTabletUID
t := topo.NewTablet(uint32(uid), cell, host)
t.Keyspace = keyspace
t.Shard = shard
t.Type = tabletType
t.PortMap["vt"] = port
key := TabletToMapKey(t)

fhc.mu.Lock()
defer fhc.mu.Unlock()
item := fhc.items[key]
if item == nil {
item = &fhcItem{
Expand Down
56 changes: 0 additions & 56 deletions go/vt/vtgate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,59 +142,3 @@ func initAPI(hc discovery.HealthCheck) {
return nil, fmt.Errorf("cannot find health for: %s", itemPath)
})
}

func legacyInitAPI(hc discovery.LegacyHealthCheck) {
// Healthcheck real time status per (cell, keyspace, tablet type, metric).
handleCollection("health-check", func(r *http.Request) (interface{}, error) {
cacheStatus := hc.CacheStatus()

itemPath := getItemPath(r.URL.Path)
if itemPath == "" {
return cacheStatus, nil
}
parts := strings.SplitN(itemPath, "/", 2)
collectionFilter := parts[0]
if collectionFilter == "" {
return cacheStatus, nil
}
if len(parts) != 2 {
return nil, fmt.Errorf("invalid health-check path: %q expected path: / or /cell/<cell> or /keyspace/<keyspace> or /tablet/<tablet|mysql_hostname>", itemPath)
}
value := parts[1]

switch collectionFilter {
case "cell":
{
filteredStatus := make(discovery.LegacyTabletsCacheStatusList, 0)
for _, tabletCacheStatus := range cacheStatus {
if tabletCacheStatus.Cell == value {
filteredStatus = append(filteredStatus, tabletCacheStatus)
}
}
return filteredStatus, nil
}
case "keyspace":
{
filteredStatus := make(discovery.LegacyTabletsCacheStatusList, 0)
for _, tabletCacheStatus := range cacheStatus {
if tabletCacheStatus.Target.Keyspace == value {
filteredStatus = append(filteredStatus, tabletCacheStatus)
}
}
return filteredStatus, nil
}
case "tablet":
{
// Return a _specific tablet_
for _, tabletCacheStatus := range cacheStatus {
for _, tabletStats := range tabletCacheStatus.TabletsStats {
if tabletStats.Name == value || tabletStats.Tablet.MysqlHostname == value {
return tabletStats, nil
}
}
}
}
}
return nil, fmt.Errorf("cannot find health for: %s", itemPath)
})
}
34 changes: 17 additions & 17 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

// TestAutocommitUpdateSharded: instant-commit.
func TestAutocommitUpdateSharded(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "update user set a=2 where id = 1")
require.NoError(t, err)
Expand All @@ -52,7 +52,7 @@ func TestAutocommitUpdateSharded(t *testing.T) {

// TestAutocommitUpdateLookup: transaction: select before update.
func TestAutocommitUpdateLookup(t *testing.T) {
executor, sbc1, _, sbclookup := createLegacyExecutorEnv()
executor, sbc1, _, sbclookup := createExecutorEnv()
sbclookup.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("b|a", "int64|varbinary"),
"2|1",
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestAutocommitUpdateLookup(t *testing.T) {

// TestAutocommitUpdateVindexChange: transaction: select & update before final update.
func TestAutocommitUpdateVindexChange(t *testing.T) {
executor, sbc, _, sbclookup := createLegacyExecutorEnv()
executor, sbc, _, sbclookup := createExecutorEnv()
sbc.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|name|lastname|name_lastname_keyspace_id_map", "int64|int32|varchar|int64"),
"1|1|foo|0",
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestAutocommitUpdateVindexChange(t *testing.T) {

// TestAutocommitDeleteSharded: instant-commit.
func TestAutocommitDeleteSharded(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete from user_extra where user_id = 1")
require.NoError(t, err)
Expand All @@ -137,7 +137,7 @@ func TestAutocommitDeleteSharded(t *testing.T) {

// TestAutocommitDeleteLookup: transaction: select before update.
func TestAutocommitDeleteLookup(t *testing.T) {
executor, sbc1, _, sbclookup := createLegacyExecutorEnv()
executor, sbc1, _, sbclookup := createExecutorEnv()
sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|name|lastname", "int64|int32|varchar"),
"1|1|foo",
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestAutocommitDeleteLookup(t *testing.T) {

// TestAutocommitDeleteIn: instant-commit.
func TestAutocommitDeleteIn(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete from user_extra where user_id in (1, 2)")
require.NoError(t, err)
Expand All @@ -196,7 +196,7 @@ func TestAutocommitDeleteIn(t *testing.T) {

// TestAutocommitDeleteMultiShard: instant-commit.
func TestAutocommitDeleteMultiShard(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete from user_extra where user_id = user_id + 1")
require.NoError(t, err)
Expand All @@ -216,7 +216,7 @@ func TestAutocommitDeleteMultiShard(t *testing.T) {

// TestAutocommitDeleteMultiShardAutoCommit: instant-commit.
func TestAutocommitDeleteMultiShardAutoCommit(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "delete /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ from user_extra where user_id = user_id + 1")
require.NoError(t, err)
Expand All @@ -236,7 +236,7 @@ func TestAutocommitDeleteMultiShardAutoCommit(t *testing.T) {

// TestAutocommitInsertSharded: instant-commit.
func TestAutocommitInsertSharded(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "insert into user_extra(user_id, v) values (1, 2)")
require.NoError(t, err)
Expand All @@ -255,7 +255,7 @@ func TestAutocommitInsertSharded(t *testing.T) {

// TestAutocommitInsertLookup: transaction: select before update.
func TestAutocommitInsertLookup(t *testing.T) {
executor, sbc1, _, sbclookup := createLegacyExecutorEnv()
executor, sbc1, _, sbclookup := createExecutorEnv()

_, err := autocommitExec(executor, "insert into user(id, v, name) values (1, 2, 'myname')")
require.NoError(t, err)
Expand All @@ -282,7 +282,7 @@ func TestAutocommitInsertLookup(t *testing.T) {

// TestAutocommitInsertShardAutoCommit: instant-commit.
func TestAutocommitInsertMultishardAutoCommit(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "insert /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ into user_extra(user_id, v) values (1, 2), (3, 4)")
require.NoError(t, err)
Expand All @@ -305,7 +305,7 @@ func TestAutocommitInsertMultishardAutoCommit(t *testing.T) {
}})
testCommitCount(t, "sbc2", sbc2, 0)

executor, sbc1, sbc2, _ = createLegacyExecutorEnv()
executor, sbc1, sbc2, _ = createExecutorEnv()
// Make the first shard fail - the second completes anyway
sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1
_, err = autocommitExec(executor, "insert /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ into user_extra(user_id, v) values (1, 2), (3, 4)")
Expand All @@ -326,7 +326,7 @@ func TestAutocommitInsertMultishardAutoCommit(t *testing.T) {
}

func TestAutocommitInsertMultishard(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
executor, sbc1, sbc2, _ := createExecutorEnv()

_, err := autocommitExec(executor, "insert into user_extra(user_id, v) values (1, 2), (3, 4)")
require.NoError(t, err)
Expand All @@ -352,7 +352,7 @@ func TestAutocommitInsertMultishard(t *testing.T) {

// TestAutocommitInsertAutoinc: instant-commit: sequence fetch is not transactional.
func TestAutocommitInsertAutoinc(t *testing.T) {
executor, _, _, sbclookup := createLegacyExecutorEnv()
executor, _, _, sbclookup := createExecutorEnv()

_, err := autocommitExec(executor, "insert into main1(id, name) values (null, 'myname')")
require.NoError(t, err)
Expand All @@ -371,7 +371,7 @@ func TestAutocommitInsertAutoinc(t *testing.T) {

// TestAutocommitTransactionStarted: no instant-commit.
func TestAutocommitTransactionStarted(t *testing.T) {
executor, sbc1, _, _ := createLegacyExecutorEnv()
executor, sbc1, _, _ := createExecutorEnv()

session := &vtgatepb.Session{
TargetString: "@primary",
Expand All @@ -393,7 +393,7 @@ func TestAutocommitTransactionStarted(t *testing.T) {

// TestAutocommitDirectTarget: instant-commit.
func TestAutocommitDirectTarget(t *testing.T) {
executor, _, _, sbclookup := createLegacyExecutorEnv()
executor, _, _, sbclookup := createExecutorEnv()

session := &vtgatepb.Session{
TargetString: "TestUnsharded/0@primary",
Expand All @@ -414,7 +414,7 @@ func TestAutocommitDirectTarget(t *testing.T) {

// TestAutocommitDirectRangeTarget: no instant-commit.
func TestAutocommitDirectRangeTarget(t *testing.T) {
executor, sbc1, _, _ := createLegacyExecutorEnv()
executor, sbc1, _, _ := createExecutorEnv()

session := &vtgatepb.Session{
TargetString: "TestExecutor[-]@primary",
Expand Down
Loading