Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv/safeTS: obtain min-resolved-ts by store list #921

Merged
merged 18 commits into from
Sep 5, 2023
Merged
91 changes: 79 additions & 12 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
Expand All @@ -67,7 +69,7 @@ type apiTestSuite struct {
func (s *apiTestSuite) SetupTest() {
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().Nil(err)
s.Require().NoError(err)
rpcClient := tikv.NewRPCClient()
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
Expand Down Expand Up @@ -103,42 +105,107 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the store from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
defer func() {
require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()

// Set DC label for store 1.
// Mock Cluster-level min resolved ts failed.
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
})
defer restore()

labels := []*metapb.StoreLabel{
{
Key: tikv.DCLabelKey,
Value: dcLabel,
},
}
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(dcLabel) != 100 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
defer func() {
require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

// Try to get the minimum resolved timestamp of the store from TiKV.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
// Set DC label for store 1.
// Mock PD server not support get min resolved ts by stores.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
})
defer restore()

labels := []*metapb.StoreLabel{
{
Key: tikv.DCLabelKey,
Value: dcLabel,
},
}
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)

// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(2 * time.Second)
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}

require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
}

func (s *apiTestSuite) TearDownTest() {
Expand Down
84 changes: 68 additions & 16 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
func (s *KVStore) safeTSUpdater() {
defer s.wg.Done()
t := time.NewTicker(safeTSUpdateInterval)
if _, e := util.EvalFailpoint("mockFastSafeTSUpdater"); e == nil {
t.Reset(time.Millisecond * 100)
}
defer t.Stop()
ctx, cancel := context.WithCancel(s.ctx)
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
Expand All @@ -580,33 +583,45 @@ func (s *KVStore) safeTSUpdater() {
}

func (s *KVStore) updateSafeTS(ctx context.Context) {
// Try to get the cluster-level minimum resolved timestamp from PD first.
if s.updateGlobalTxnScopeTSFromPD(ctx) {
return
}

// When txn scope is not global, we need to get the minimum resolved timestamp of each store.
stores := s.regionCache.GetAllStores()
tikvClient := s.GetTiKVClient()
wg := &sync.WaitGroup{}
wg.Add(len(stores))
for _, store := range stores {
// Try to get the minimum resolved timestamp of the store from PD.
var (
err error
storeMinResolvedTSs map[uint64]uint64
)
storeIDs := make([]string, len(stores))
if s.pdHttpClient != nil {
for i, store := range stores {
storeIDs[i] = strconv.FormatUint(store.StoreID(), 10)
}
_, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
}
}

for i, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
if store.IsTiFlash() {
storeAddr = store.GetPeerAddr()
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) {
defer wg.Done()

var (
safeTS uint64
err error
)
storeIDStr := strconv.Itoa(int(storeID))
// Try to get the minimum resolved timestamp of the store from PD.
if s.pdHttpClient != nil {
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
}
}
var safeTS uint64
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if safeTS == 0 || err != nil {
if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
Expand All @@ -625,6 +640,8 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
return
}
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
} else {
safeTS = storeMinResolvedTSs[storeID]
}

_, preSafeTS := s.getSafeTS(storeID)
Expand All @@ -638,7 +655,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc()
safeTSTime := oracle.GetTimeFromTS(safeTS)
metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds())
}(ctx, wg, storeID, storeAddr)
}(ctx, wg, storeID, storeAddr, storeIDs[i])
}

txnScopeMap := make(map[string][]uint64)
Expand All @@ -655,6 +672,41 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

var (
skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster")
)

// updateGlobalTxnScopeTSFromPD check whether it is needed to get cluster-level's min resolved ts from PD
// to update min safe ts for global txn scope.
func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
} else if clusterMinSafeTS != 0 {
// Update ts and metrics.
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
if preClusterMinSafeTS > clusterMinSafeTS {
skipSafeTSUpdateCounter.Inc()
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())
} else {
s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS)
successSafeTSUpdateCounter.Inc()
safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds())
}
return true
}
}

return false
}

func (s *KVStore) ruRuntimeStatsMapCleaner() {
defer s.wg.Done()
t := time.NewTicker(ruRuntimeStatsCleanInterval)
Expand Down
53 changes: 37 additions & 16 deletions util/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
// pd request retry time when connection fail.
pdRequestRetryTime = 10

storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
minResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
)

// PDHTTPClient is an HTTP client of pd.
Expand Down Expand Up @@ -86,45 +86,66 @@ func NewPDHTTPClient(
}
}

// GetStoreMinResolvedTS get store-level min-resolved-ts from pd.
func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
// GetMinResolvedTSByStoresIDs get min-resolved-ts from pd by stores ids.
func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (uint64, map[uint64]uint64, error) {
var err error
for _, addr := range p.addrs {
query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID)
// scope is an optional parameter, it can be `cluster` or specified store IDs.
// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
// - When scope given a list of stores, min_resolved_ts will be provided for each store
// and the scope-specific min_resolved_ts will be returned.
query := minResolvedTSPrefix
if len(storeIDs) != 0 {
query = fmt.Sprintf("%s?scope=%s", query, strings.Join(storeIDs, ","))
}
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
if e != nil {
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
err = e
continue
}
logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v)))
logutil.BgLogger().Debug("min resolved ts", zap.String("resp", string(v)))
d := struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
IsRealTime bool `json:"is_real_time,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}{}
err = json.Unmarshal(v, &d)
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}
if !d.IsRealTime {
message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr)
message := fmt.Errorf("min resolved ts not enabled, addr: %s", addr)
logutil.BgLogger().Debug(message.Error())
return 0, errors.Trace(message)
return 0, nil, errors.Trace(message)
}
if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil {
// Need to make sure successfully get from real pd.
if d.MinResolvedTS != 0 {
// Should be val.(uint64) but failpoint doesn't support that.
if tmp, ok := val.(int); ok {
d.MinResolvedTS = uint64(tmp)
if d.StoresMinResolvedTS != nil {
for storeID, v := range d.StoresMinResolvedTS {
if v != 0 {
// Should be val.(uint64) but failpoint doesn't support that.
if tmp, ok := val.(int); ok {
d.StoresMinResolvedTS[storeID] = uint64(tmp)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(tmp)))
}
}
}
} else if tmp, ok := val.(int); ok {
// Should be val.(uint64) but failpoint doesn't support that.
// ci's store id is 1, we can change it if we have more stores.
// but for pool ci it's no need to do that :(
d.MinResolvedTS = uint64(tmp)
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp)))
}

}

return d.MinResolvedTS, nil
return d.MinResolvedTS, d.StoresMinResolvedTS, nil
}

return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}

// pdRequest is a func to send an HTTP to pd and return the result bytes.
Expand Down
Loading