From 5453d020eefd192e7a543641b383adf8e0da969d Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 29 Jul 2024 23:20:11 +0800 Subject: [PATCH 1/9] fix issue that store's liveness may incorrectly marked as unreachable when the store restarts with label changed Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 17 +++++++++++- internal/locate/region_cache_test.go | 41 ++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index cbd265e84..d01cf916d 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2455,12 +2455,20 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { addr = store.GetAddress() if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)} - newStore.livenessState = atomic.LoadUint32(&s.livenessState) newStore.unreachableSince = s.unreachableSince c.storeMu.Lock() + // set new store liveness state in mutex to avoid problem in https://github.com/tikv/client-go/issues/1401 + newStore.livenessState = atomic.LoadUint32(&s.livenessState) c.storeMu.stores[newStore.storeID] = newStore c.storeMu.Unlock() s.setResolveState(deleted) + logutil.BgLogger().Info("store address or labels changed, add new store and mark old store deleted", + zap.Uint64("store", s.storeID), + zap.String("old-addr", s.addr), + zap.Any("old-labels", s.labels), + zap.String("new-addr", newStore.addr), + zap.Any("new-labels", newStore.labels), + zap.String("liveness", newStore.getLivenessState().String())) return false, nil } s.changeResolveStateTo(needCheck, resolved) @@ -2665,6 +2673,13 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol liveness = s.requestLiveness(bo, c) atomic.StoreUint32(&s.livenessState, uint32(liveness)) if liveness == reachable { + c.storeMu.RLock() + newStore := c.storeMu.stores[s.storeID] + if newStore != nil && newStore.addr == s.addr { + // set new store liveness state in mutex to avoid problem in https://github.com/tikv/client-go/issues/1401 + atomic.StoreUint32(&newStore.livenessState, uint32(liveness)) + } + c.storeMu.RUnlock() logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) return } diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 9d7961958..deb299b02 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1708,3 +1708,44 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable }, 3*time.Second, time.Second) } + +func (s *testRegionCacheSuite) TestIssue1401() { + // init region cache + s.cache.LocateKey(s.bo, []byte("a")) + + store1 := s.cache.getStoreByStoreID(s.store1) + s.Require().NotNil(store1) + s.Require().Equal(resolved, store1.getResolveState()) + // change store1 label. + labels := store1.labels + labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"}) + s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...) + + // mark the store is unreachable and need check. + atomic.StoreUint32(&store1.livenessState, uint32(unreachable)) + store1.setResolveState(needCheck) + + // setup mock liveness func + tf := func(s *Store, bo *retry.Backoffer) livenessState { + return reachable + } + s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + + // start health check loop + go store1.checkUntilHealth(s.cache, unreachable, time.Second*30) + + // mock asyncCheckAndResolveLoop worker to check and resolve store. + s.cache.checkAndResolve(nil, func(s *Store) bool { + return s.getResolveState() == needCheck + }) + + // assert that the old store should be deleted and it's reachable + s.Eventually(func() bool { + return store1.getResolveState() == deleted && store1.getLivenessState() == reachable + }, 3*time.Second, time.Second) + // assert the new store should be added and it should be resolved and reachable. + newStore1 := s.cache.getStoreByStoreID(s.store1) + s.Require().Equal(resolved, newStore1.getResolveState()) + s.Require().Equal(reachable, newStore1.getLivenessState()) + s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161")) +} From c08137fd9cdc0e6e893183f708f2284591c0c040 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 30 Jul 2024 12:08:12 +0800 Subject: [PATCH 2/9] refine Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index d01cf916d..e32b61701 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2455,10 +2455,12 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { addr = store.GetAddress() if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)} - newStore.unreachableSince = s.unreachableSince - c.storeMu.Lock() - // set new store liveness state in mutex to avoid problem in https://github.com/tikv/client-go/issues/1401 newStore.livenessState = atomic.LoadUint32(&s.livenessState) + if newStore.getLivenessState() != reachable { + newStore.unreachableSince = s.unreachableSince + go newStore.checkUntilHealth(c, newStore.getLivenessState(), storeReResolveInterval) + } + c.storeMu.Lock() c.storeMu.stores[newStore.storeID] = newStore c.storeMu.Unlock() s.setResolveState(deleted) @@ -2466,9 +2468,10 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { zap.Uint64("store", s.storeID), zap.String("old-addr", s.addr), zap.Any("old-labels", s.labels), + zap.String("old-liveness", s.getLivenessState().String()), zap.String("new-addr", newStore.addr), zap.Any("new-labels", newStore.labels), - zap.String("liveness", newStore.getLivenessState().String())) + zap.String("new-liveness", newStore.getLivenessState().String())) return false, nil } s.changeResolveStateTo(needCheck, resolved) @@ -2606,6 +2609,8 @@ func (s livenessState) String() string { } } +var storeReResolveInterval = 30 * time.Second + func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) { // This mechanism doesn't support non-TiKV stores currently. if s.storeType != tikvrpc.TiKV { @@ -2617,7 +2622,7 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt // It may be already started by another thread. if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) { s.unreachableSince = time.Now() - reResolveInterval := 30 * time.Second + reResolveInterval := storeReResolveInterval if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil { if dur, err := time.ParseDuration(val.(string)); err == nil { reResolveInterval = dur @@ -2673,13 +2678,6 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol liveness = s.requestLiveness(bo, c) atomic.StoreUint32(&s.livenessState, uint32(liveness)) if liveness == reachable { - c.storeMu.RLock() - newStore := c.storeMu.stores[s.storeID] - if newStore != nil && newStore.addr == s.addr { - // set new store liveness state in mutex to avoid problem in https://github.com/tikv/client-go/issues/1401 - atomic.StoreUint32(&newStore.livenessState, uint32(liveness)) - } - c.storeMu.RUnlock() logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) return } From 236f0f5d954f4030cc4051e3c611954bad5dcd8c Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 30 Jul 2024 12:11:15 +0800 Subject: [PATCH 3/9] remove unnecessary code Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index e32b61701..37e3612bb 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2657,19 +2657,7 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol if err != nil { logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err)) } else if !valid { - if s.getResolveState() == deleted { - // if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve). - c.storeMu.RLock() - newStore := c.storeMu.stores[s.storeID] - c.storeMu.RUnlock() - logutil.BgLogger().Info("[health check] store meta changed", - zap.Uint64("storeID", s.storeID), - zap.String("oldAddr", s.addr), - zap.String("oldLabels", fmt.Sprintf("%v", s.labels)), - zap.String("newAddr", newStore.addr), - zap.String("newLabels", fmt.Sprintf("%v", newStore.labels))) - go newStore.checkUntilHealth(c, liveness, reResolveInterval) - } + logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr)) return } } From 0f919d741e3614453e8a73913668f6121a2c9c11 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 30 Jul 2024 12:13:39 +0800 Subject: [PATCH 4/9] refine log Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 37e3612bb..9729698bf 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2657,7 +2657,7 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol if err != nil { logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err)) } else if !valid { - logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr)) + logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) return } } From 5761fc94d22175596b737ea5e1d267c19328eb5e Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 30 Jul 2024 14:01:52 +0800 Subject: [PATCH 5/9] add log Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 9729698bf..bae4f9bae 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2669,6 +2669,10 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) return } + if s.getResolveState() == deleted { + logutil.BgLogger().Info("[health check] store has been deleted", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) + return + } } } } From cf447229b199493787d4f8193f181ac17c23c3a7 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 30 Jul 2024 15:41:03 +0800 Subject: [PATCH 6/9] refine Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index bae4f9bae..78ea3a904 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2650,6 +2650,10 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol case <-c.ctx.Done(): return case <-ticker.C: + if s.getResolveState() == deleted { + logutil.BgLogger().Info("[health check] store has been deleted", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) + return + } if time.Since(lastCheckPDTime) > reResolveInterval { lastCheckPDTime = time.Now() @@ -2669,10 +2673,6 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) return } - if s.getResolveState() == deleted { - logutil.BgLogger().Info("[health check] store has been deleted", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) - return - } } } } From 00ed200f60ec1264ad116be980d6a6d28995ac59 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 30 Jul 2024 15:43:25 +0800 Subject: [PATCH 7/9] fix test Signed-off-by: crazycs520 --- internal/locate/region_cache_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index deb299b02..0e8fd3435 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1739,9 +1739,9 @@ func (s *testRegionCacheSuite) TestIssue1401() { return s.getResolveState() == needCheck }) - // assert that the old store should be deleted and it's reachable + // assert that the old store should be deleted. s.Eventually(func() bool { - return store1.getResolveState() == deleted && store1.getLivenessState() == reachable + return store1.getResolveState() == deleted }, 3*time.Second, time.Second) // assert the new store should be added and it should be resolved and reachable. newStore1 := s.cache.getStoreByStoreID(s.store1) From 77d232af5ea8deb6bb9cc7f53cd979dd56aff227 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 30 Jul 2024 19:05:22 +0800 Subject: [PATCH 8/9] refine log Signed-off-by: crazycs520 --- internal/locate/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 78ea3a904..aabeb8149 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2651,7 +2651,7 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol return case <-ticker.C: if s.getResolveState() == deleted { - logutil.BgLogger().Info("[health check] store has been deleted", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) + logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) return } if time.Since(lastCheckPDTime) > reResolveInterval { From 45ae68a7fce43452c7bfdbef4fea70e4adf1be2c Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 30 Jul 2024 20:20:57 +0800 Subject: [PATCH 9/9] fix test Signed-off-by: crazycs520 --- internal/locate/region_cache_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 0e8fd3435..4e2c54c86 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1745,7 +1745,8 @@ func (s *testRegionCacheSuite) TestIssue1401() { }, 3*time.Second, time.Second) // assert the new store should be added and it should be resolved and reachable. newStore1 := s.cache.getStoreByStoreID(s.store1) - s.Require().Equal(resolved, newStore1.getResolveState()) - s.Require().Equal(reachable, newStore1.getLivenessState()) + s.Eventually(func() bool { + return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable + }, 3*time.Second, time.Second) s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161")) }