From ae859e24506e68dc0221cfff1cea94cb3c95a900 Mon Sep 17 00:00:00 2001 From: husharp Date: Fri, 1 Sep 2023 15:28:18 +0800 Subject: [PATCH] refine test Signed-off-by: husharp --- integration_tests/pd_api_test.go | 62 +++++++++++++++++++------------- tikv/kv.go | 3 ++ 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 91dbba160..d24bbbdf8 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -105,48 +105,57 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error { return c.Client.CloseAddr(addr) } -func (s *apiTestSuite) TestGetClusterMinResolvedTS() { +func (s *apiTestSuite) TestGetStoresMinResolvedTS() { util.EnableFailpoints() - // Try to get the minimum resolved timestamp of the cluster from PD. require := s.Require() + require.Nil(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) + defer func() { + require.Nil(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) + }() + + // Set DC label for store 1. + // Mock Cluster-level min resolved ts failed. + dcLabel := "testDC" + restore := config.UpdateGlobal(func(conf *config.Config) { + conf.TxnScope = dcLabel + }) + defer restore() + + labels := []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, + Value: dcLabel, + }, + } + storeID := uint64(1) + s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) + // Try to get the minimum resolved timestamp of the stores from PD. require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) mockClient := storeSafeTsMockClient{ Client: s.store.GetTiKVClient(), } s.store.SetTiKVClient(&mockClient) var retryCount int - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { - time.Sleep(2 * time.Second) + for s.store.GetMinSafeTS(dcLabel) != 100 { + time.Sleep(100 * time.Millisecond) if retryCount > 5 { break } retryCount++ } require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount)) - require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel)) require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) - - // Try to get the minimum resolved timestamp of the cluster from TiKV. - require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) - defer func() { - s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) - }() - retryCount = 0 - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 { - time.Sleep(2 * time.Second) - if retryCount > 5 { - break - } - retryCount++ - } - require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) - require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) } func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { util.EnableFailpoints() // Try to get the minimum resolved timestamp of the cluster from PD. require := s.Require() + require.Nil(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) + defer func() { + require.Nil(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) + }() require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) mockClient := storeSafeTsMockClient{ Client: s.store.GetTiKVClient(), @@ -154,7 +163,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { s.store.SetTiKVClient(&mockClient) var retryCount int for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { - time.Sleep(2 * time.Second) + time.Sleep(100 * time.Millisecond) if retryCount > 5 { break } @@ -162,11 +171,14 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { } require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + + // Set DC label for store 1. + // Mock PD server not support get min resolved ts by stores. + require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) defer func() { s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) }() - - // Set DC label for store 1. dcLabel := "testDC" restore := config.UpdateGlobal(func(conf *config.Config) { conf.TxnScope = dcLabel @@ -185,7 +197,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { // Try to get the minimum resolved timestamp of the store from TiKV. retryCount = 0 for s.store.GetMinSafeTS(dcLabel) != 150 { - time.Sleep(2 * time.Second) + time.Sleep(100 * time.Millisecond) if retryCount > 5 { break } diff --git a/tikv/kv.go b/tikv/kv.go index d7f9a4199..fcd730a89 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -565,6 +565,9 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { func (s *KVStore) safeTSUpdater() { defer s.wg.Done() t := time.NewTicker(safeTSUpdateInterval) + if _, e := util.EvalFailpoint("mockFastSafeTSUpdater"); e == nil { + t.Reset(time.Millisecond * 100) + } defer t.Stop() ctx, cancel := context.WithCancel(s.ctx) ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)