diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index aa5d3855c..b47d294e9 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -44,7 +44,9 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -67,7 +69,7 @@ type apiTestSuite struct { func (s *apiTestSuite) SetupTest() { addrs := strings.Split(*pdAddrs, ",") pdClient, err := pd.NewClient(addrs, pd.SecurityOption{}) - s.Require().Nil(err) + s.Require().NoError(err) rpcClient := tikv.NewRPCClient() // Set PD HTTP client. store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs)) @@ -103,18 +105,65 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error { return c.Client.CloseAddr(addr) } -func (s *apiTestSuite) TestGetStoreMinResolvedTS() { +func (s *apiTestSuite) TestGetStoresMinResolvedTS() { util.EnableFailpoints() - // Try to get the minimum resolved timestamp of the store from PD. require := s.Require() - require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) + require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) + defer func() { + require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) + }() + + // Set DC label for store 1. + // Mock Cluster-level min resolved ts failed. + dcLabel := "testDC" + restore := config.UpdateGlobal(func(conf *config.Config) { + conf.TxnScope = dcLabel + }) + defer restore() + + labels := []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, + Value: dcLabel, + }, + } + storeID := uint64(1) + s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) + // Try to get the minimum resolved timestamp of the stores from PD. + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) + mockClient := storeSafeTsMockClient{ + Client: s.store.GetTiKVClient(), + } + s.store.SetTiKVClient(&mockClient) + var retryCount int + for s.store.GetMinSafeTS(dcLabel) != 100 { + time.Sleep(100 * time.Millisecond) + if retryCount > 5 { + break + } + retryCount++ + } + require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount)) + require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel)) + require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) +} + +func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { + util.EnableFailpoints() + // Try to get the minimum resolved timestamp of the cluster from PD. + require := s.Require() + require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) + defer func() { + require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) + }() + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) mockClient := storeSafeTsMockClient{ Client: s.store.GetTiKVClient(), } s.store.SetTiKVClient(&mockClient) var retryCount int for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { - time.Sleep(2 * time.Second) + time.Sleep(100 * time.Millisecond) if retryCount > 5 { break } @@ -122,23 +171,41 @@ func (s *apiTestSuite) TestGetStoreMinResolvedTS() { } require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) - // Try to get the minimum resolved timestamp of the store from TiKV. - require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) + // Set DC label for store 1. + // Mock PD server not support get min resolved ts by stores. + require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) defer func() { - s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS")) }() + dcLabel := "testDC" + restore := config.UpdateGlobal(func(conf *config.Config) { + conf.TxnScope = dcLabel + }) + defer restore() + + labels := []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, + Value: dcLabel, + }, + } + storeID := uint64(1) + s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) + + // Try to get the minimum resolved timestamp of the store from TiKV. retryCount = 0 - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 { - time.Sleep(2 * time.Second) + for s.store.GetMinSafeTS(dcLabel) != 150 { + time.Sleep(100 * time.Millisecond) if retryCount > 5 { break } retryCount++ } + require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) - require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) } func (s *apiTestSuite) TearDownTest() { diff --git a/tikv/kv.go b/tikv/kv.go index 3e9753612..506bfd67d 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -565,6 +565,9 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { func (s *KVStore) safeTSUpdater() { defer s.wg.Done() t := time.NewTicker(safeTSUpdateInterval) + if _, e := util.EvalFailpoint("mockFastSafeTSUpdater"); e == nil { + t.Reset(time.Millisecond * 100) + } defer t.Stop() ctx, cancel := context.WithCancel(s.ctx) ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) @@ -580,33 +583,45 @@ func (s *KVStore) safeTSUpdater() { } func (s *KVStore) updateSafeTS(ctx context.Context) { + // Try to get the cluster-level minimum resolved timestamp from PD first. + if s.updateGlobalTxnScopeTSFromPD(ctx) { + return + } + + // When txn scope is not global, we need to get the minimum resolved timestamp of each store. stores := s.regionCache.GetAllStores() tikvClient := s.GetTiKVClient() wg := &sync.WaitGroup{} wg.Add(len(stores)) - for _, store := range stores { + // Try to get the minimum resolved timestamp of the store from PD. + var ( + err error + storeMinResolvedTSs map[uint64]uint64 + ) + storeIDs := make([]string, len(stores)) + if s.pdHttpClient != nil { + for i, store := range stores { + storeIDs[i] = strconv.FormatUint(store.StoreID(), 10) + } + _, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs) + if err != nil { + // If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV. + logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs)) + } + } + + for i, store := range stores { storeID := store.StoreID() storeAddr := store.GetAddr() if store.IsTiFlash() { storeAddr = store.GetPeerAddr() } - go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) { + go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) { defer wg.Done() - var ( - safeTS uint64 - err error - ) - storeIDStr := strconv.Itoa(int(storeID)) - // Try to get the minimum resolved timestamp of the store from PD. - if s.pdHttpClient != nil { - safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID) - if err != nil { - logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID)) - } - } + var safeTS uint64 // If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV. - if safeTS == 0 || err != nil { + if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil { resp, err := tikvClient.SendRequest( ctx, storeAddr, tikvrpc.NewRequest( tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{ @@ -625,6 +640,8 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { return } safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs() + } else { + safeTS = storeMinResolvedTSs[storeID] } _, preSafeTS := s.getSafeTS(storeID) @@ -638,7 +655,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc() safeTSTime := oracle.GetTimeFromTS(safeTS) metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds()) - }(ctx, wg, storeID, storeAddr) + }(ctx, wg, storeID, storeAddr, storeIDs[i]) } txnScopeMap := make(map[string][]uint64) @@ -655,6 +672,41 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { wg.Wait() } +var ( + skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster") + successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster") + clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster") +) + +// updateGlobalTxnScopeTSFromPD check whether it is needed to get cluster-level's min resolved ts from PD +// to update min safe ts for global txn scope. +func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { + isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope + // Try to get the minimum resolved timestamp of the cluster from PD. + if s.pdHttpClient != nil && isGlobal { + clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil) + if err != nil { + logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err)) + } else if clusterMinSafeTS != 0 { + // Update ts and metrics. + preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope) + if preClusterMinSafeTS > clusterMinSafeTS { + skipSafeTSUpdateCounter.Inc() + preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS) + clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds()) + } else { + s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS) + successSafeTSUpdateCounter.Inc() + safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS) + clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds()) + } + return true + } + } + + return false +} + func (s *KVStore) ruRuntimeStatsMapCleaner() { defer s.wg.Done() t := time.NewTicker(ruRuntimeStatsCleanInterval) diff --git a/util/pd.go b/util/pd.go index b4c405c15..6f27c514f 100644 --- a/util/pd.go +++ b/util/pd.go @@ -56,7 +56,7 @@ const ( // pd request retry time when connection fail. pdRequestRetryTime = 10 - storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts" + minResolvedTSPrefix = "pd/api/v1/min-resolved-ts" ) // PDHTTPClient is an HTTP client of pd. @@ -86,45 +86,66 @@ func NewPDHTTPClient( } } -// GetStoreMinResolvedTS get store-level min-resolved-ts from pd. -func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) { +// GetMinResolvedTSByStoresIDs get min-resolved-ts from pd by stores ids. +func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (uint64, map[uint64]uint64, error) { var err error for _, addr := range p.addrs { - query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID) + // scope is an optional parameter, it can be `cluster` or specified store IDs. + // - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil. + // - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled. + // - When scope given a list of stores, min_resolved_ts will be provided for each store + // and the scope-specific min_resolved_ts will be returned. + query := minResolvedTSPrefix + if len(storeIDs) != 0 { + query = fmt.Sprintf("%s?scope=%s", query, strings.Join(storeIDs, ",")) + } v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil) if e != nil { logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e)) err = e continue } - logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v))) + logutil.BgLogger().Debug("min resolved ts", zap.String("resp", string(v))) d := struct { - IsRealTime bool `json:"is_real_time,omitempty"` - MinResolvedTS uint64 `json:"min_resolved_ts"` + MinResolvedTS uint64 `json:"min_resolved_ts"` + IsRealTime bool `json:"is_real_time,omitempty"` + StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"` }{} err = json.Unmarshal(v, &d) if err != nil { - return 0, errors.Trace(err) + return 0, nil, errors.Trace(err) } if !d.IsRealTime { - message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr) + message := fmt.Errorf("min resolved ts not enabled, addr: %s", addr) logutil.BgLogger().Debug(message.Error()) - return 0, errors.Trace(message) + return 0, nil, errors.Trace(message) } if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil { // Need to make sure successfully get from real pd. - if d.MinResolvedTS != 0 { - // Should be val.(uint64) but failpoint doesn't support that. - if tmp, ok := val.(int); ok { - d.MinResolvedTS = uint64(tmp) + if d.StoresMinResolvedTS != nil { + for storeID, v := range d.StoresMinResolvedTS { + if v != 0 { + // Should be val.(uint64) but failpoint doesn't support that. + if tmp, ok := val.(int); ok { + d.StoresMinResolvedTS[storeID] = uint64(tmp) + logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(tmp))) + } + } } + } else if tmp, ok := val.(int); ok { + // Should be val.(uint64) but failpoint doesn't support that. + // ci's store id is 1, we can change it if we have more stores. + // but for pool ci it's no need to do that :( + d.MinResolvedTS = uint64(tmp) + logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp))) } + } - return d.MinResolvedTS, nil + return d.MinResolvedTS, d.StoresMinResolvedTS, nil } - return 0, errors.Trace(err) + return 0, nil, errors.Trace(err) } // pdRequest is a func to send an HTTP to pd and return the result bytes.