From 17737ab16d98662ae5abf8b253c1ccc8c280aa40 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 15 Nov 2021 16:03:33 -0500 Subject: [PATCH 1/5] Prevent race condition betweeen tablet delete and update There was a race condition between deleting a tablet's healthcheck record from the authoritative map (hc.healthByAlias) and updating the same tablet's health data (hc.healthData) record. This could cause us to effectively re-add a an updated copy of the tablet healthcheck record after it's been deleted. This then leads to "zombie" tablet records in the SHOW VITESS_TABLETS output as it is based on what is in the hc.healthData map: https://github.com/vitessio/vitess/blob/693c5dbdeacdd7a705b46ebce6776a5256c8cfef/go/vt/discovery/healthcheck.go#L537-L557 And purge all potential healthcheck records for a tablet alias by type on delete. Signed-off-by: Matt Lord --- go/test/endtoend/tabletgateway/vtgate_test.go | 2 +- go/vt/discovery/healthcheck.go | 37 +++++++++++++++---- go/vt/discovery/tablet_health_check.go | 8 ++-- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index 4e3ac83187e..18e66b81f1c 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -204,7 +204,7 @@ func TestReplicaTransactions(t *testing.T) { require.Nil(t, err) serving := replicaTablet.VttabletProcess.WaitForStatus("SERVING", time.Duration(60*time.Second)) assert.Equal(t, serving, true, "Tablet did not become ready within a reasonable time") - exec(t, readConn, fetchAllCustomers, "is either down or nonexistent") + exec(t, readConn, fetchAllCustomers, "not found") // create a new connection, should be able to query again readConn, err = mysql.Connect(ctx, &vtParams) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 0044477c5be..6649b96626d 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -381,7 +381,6 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { hc.mu.Lock() defer hc.mu.Unlock() - key := hc.keyFromTablet(tablet) tabletAlias := tabletAliasString(topoproto.TabletAliasString(tablet.Alias)) // delete from authoritative map th, ok := hc.healthByAlias[tabletAlias] @@ -393,14 +392,18 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { // which will call finalizeConn, which will close the connection. th.cancelFunc() delete(hc.healthByAlias, tabletAlias) - // delete from map by keyspace.shard.tabletType - ths, ok := hc.healthData[key] - if !ok { - log.Warningf("We have no health data for target: %v", key) - return + + // the tablet has been deleted from the authoritative healthByAlias map so let's ensure it's deleted + // from the healthData map as well, which means we need to delete any existing combinations of + // keyspace.shard.tabletType we may have had for the tablet + for _, key := range hc.keysFromTablet(tablet) { + if ths, ok := hc.healthData[key]; ok { + delete(ths, tabletAlias) + } } - delete(ths, tabletAlias) - // delete from healthy list + + // We only need to recompute the healthy record for the current state of the tablet + key := hc.keyFromTablet(tablet) healthy, ok := hc.healthy[key] if ok && len(healthy) > 0 { hc.recomputeHealthy(key) @@ -413,6 +416,15 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ defer hc.mu.Unlock() tabletAlias := tabletAliasString(topoproto.TabletAliasString(th.Tablet.Alias)) + // let's be sure that this tablet hasn't been deleted from the authortative map + // so that we're not racing to update it and in effect re-adding a copy of the + // tablet record that was deleted + _, ok := hc.healthByAlias[tabletAlias] + if !ok { + log.Infof("Tablet %s has been deleted, skipping health update", tabletAlias) + return + } + targetKey := hc.keyFromTarget(th.Target) targetChanged := prevTarget.TabletType != th.Target.TabletType || prevTarget.Keyspace != th.Target.Keyspace || prevTarget.Shard != th.Target.Shard if targetChanged { @@ -723,6 +735,15 @@ func (hc *HealthCheckImpl) keyFromTablet(tablet *topodata.Tablet) keyspaceShardT return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tablet.Type))) } +// keysFromTablet returns a slice of potential type based keys for a tablet +func (hc *HealthCheckImpl) keysFromTablet(tablet *topodata.Tablet) []keyspaceShardTabletType { + tabletTypeKeys := make([]keyspaceShardTabletType, len(topoproto.AllTabletTypes)) + for i, tabletType := range topoproto.AllTabletTypes { + tabletTypeKeys[i] = keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tabletType))) + } + return tabletTypeKeys +} + // getAliasByCell should only be called while holding hc.mu func (hc *HealthCheckImpl) getAliasByCell(cell string) string { if alias, ok := hc.cellAliases[cell]; ok { diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index f2d36f9886c..f6f89715c9b 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -290,11 +290,9 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { if err != nil { hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 1) // We have reason to suspect the tablet healthcheck record is corrupted or invalid so let's remove the tablet's record - // from the healthcheck cache and it will get re-added again if the tablet is reachable - if strings.Contains(err.Error(), "health stats mismatch") || - strings.HasSuffix(err.Error(), context.Canceled.Error()) || - strings.Contains(err.Error(), `"error reading from server: EOF", received prior goaway`) { - log.Warningf("tablet %s had a suspect healthcheck error: %s -- clearing cache record", thc.Tablet.Alias, err.Error()) + // from the healthcheck cache and the corrected tablet record will be fetched from the topology and added to the + // healthcheck cache again via the topology watcher. + if strings.Contains(err.Error(), "health stats mismatch") { hc.deleteTablet(thc.Tablet) return } From 175f400985de69ae996a9d74149203ca5cf4792b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 15 Nov 2021 19:25:26 -0500 Subject: [PATCH 2/5] Improve logging and flip if/else for readability Signed-off-by: Matt Lord --- go/vt/discovery/healthcheck.go | 4 ++-- go/vt/discovery/tablet_health_check.go | 11 +++++++---- go/vt/discovery/topology_watcher.go | 13 +++++++------ 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 6649b96626d..ae8200496ae 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -385,7 +385,7 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { // delete from authoritative map th, ok := hc.healthByAlias[tabletAlias] if !ok { - log.Infof("We have no health data for tablet: %v, it might have been deleted already", tabletAlias) + log.Infof("We have no health data for tablet: %v, it might have been deleted already", tablet) return } // Calling this will end the context associated with th.checkConn, @@ -421,7 +421,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ // tablet record that was deleted _, ok := hc.healthByAlias[tabletAlias] if !ok { - log.Infof("Tablet %s has been deleted, skipping health update", tabletAlias) + log.Infof("Tablet %v has been deleted, skipping health update", th.Tablet) return } diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index f6f89715c9b..f0ad9b0a2ac 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -289,10 +289,13 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { if err != nil { hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 1) - // We have reason to suspect the tablet healthcheck record is corrupted or invalid so let's remove the tablet's record - // from the healthcheck cache and the corrected tablet record will be fetched from the topology and added to the - // healthcheck cache again via the topology watcher. + // This means that another tablet has taken over the host:port that we were connected to. + // So let's remove the tablet's data from the healthcheck, and if it is still a part of the + // cluster, the new tablet record will be fetched from the topology server and re-added to + // the healthcheck cache again via the topology watcher. + // WARNING: Under no other circumstances should we be deleting the tablet here. if strings.Contains(err.Error(), "health stats mismatch") { + log.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet) hc.deleteTablet(thc.Tablet) return } @@ -329,7 +332,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { } func (thc *tabletHealthCheck) closeConnection(ctx context.Context, err error) { - log.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet.Alias, err) + log.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err) thc.setServingState(false, err.Error()) thc.LastError = err _ = thc.Conn.Close(ctx) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index d81abb4bae8..8c699a99e6f 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -216,19 +216,20 @@ func (tw *TopologyWatcher) loadTablets() { for alias, newVal := range newTablets { // trust the alias from topo and add it if it doesn't exist - if val, ok := tw.tablets[alias]; !ok { - tw.tabletRecorder.AddTablet(newVal.tablet) - topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) - } else { - // check if the host and port have changed. If yes, replace tablet + if val, ok := tw.tablets[alias]; ok { + // check if the host and port have changed. If yes, replace tablet. oldKey := TabletToMapKey(val.tablet) newKey := TabletToMapKey(newVal.tablet) if oldKey != newKey { // This is the case where the same tablet alias is now reporting - // a different address key. + // a different address (host:port) key. tw.tabletRecorder.ReplaceTablet(val.tablet, newVal.tablet) topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) } + } else { + // This is a new tablet record, let's add it to the healthcheck + tw.tabletRecorder.AddTablet(newVal.tablet) + topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) } } From a9e1b47566cf742447190242267fa7ed1e2f6426 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 15 Nov 2021 21:42:16 -0500 Subject: [PATCH 3/5] Correct test now that we aren't aggresively pruning HC cache Signed-off-by: Matt Lord --- .../tablet_healthcheck_cache/correctness_test.go | 16 +++++++++++----- test/config.json | 2 +- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go b/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go index 4769cd41e6f..a094e975a4f 100644 --- a/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go +++ b/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go @@ -130,7 +130,7 @@ func TestMain(m *testing.M) { // conditions with these operations and their interactions with the cache. func TestHealthCheckCacheWithTabletChurn(t *testing.T) { ctx := context.Background() - tries := 10 + tries := 5 numShards := len(shards) // 1 for primary,replica expectedTabletHCcacheEntries := numShards * 2 @@ -155,9 +155,14 @@ func TestHealthCheckCacheWithTabletChurn(t *testing.T) { qr, _ := vtgateConn.ExecuteFetch(query, 100, true) assert.Equal(t, expectedTabletHCcacheEntries, len(qr.Rows), "wrong number of tablet records in healthcheck cache, expected %d but had %d. Results: %v", expectedTabletHCcacheEntries, len(qr.Rows), qr.Rows) - killTablet(t, tablet) + deleteTablet(t, tablet) expectedTabletHCcacheEntries-- + // We need to sleep for at least vtgate's -tablet_refresh_interval to be sure we + // have resynchronized the healthcheck cache with the topo server via the topology + // watcher and pruned the deleted tablet from the healthcheck cache. + time.Sleep(1 * time.Minute) + qr, _ = vtgateConn.ExecuteFetch(query, 100, true) assert.Equal(t, expectedTabletHCcacheEntries, len(qr.Rows), "wrong number of tablet records in healthcheck cache, expected %d but had %d. Results: %v", expectedTabletHCcacheEntries, len(qr.Rows), qr.Rows) } @@ -213,8 +218,7 @@ func addTablet(t *testing.T, tabletUID int, tabletType string) *cluster.Vttablet return tablet } -func killTablet(t *testing.T, tablet *cluster.Vttablet) { - t.Logf("Killing tablet: %s", tablet.Alias) +func deleteTablet(t *testing.T, tablet *cluster.Vttablet) { var wg sync.WaitGroup wg.Add(1) go func(tablet *cluster.Vttablet) { @@ -225,6 +229,8 @@ func killTablet(t *testing.T, tablet *cluster.Vttablet) { }(tablet) wg.Wait() - err := clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", keyspaceName) + err := clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteTablet", tablet.Alias) require.Nil(t, err) + + t.Logf("Deleted tablet: %s", tablet.Alias) } diff --git a/test/config.json b/test/config.json index 85effbf1bbe..d8deb21f65e 100644 --- a/test/config.json +++ b/test/config.json @@ -801,7 +801,7 @@ "Command": [], "Manual": false, "Shard": "vtgate_tablet_healthcheck_cache", - "RetryMax": 2, + "RetryMax": 1, "Tags": [] }, "vtgate_transaction": { From 51060242ad7375d029f766ef45bcffc0078b82d8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 19 Nov 2021 14:54:48 -0500 Subject: [PATCH 4/5] Remove unnecessary map deletions and add nil check Signed-off-by: Matt Lord --- go/vt/discovery/healthcheck.go | 43 ++++++++++++---------------------- 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index ae8200496ae..dfe0df6e56f 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -351,13 +351,10 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) { } hc.healthByAlias[tabletAliasString(tabletAlias)] = thc res := thc.SimpleCopy() - if ths, ok := hc.healthData[key]; !ok { + if _, ok := hc.healthData[key]; !ok { hc.healthData[key] = make(map[tabletAliasString]*TabletHealth) - hc.healthData[key][tabletAliasString(tabletAlias)] = res - } else { - // just overwrite it if it exists already - ths[tabletAliasString(tabletAlias)] = res } + hc.healthData[key][tabletAliasString(tabletAlias)] = res hc.broadcast(res) hc.connsWG.Add(1) @@ -381,6 +378,7 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { hc.mu.Lock() defer hc.mu.Unlock() + key := hc.keyFromTablet(tablet) tabletAlias := tabletAliasString(topoproto.TabletAliasString(tablet.Alias)) // delete from authoritative map th, ok := hc.healthByAlias[tabletAlias] @@ -392,18 +390,14 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) { // which will call finalizeConn, which will close the connection. th.cancelFunc() delete(hc.healthByAlias, tabletAlias) - - // the tablet has been deleted from the authoritative healthByAlias map so let's ensure it's deleted - // from the healthData map as well, which means we need to delete any existing combinations of - // keyspace.shard.tabletType we may have had for the tablet - for _, key := range hc.keysFromTablet(tablet) { - if ths, ok := hc.healthData[key]; ok { - delete(ths, tabletAlias) - } + // delete from map by keyspace.shard.tabletType + ths, ok := hc.healthData[key] + if !ok { + log.Warningf("We have no health data for target: %v", key) + return } - - // We only need to recompute the healthy record for the current state of the tablet - key := hc.keyFromTablet(tablet) + delete(ths, tabletAlias) + // delete from healthy list healthy, ok := hc.healthy[key] if ok && len(healthy) > 0 { hc.recomputeHealthy(key) @@ -419,8 +413,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ // let's be sure that this tablet hasn't been deleted from the authortative map // so that we're not racing to update it and in effect re-adding a copy of the // tablet record that was deleted - _, ok := hc.healthByAlias[tabletAlias] - if !ok { + if _, ok := hc.healthByAlias[tabletAlias]; !ok { log.Infof("Tablet %v has been deleted, skipping health update", th.Tablet) return } @@ -439,7 +432,10 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ hc.healthData[targetKey] = make(map[tabletAliasString]*TabletHealth) } } - // add it to the map by target + // add it to the map by target and create the map record if needed + if _, ok := hc.healthData[targetKey]; !ok { + hc.healthData[targetKey] = make(map[tabletAliasString]*TabletHealth) + } hc.healthData[targetKey][tabletAlias] = th isPrimary := th.Target.TabletType == topodata.TabletType_PRIMARY @@ -735,15 +731,6 @@ func (hc *HealthCheckImpl) keyFromTablet(tablet *topodata.Tablet) keyspaceShardT return keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tablet.Type))) } -// keysFromTablet returns a slice of potential type based keys for a tablet -func (hc *HealthCheckImpl) keysFromTablet(tablet *topodata.Tablet) []keyspaceShardTabletType { - tabletTypeKeys := make([]keyspaceShardTabletType, len(topoproto.AllTabletTypes)) - for i, tabletType := range topoproto.AllTabletTypes { - tabletTypeKeys[i] = keyspaceShardTabletType(fmt.Sprintf("%s.%s.%s", tablet.Keyspace, tablet.Shard, topoproto.TabletTypeLString(tabletType))) - } - return tabletTypeKeys -} - // getAliasByCell should only be called while holding hc.mu func (hc *HealthCheckImpl) getAliasByCell(cell string) string { if alias, ok := hc.cellAliases[cell]; ok { From cfbc6e838555e35631a6ccf225596e486da7b071 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 19 Nov 2021 15:01:31 -0500 Subject: [PATCH 5/5] Use explicit shorter tablet_refres_interval for test This ensures we are not assuming any default value and the tests will run in less time. Signed-off-by: Matt Lord --- .../correctness_test.go | 17 +++++++++-------- go/vt/discovery/healthcheck.go | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go b/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go index a094e975a4f..de45effc803 100644 --- a/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go +++ b/go/test/endtoend/vtgate/tablet_healthcheck_cache/correctness_test.go @@ -33,12 +33,13 @@ import ( ) var ( - clusterInstance *cluster.LocalProcessCluster - vtParams mysql.ConnParams - keyspaceName = "healthcheck_test_ks" - cell = "healthcheck_test_cell" - shards = []string{"-80", "80-"} - schemaSQL = ` + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + tabletRefreshInterval = 5 * time.Second + keyspaceName = "healthcheck_test_ks" + cell = "healthcheck_test_cell" + shards = []string{"-80", "80-"} + schemaSQL = ` create table customer( customer_id bigint not null auto_increment, email varbinary(128), @@ -110,7 +111,7 @@ func TestMain(m *testing.M) { return 1 } - clusterInstance.VtGateExtraArgs = []string{} + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, []string{"-tablet_refresh_interval", tabletRefreshInterval.String()}...) err = clusterInstance.StartVtgate() if err != nil { return 1 @@ -161,7 +162,7 @@ func TestHealthCheckCacheWithTabletChurn(t *testing.T) { // We need to sleep for at least vtgate's -tablet_refresh_interval to be sure we // have resynchronized the healthcheck cache with the topo server via the topology // watcher and pruned the deleted tablet from the healthcheck cache. - time.Sleep(1 * time.Minute) + time.Sleep(tabletRefreshInterval) qr, _ = vtgateConn.ExecuteFetch(query, 100, true) assert.Equal(t, expectedTabletHCcacheEntries, len(qr.Rows), "wrong number of tablet records in healthcheck cache, expected %d but had %d. Results: %v", expectedTabletHCcacheEntries, len(qr.Rows), qr.Rows) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index dfe0df6e56f..70618276359 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -410,7 +410,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ defer hc.mu.Unlock() tabletAlias := tabletAliasString(topoproto.TabletAliasString(th.Tablet.Alias)) - // let's be sure that this tablet hasn't been deleted from the authortative map + // let's be sure that this tablet hasn't been deleted from the authoritative map // so that we're not racing to update it and in effect re-adding a copy of the // tablet record that was deleted if _, ok := hc.healthByAlias[tabletAlias]; !ok {