diff --git a/.github/workflows/cluster_endtoend_vtgate_tablet_healthcheck_cache.yml b/.github/workflows/cluster_endtoend_vtgate_tablet_healthcheck_cache.yml new file mode 100644 index 00000000000..1d646bb03fc --- /dev/null +++ b/.github/workflows/cluster_endtoend_vtgate_tablet_healthcheck_cache.yml @@ -0,0 +1,53 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + + name: Cluster (vtgate_tablet_healthcheck_cache) + on: [push, pull_request] + concurrency: + group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vtgate_tablet_healthcheck_cache)') + cancel-in-progress: true + + jobs: + build: + name: Run endtoend tests on Cluster (vtgate_tablet_healthcheck_cache) + runs-on: ubuntu-18.04 + + steps: + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.17 + + - name: Tune the OS + run: | + echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range + + # TEMPORARY WHILE GITHUB FIXES THIS https://github.com/actions/virtual-environments/issues/3185 + - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file + run: | + echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts + # DON'T FORGET TO REMOVE CODE ABOVE WHEN ISSUE IS ADRESSED! + + - name: Check out code + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + sudo apt-get update + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get install -y gnupg2 + sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get update + sudo apt-get install percona-xtrabackup-24 + + - name: Run cluster endtoend test + timeout-minutes: 30 + run: | + source build.env + eatmydata -- go run test.go -docker=false -print-log -follow -shard vtgate_tablet_healthcheck_cache diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index b6d4598e804..508bde9a99e 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -198,8 +198,14 @@ func (vttablet *VttabletProcess) GetStatusDetails() string { } // WaitForStatus waits till desired status of tablet is reached -func (vttablet *VttabletProcess) WaitForStatus(status string) bool { - return vttablet.GetTabletStatus() == status +func (vttablet *VttabletProcess) WaitForStatus(status string, howLong time.Duration) bool { + ticker := time.NewTicker(howLong) + for range ticker.C { + if vttablet.GetTabletStatus() == status { + return true + } + } + return false } // GetTabletStatus returns the tablet state as seen in /debug/vars TabletStateName diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index af0e81f1595..4e3ac83187e 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -196,11 +196,16 @@ func TestReplicaTransactions(t *testing.T) { time.Sleep(2 * time.Second) exec(t, readConn, fetchAllCustomers, "is either down or nonexistent") - // bring up tablet again - // query using same transaction will fail - _ = replicaTablet.VttabletProcess.Setup() - exec(t, readConn, 
fetchAllCustomers, "not found") - exec(t, readConn, "commit", "") + // bring up the tablet again + // trying to use the same session/transaction should fail as the vtgate has + // been restarted and the session lost + replicaTablet.VttabletProcess.ServingStatus = "SERVING" + err = replicaTablet.VttabletProcess.Setup() + require.Nil(t, err) + serving := replicaTablet.VttabletProcess.WaitForStatus("SERVING", time.Duration(60*time.Second)) + assert.Equal(t, serving, true, "Tablet did not become ready within a reasonable time") + exec(t, readConn, fetchAllCustomers, "is either down or nonexistent") + // create a new connection, should be able to query again readConn, err = mysql.Connect(ctx, &vtParams) require.NoError(t, err) diff --git a/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go b/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go new file mode 100644 index 00000000000..4769cd41e6f --- /dev/null +++ b/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go @@ -0,0 +1,230 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tablethealthcheckcache + +import ( + "context" + "flag" + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gotest.tools/assert" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "healthcheck_test_ks" + cell = "healthcheck_test_cell" + shards = []string{"-80", "80-"} + schemaSQL = ` +create table customer( + customer_id bigint not null auto_increment, + email varbinary(128), + primary key(customer_id) +) ENGINE=InnoDB; +create table corder( + order_id bigint not null auto_increment, + customer_id bigint, + sku varbinary(128), + price bigint, + primary key(order_id) +) ENGINE=InnoDB; +` + + vSchema = ` +{ + "sharded": true, + "vindexes": { + "hash": { + "type": "hash" + } + }, + "tables": { + "customer": { + "column_vindexes": [ + { + "column": "customer_id", + "name": "hash" + } + ] + }, + "corder": { + "column_vindexes": [ + { + "column": "customer_id", + "name": "hash" + } + ] + } + } +} +` +) + +// TestMain sets up the vitess cluster for any subsequent tests +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + VSchema: vSchema, + } + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, []string{"-health_check_interval", "1s"}...) 
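Both the replica-restart check in vtgate_test.go above and the churn test below block until a tablet reports a given status before querying it. A minimal sketch of that wait pattern, assuming a short fixed poll interval (the 100 ms value, the deadline loop, and the helper name are illustrative, not the exact body of VttabletProcess.WaitForStatus):

```go
package endtoendsketch

import (
	"time"

	"vitess.io/vitess/go/test/endtoend/cluster"
)

// waitForTabletStatus polls the tablet's reported state until it matches the
// desired status or the timeout elapses. Hypothetical helper for illustration
// only; the 100ms poll interval is an assumed value.
func waitForTabletStatus(vttablet *cluster.VttabletProcess, status string, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if vttablet.GetTabletStatus() == status {
			return true
		}
		time.Sleep(100 * time.Millisecond)
	}
	return false
}
```

The tests in this change give the wait a 60-second budget and fail if the tablet never reports SERVING.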
+ err = clusterInstance.StartKeyspace(*keyspace, shards, 1, false) + if err != nil { + return 1 + } + + clusterInstance.VtGateExtraArgs = []string{} + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +// TestHealthCheckCacheWithTabletChurn verifies that the tablet healthcheck cache has the correct number of records +// after many rounds of adding and removing tablets in quick succession. This verifies that we don't have any race +// conditions with these operations and their interactions with the cache. +func TestHealthCheckCacheWithTabletChurn(t *testing.T) { + ctx := context.Background() + tries := 10 + numShards := len(shards) + // 1 for primary,replica + expectedTabletHCcacheEntries := numShards * 2 + churnTabletUID := 9999 + churnTabletType := "rdonly" + + // verify output of SHOW VITESS_TABLETS + vtgateConn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer vtgateConn.Close() + query := "show vitess_tablets" + + // starting with two shards, each with 1 primary and 1 replica tablet) + // we'll be adding and removing a tablet of type churnTabletType with churnTabletUID + qr, _ := vtgateConn.ExecuteFetch(query, 100, true) + assert.Equal(t, expectedTabletHCcacheEntries, len(qr.Rows), "wrong number of tablet records in healthcheck cache, expected %d but had %d. Results: %v", expectedTabletHCcacheEntries, len(qr.Rows), qr.Rows) + + for i := 0; i < tries; i++ { + tablet := addTablet(t, churnTabletUID, churnTabletType) + expectedTabletHCcacheEntries++ + + qr, _ := vtgateConn.ExecuteFetch(query, 100, true) + assert.Equal(t, expectedTabletHCcacheEntries, len(qr.Rows), "wrong number of tablet records in healthcheck cache, expected %d but had %d. Results: %v", expectedTabletHCcacheEntries, len(qr.Rows), qr.Rows) + + killTablet(t, tablet) + expectedTabletHCcacheEntries-- + + qr, _ = vtgateConn.ExecuteFetch(query, 100, true) + assert.Equal(t, expectedTabletHCcacheEntries, len(qr.Rows), "wrong number of tablet records in healthcheck cache, expected %d but had %d. 
Results: %v", expectedTabletHCcacheEntries, len(qr.Rows), qr.Rows) + } + + // one final time, w/o the churning tablet + qr, _ = vtgateConn.ExecuteFetch(query, 100, true) + assert.Equal(t, expectedTabletHCcacheEntries, len(qr.Rows), "wrong number of tablet records in healthcheck cache, expected %d but had %d", expectedTabletHCcacheEntries, len(qr.Rows)) +} + +func addTablet(t *testing.T, tabletUID int, tabletType string) *cluster.Vttablet { + tablet := &cluster.Vttablet{ + TabletUID: tabletUID, + Type: tabletType, + HTTPPort: clusterInstance.GetAndReservePort(), + GrpcPort: clusterInstance.GetAndReservePort(), + MySQLPort: clusterInstance.GetAndReservePort(), + Alias: fmt.Sprintf("%s-%010d", cell, tabletUID), + } + // Start Mysqlctl process + tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory, !clusterInstance.ReusingVTDATAROOT) + proc, err := tablet.MysqlctlProcess.StartProcess() + require.Nil(t, err) + + // Start vttablet process + tablet.VttabletProcess = cluster.VttabletProcessInstance(tablet.HTTPPort, + tablet.GrpcPort, + tabletUID, + cell, + shards[0], + keyspaceName, + clusterInstance.VtctldProcess.Port, + tablet.Type, + clusterInstance.TopoProcess.Port, + clusterInstance.Hostname, + clusterInstance.TmpDirectory, + clusterInstance.VtTabletExtraArgs, + clusterInstance.EnableSemiSync) + + // wait for mysqld to be ready + err = proc.Wait() + require.Nil(t, err) + + err = tablet.VttabletProcess.Setup() + require.Nil(t, err) + + serving := tablet.VttabletProcess.WaitForStatus("SERVING", time.Duration(60*time.Second)) + assert.Equal(t, serving, true, "Tablet did not become ready within a reasonable time") + err = clusterInstance.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.%s", + tablet.VttabletProcess.Keyspace, tablet.VttabletProcess.Shard, tablet.Type), 1) + require.Nil(t, err) + + t.Logf("Added tablet: %s", tablet.Alias) + return tablet +} + +func killTablet(t *testing.T, tablet *cluster.Vttablet) { + t.Logf("Killing tablet: %s", tablet.Alias) + var wg sync.WaitGroup + wg.Add(1) + go func(tablet *cluster.Vttablet) { + defer wg.Done() + _ = tablet.VttabletProcess.TearDown() + _ = tablet.MysqlctlProcess.Stop() + tablet.MysqlctlProcess.CleanupFiles(tablet.TabletUID) + }(tablet) + wg.Wait() + + err := clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", keyspaceName) + require.Nil(t, err) +} diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index f519d98e9c0..d0469f44a54 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -169,7 +169,7 @@ type TabletRecorder interface { type keyspaceShardTabletType string type tabletAliasString string -//HealthCheck declares what the TabletGateway needs from the HealthCheck +// HealthCheck declares what the TabletGateway needs from the HealthCheck type HealthCheck interface { // CacheStatus returns a displayable version of the health check cache. CacheStatus() TabletsCacheStatusList @@ -250,18 +250,27 @@ type HealthCheckImpl struct { // NewHealthCheck creates a new HealthCheck object. // Parameters: // retryDelay. -// The duration to wait before retrying to connect (e.g. after a failed connection -// attempt). +// +// The duration to wait before retrying to connect (e.g. after a failed connection +// attempt). +// // healthCheckTimeout. -// The duration for which we consider a health check response to be 'fresh'. 
If we don't get -// a health check response from a tablet for more than this duration, we consider the tablet -// not healthy. +// +// The duration for which we consider a health check response to be 'fresh'. If we don't get +// a health check response from a tablet for more than this duration, we consider the tablet +// not healthy. +// // topoServer. -// The topology server that this healthcheck object can use to retrieve cell or tablet information +// +// The topology server that this healthcheck object can use to retrieve cell or tablet information +// // localCell. -// The localCell for this healthcheck +// +// The localCell for this healthcheck +// // callback. -// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering. +// +// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering. func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl { log.Infof("loading tablets for cells: %v", cellsToWatch) @@ -385,8 +394,29 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { hc.mu.Lock() defer hc.mu.Unlock() - key := hc.keyFromTablet(tablet) tabletAlias := tabletAliasString(topoproto.TabletAliasString(tablet.Alias)) + defer func() { + // We want to be sure the tablet is gone from the secondary + // maps even if it's already gone from the authoritative map. + // The tablet's type also may have recently changed as well, + // so ensure that the tablet we're removing is removed from + // any possible secondary map keys: + // key: keyspace.shard.tabletType -> val: map[tabletAlias]tabletHealth + for _, tabletType := range topoproto.AllTabletTypes { + key := keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tabletType))) + // delete from map by keyspace.shard.tabletType + ths, ok := hc.healthData[key] + if !ok { + continue + } + delete(ths, tabletAlias) + // delete from healthy list + healthy, ok := hc.healthy[key] + if ok && len(healthy) > 0 { + hc.recomputeHealthy(key) + } + } + }() // delete from authoritative map th, ok := hc.healthByAlias[tabletAlias] if !ok { @@ -397,18 +427,6 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { // which will call finalizeConn, which will close the connection. 
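The deferred sweep added above exists because the secondary cache maps are keyed by keyspace.shard.tabletType, so a tablet whose type changed after it was added would otherwise be missed at delete time. A toy, self-contained illustration of that stale-key problem (the map shapes, keys, and type list are simplifying assumptions, not the real healthcheck structures):

```go
package main

import "fmt"

// key mimics the keyspace.shard.tabletType key used for the secondary maps.
func key(keyspace, shard, tabletType string) string {
	return fmt.Sprintf("%s.%s.%s", keyspace, shard, tabletType)
}

func main() {
	// healthData stands in for the per-target map: "ks.shard.type" -> alias -> present.
	healthData := map[string]map[string]bool{}
	alias := "zone1-0000009999"

	// The tablet was added while it was a replica.
	healthData[key("ks", "-80", "replica")] = map[string]bool{alias: true}

	// By deletion time its type is rdonly; deleting only under the current
	// type misses the entry that was created under "replica".
	delete(healthData[key("ks", "-80", "rdonly")], alias)
	fmt.Println(len(healthData[key("ks", "-80", "replica")])) // 1: stale entry left behind

	// Sweeping every tablet type, as the deferred cleanup above now does,
	// removes the entry no matter which type it was registered under.
	for _, tabletType := range []string{"primary", "replica", "rdonly"} {
		delete(healthData[key("ks", "-80", tabletType)], alias)
	}
	fmt.Println(len(healthData[key("ks", "-80", "replica")])) // 0
}
```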
th.cancelFunc() delete(hc.healthByAlias, tabletAlias) - // delete from map by keyspace.shard.tabletType - ths, ok := hc.healthData[key] - if !ok { - log.Warningf("We have no health data for target: %v", key) - return - } - delete(ths, tabletAlias) - // delete from healthy list - healthy, ok := hc.healthy[key] - if ok && len(healthy) > 0 { - hc.recomputeHealthy(key) - } } func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Target, trivialUpdate bool, up bool) { @@ -546,6 +564,7 @@ func (hc *HealthCheckImpl) cacheStatusMap() map[string]*TabletsCacheStatus { for _, ths := range hc.healthData { for _, th := range ths { key := fmt.Sprintf("%v.%v.%v.%v", th.Tablet.Alias.Cell, th.Target.Keyspace, th.Target.Shard, th.Target.TabletType.String()) + var tcs *TabletsCacheStatus var ok bool if tcs, ok = tcsMap[key]; !ok { diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index d2f6d25eaf9..bbe3e4b8d8c 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -681,7 +681,7 @@ func TestRemoveTablet(t *testing.T) { // there will be a first result, get and discard it <-resultChan - shr := &querypb.StreamHealthResponse{ + shrReplica := &querypb.StreamHealthResponse{ TabletAlias: tablet.Alias, Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, Serving: true, @@ -695,7 +695,7 @@ func TestRemoveTablet(t *testing.T) { Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, PrimaryTermStartTime: 0, }} - input <- shr + input <- shrReplica <-resultChan // check it's there a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}) @@ -705,6 +705,59 @@ func TestRemoveTablet(t *testing.T) { hc.RemoveTablet(tablet) a = hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}) assert.Empty(t, a, "wrong result, expected empty list") + + // Now confirm that when a tablet's type changes between when it's added to the cache + // and when it's removed, that the tablet is entirely removed from the cache since + // in the secondary maps it's keyed in part by tablet type. + // Note: we are using GetTabletStats here to check the healthData map (rather than + // the healthy map that we checked above) because that is the data structure that + // is used when printing the contents of the healthcheck cache in the /debug/status + // endpoint and in the SHOW VITESS_TABLETS; SQL command output. + + // Add the tablet back. + hc.AddTablet(tablet) + // Receive and discard the initial result. + <-resultChan + input <- shrReplica + // Confirm it's there in the cache. + a = hc.getTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}) + mustMatch(t, want, a, "unexpected result") + // Change the tablet type to RDONLY. + tablet.Type = topodatapb.TabletType_RDONLY + shrRdonly := &querypb.StreamHealthResponse{ + TabletAlias: tablet.Alias, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_RDONLY}, + Serving: true, + TabletExternallyReparentedTimestamp: 0, + RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 2, CpuUsage: 0.4}, + } + // Now replace it, which does a Remove and Add. The tablet should + // be removed from the cache and all its maps even though the + // tablet type had changed in-between the initial Add and Remove. 
+ hc.ReplaceTablet(tablet, tablet) + // Confirm that the old entry is gone. + a = hc.getTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}) + assert.Empty(t, a, "wrong result, expected empty list") + // Receive and discard the initial result. + <-resultChan + input <- shrRdonly + // Confirm that the new entry is there in the cache. + want = []*TabletHealth{{ + Tablet: tablet, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_RDONLY}, + Serving: true, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 2, CpuUsage: 0.4}, + PrimaryTermStartTime: 0, + }} + a = hc.getTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_RDONLY}) + mustMatch(t, want, a, "unexpected result") + // Delete the tablet, confirm again that it's gone in both + // tablet type forms. + hc.RemoveTablet(tablet) + a = hc.getTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}) + assert.Empty(t, a, "wrong result, expected empty list") + a = hc.getTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_RDONLY}) + assert.Empty(t, a, "wrong result, expected empty list") } // TestGetHealthyTablets tests the functionality of GetHealthyTabletStats. diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index a090025302f..f2d36f9886c 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -289,7 +289,12 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { if err != nil { hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 1) - if strings.Contains(err.Error(), "health stats mismatch") { + // We have reason to suspect the tablet healthcheck record is corrupted or invalid so let's remove the tablet's record + // from the healthcheck cache and it will get re-added again if the tablet is reachable + if strings.Contains(err.Error(), "health stats mismatch") || + strings.HasSuffix(err.Error(), context.Canceled.Error()) || + strings.Contains(err.Error(), `"error reading from server: EOF", received prior goaway`) { + log.Warningf("tablet %s had a suspect healthcheck error: %s -- clearing cache record", thc.Tablet.Alias, err.Error()) hc.deleteTablet(thc.Tablet) return } diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index d806c5422f0..b487be15ce0 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -464,7 +464,7 @@ func TestFilterByKeyspace(t *testing.T) { // - does not add or remove these filtered out tablets from the its healtcheck func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { ts := memorytopo.NewServer("aa") - fhc := NewFakeHealthCheck(nil) + fhc := NewFakeHealthCheck() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := NewFilterByKeyspace(testKeyspacesToWatch) diff --git a/test/config.json b/test/config.json index 78a3b8195ff..27f21754d11 100644 --- a/test/config.json +++ b/test/config.json @@ -777,6 +777,15 @@ "RetryMax": 1, "Tags": [] }, + "vtgate_tablet_healthcheck_cache": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/tablet_healthcheck_cache", "-timeout", "45m"], + "Command": [], + "Manual": false, + "Shard": "vtgate_tablet_healthcheck_cache", + "RetryMax": 2, + "Tags": [] + }, 
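Returning to the tablet_health_check.go hunk above: checkConn now treats three error shapes as a sign that the cached record may be corrupt or stale, and purges it so the tablet can be re-added on its next healthy response. A sketch of that classification pulled out into a helper (the helper name is hypothetical; the real code performs the string checks inline):

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// isSuspectHealthCheckError reports whether a streaming healthcheck error is
// one of the patterns that should clear the tablet's cache record.
// Hypothetical helper; the production code does these checks inline.
func isSuspectHealthCheckError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "health stats mismatch") ||
		strings.HasSuffix(msg, context.Canceled.Error()) ||
		strings.Contains(msg, `"error reading from server: EOF", received prior goaway`)
}

func main() {
	fmt.Println(isSuspectHealthCheckError(context.Canceled))                 // true
	fmt.Println(isSuspectHealthCheckError(fmt.Errorf("connection refused"))) // false
}
```

Purging on these errors is deliberately conservative: the record is simply dropped from the cache, and a reachable tablet re-enters it on its next health stream update.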
"vtgate_transaction": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction"],