Skip to content

Commit

Permalink
expand min-resolved-ts to support stores
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Aug 2, 2023
1 parent 719e645 commit 46d5ceb
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 47 deletions.
14 changes: 6 additions & 8 deletions integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
github.com/pingcap/tidb v1.1.0-beta.0.20230619015310-8b1006f1af04
github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a
github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.14.1
github.com/tikv/client-go/v2 v2.0.8-0.20230714052714-85fc8f337565
github.com/tikv/client-go/v2 v2.0.8-0.20230731032349-719e6456f7d5
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc
go.uber.org/goleak v1.2.1
)

require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/BurntSushi/toml v1.3.0 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down Expand Up @@ -65,14 +65,14 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect
github.com/pingcap/tidb/parser v0.0.0-20230619015310-8b1006f1af04 // indirect
github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a // indirect
github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/sasha-s/go-deadlock v0.2.0 // indirect
Expand Down Expand Up @@ -112,7 +112,5 @@ require (

replace (
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
github.com/pingcap/tidb => github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5
github.com/pingcap/tidb/parser => github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5
github.com/tikv/client-go/v2 => ../
)
22 changes: 11 additions & 11 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,12 @@ github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.3.0 h1:Ws8e5YmnrGEHzZEzg0YvK/7COGYtTC5PbaH9oSSbgfA=
github.com/BurntSushi/toml v1.3.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw=
github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5 h1:wSOvDYbKkvHjlWWFBihIoeJ5yBc1jZe9Ehkku3Jn8cA=
github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5/go.mod h1:C3tuWINS2/Vt/gxZ0OLdGI2x5crlN8E3/qNJJkIIkTI=
github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5 h1:bxwmPI7ambmbOAaozdYz81HFpIeu6ctWo7TiXfOGE14=
github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5/go.mod h1:ENXEsaVS6N3CTMpL4txc6m93y6XaztF9W4SFLjhPWJg=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down Expand Up @@ -363,14 +359,18 @@ github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a h1:5gSwd337GRaT1E0O3y10jb2ZDzv+h30pjCpRcCC5Phg=
github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a h1:JBh4xHmZ6w33+4fFTWZi5J3yPhLnOWChlf0Za875z8Y=
github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a/go.mod h1:KxfAxDSUAAgPyCpaAgQ7qTYaqDonaJA73eyGnUdMlcQ=
github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a h1:GEx1TEBP1O/mUumHhJodRa2Mk3w6Jtt+yNRq3yAxPjk=
github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a/go.mod h1:ENXEsaVS6N3CTMpL4txc6m93y6XaztF9W4SFLjhPWJg=
github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6 h1:D79RE4RVhq2ic8sqDSv7QdL0tT5aZV3CaCXUAT41iWc=
github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
Expand Down Expand Up @@ -407,8 +407,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
Expand Down Expand Up @@ -437,7 +437,7 @@ github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1K
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
Expand Down
9 changes: 6 additions & 3 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ func (s *apiTestSuite) SetupTest() {
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
s.store = store
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
// Need to start cluster at least with 3 stores for test.
for i := 1; i <= 3; i++ {
storeID := uint64(i)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
}
}

func (s *apiTestSuite) storeAddr(id uint64) string {
Expand Down Expand Up @@ -120,7 +123,7 @@ func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
}
retryCount++
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))

Expand Down
28 changes: 20 additions & 8 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,23 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
tikvClient := s.GetTiKVClient()
wg := &sync.WaitGroup{}
wg.Add(len(stores))
// Try to get the minimum resolved timestamp of the store from PD.
var (
err error
)
var storeIDs []string
for _, store := range stores {
storeIDs = append(storeIDs, strconv.FormatUint(store.StoreID(), 10))
}
StoreMinResolvedTSs := make(map[uint64]uint64)
if s.pdClient != nil {
StoreMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
if err != nil {
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
}
}

for _, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
Expand All @@ -595,16 +612,11 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {

var (
safeTS uint64
err error
)
storeIDStr := strconv.Itoa(int(storeID))
// Try to get the minimum resolved timestamp of the store from PD.
if s.pdHttpClient != nil {
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
}
if s.pdClient != nil {
safeTS = StoreMinResolvedTSs[storeID]
}
storeIDStr := strconv.Itoa(int(storeID))
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if safeTS == 0 || err != nil {
resp, err := tikvClient.SendRequest(
Expand Down
40 changes: 23 additions & 17 deletions util/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
package util

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
Expand All @@ -56,7 +57,7 @@ const (
// pd request retry time when connection fail.
pdRequestRetryTime = 10

storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
minResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
)

// PDHTTPClient is an HTTP client of pd.
Expand Down Expand Up @@ -86,45 +87,48 @@ func NewPDHTTPClient(
}
}

// GetStoreMinResolvedTS get store-level min-resolved-ts from pd.
func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
// GetMinResolvedTSByStoresIDs get min-resolved-ts from pd by stores ids.
func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (map[uint64]uint64, error) {
var err error
for _, addr := range p.addrs {
query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID)
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
data, _ := json.Marshal(storeIDs)
v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, bytes.NewBuffer(data))
if e != nil {
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
err = e
continue
}
logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v)))
logutil.BgLogger().Debug("min resolved ts", zap.String("resp", string(v)))
d := struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
IsRealTime bool `json:"is_real_time,omitempty"`
StoreMinResolvedTS map[uint64]uint64 `json:"store_min_resolved_ts"`
}{}
err = json.Unmarshal(v, &d)
if err != nil {
return 0, errors.Trace(err)
return nil, errors.Trace(err)
}
if !d.IsRealTime {
message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr)
message := fmt.Errorf("min resolved ts not enabled, addr: %s", addr)
logutil.BgLogger().Debug(message.Error())
return 0, errors.Trace(message)
return nil, errors.Trace(message)
}
if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil {
// Need to make sure successfully get from real pd.
if d.MinResolvedTS != 0 {
// Should be val.(uint64) but failpoint doesn't support that.
if tmp, ok := val.(int); ok {
d.MinResolvedTS = uint64(tmp)
for storeID, v := range d.StoreMinResolvedTS {
if v != 0 {
// Should be val.(uint64) but failpoint doesn't support that.
if tmp, ok := val.(int); ok {
d.StoreMinResolvedTS[storeID] = uint64(tmp)
}
}
}

}

return d.MinResolvedTS, nil
return d.StoreMinResolvedTS, nil
}

return 0, errors.Trace(err)
return nil, errors.Trace(err)
}

// pdRequest is a func to send an HTTP to pd and return the result bytes.
Expand Down Expand Up @@ -178,7 +182,9 @@ func pdRequest(
}

r, err := io.ReadAll(resp.Body)

if err != nil {
println("resp err", err.Error())
return nil, errors.Trace(err)
}

Expand Down

0 comments on commit 46d5ceb

Please sign in to comment.