Skip to content

Commit

Permalink
refine test
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Sep 1, 2023
1 parent b61256b commit ae859e2
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
62 changes: 37 additions & 25 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,68 +105,80 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) TestGetClusterMinResolvedTS() {
func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
defer func() {
require.Nil(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()

// Set DC label for store 1.
// Mock Cluster-level min resolved ts failed.
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
})
defer restore()

labels := []*metapb.StoreLabel{
{
Key: tikv.DCLabelKey,
Value: dcLabel,
},
}
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
for s.store.GetMinSafeTS(dcLabel) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
defer func() {
require.Nil(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Set DC label for store 1.
// Mock PD server not support get min resolved ts by stores.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()

// Set DC label for store 1.
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
Expand All @@ -185,7 +197,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(2 * time.Second)
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
Expand Down
3 changes: 3 additions & 0 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
func (s *KVStore) safeTSUpdater() {
defer s.wg.Done()
t := time.NewTicker(safeTSUpdateInterval)
if _, e := util.EvalFailpoint("mockFastSafeTSUpdater"); e == nil {
t.Reset(time.Millisecond * 100)
}
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
Expand Down

0 comments on commit ae859e2

Please sign in to comment.