Skip to content

Commit

Permalink
api: Extend the min-resolved-ts api to support getting the specified …
Browse files Browse the repository at this point in the history
…store (#6880)

close #6879

Nowadays we have 2 api interface for obtaining min resolved ts

- /pd/api/v1/min-resolved-ts: obtain cluster's min resolved ts
- /pd/api/v1/min-resolved-ts/{store_id}: obtain each store's min resolved ts
For client-go's updateSafeTS, we call approach II for each store, which is not necessary.
We can extend the approach I request params to obtain specified store via a list to reduce the cost per call.

Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp authored Aug 14, 2023
1 parent 67a51b7 commit 54981d1
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 43 deletions.
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Client interface {
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
// Also it may return nil if PD finds no Region for the key temporarily,
// Also, it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
Expand All @@ -96,7 +96,7 @@ type Client interface {
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error)
// ScanRegion gets a list of regions, starts from the region that contains key.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
Expand All @@ -109,7 +109,7 @@ type Client interface {
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error)
// Update GC safe point. TiKV will check it and do GC themselves if necessary.
// UpdateGCSafePoint TiKV will check it and do GC themselves if necessary.
// If the given safePoint is less than the current one, it will not be updated.
// Returns the new safePoint after updating.
UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error)
Expand Down
68 changes: 55 additions & 13 deletions server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package api
import (
"net/http"
"strconv"
"strings"

"github.com/gorilla/mux"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand All @@ -38,17 +39,18 @@ func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolved

// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type minResolvedTS struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
PersistInterval typeutil.Duration `json:"persist_interval,omitempty"`
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
PersistInterval typeutil.Duration `json:"persist_interval,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}

// @Tags min_store_resolved_ts
// @Summary Get store-level min resolved ts.
// @Produce json
// @Success 200 {array} minResolvedTS
// @Produce json
// @Success 200 {array} minResolvedTS
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts/{store_id} [get]
func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := h.svr.GetRaftCluster()
Expand All @@ -67,19 +69,59 @@ func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *h
})
}

// @Tags min_resolved_ts
// @Summary Get cluster-level min resolved ts.
// @Tags min_resolved_ts
// @Summary Get cluster-level min resolved ts and optionally store-level min resolved ts.
// @Description Optionally, we support a query parameter `scope`
// to get store-level min resolved ts by specifying a list of store IDs.
// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
// - When scope given a list of stores, min_resolved_ts will be provided for each store
// and the scope-specific min_resolved_ts will be returned.
//
// @Produce json
// @Param scope query string false "Scope of the min resolved ts: comma-separated list of store IDs (e.g., '1,2,3')." default(cluster)
// @Success 200 {array} minResolvedTS
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts [get]
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := h.svr.GetRaftCluster()
value := c.GetMinResolvedTS()
scopeMinResolvedTS := c.GetMinResolvedTS()
persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval

var storesMinResolvedTS map[uint64]uint64
if scopeStr := r.URL.Query().Get("scope"); len(scopeStr) > 0 {
// scope is an optional parameter, it can be `cluster` or specified store IDs.
// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
// - When scope given a list of stores, min_resolved_ts will be provided for each store
// and the scope-specific min_resolved_ts will be returned.
if scopeStr == "cluster" {
stores := c.GetMetaStores()
ids := make([]uint64, len(stores))
for i, store := range stores {
ids[i] = store.GetId()
}
// use cluster-level min_resolved_ts as the scope-specific min_resolved_ts.
_, storesMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids)
} else {
scopeIDs := strings.Split(scopeStr, ",")
ids := make([]uint64, len(scopeIDs))
for i, idStr := range scopeIDs {
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
ids[i] = id
}
scopeMinResolvedTS, storesMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids)
}
}

h.rd.JSON(w, http.StatusOK, minResolvedTS{
MinResolvedTS: value,
PersistInterval: persistInterval,
IsRealTime: persistInterval.Duration != 0,
MinResolvedTS: scopeMinResolvedTS,
PersistInterval: persistInterval,
IsRealTime: persistInterval.Duration != 0,
StoresMinResolvedTS: storesMinResolvedTS,
})
}
91 changes: 83 additions & 8 deletions server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package api
import (
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -36,6 +38,7 @@ type minResolvedTSTestSuite struct {
cleanup testutil.CleanupFunc
url string
defaultInterval time.Duration
storesNum int
}

func TestMinResolvedTSTestSuite(t *testing.T) {
Expand All @@ -53,11 +56,13 @@ func (suite *minResolvedTSTestSuite) SetupSuite() {
suite.url = fmt.Sprintf("%s%s/api/v1/min-resolved-ts", addr, apiPrefix)

mustBootstrapCluster(re, suite.svr)
mustPutStore(re, suite.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
r1 := core.NewTestRegionInfo(7, 1, []byte("a"), []byte("b"))
mustRegionHeartbeat(re, suite.svr, r1)
r2 := core.NewTestRegionInfo(8, 1, []byte("b"), []byte("c"))
mustRegionHeartbeat(re, suite.svr, r2)
suite.storesNum = 3
for i := 1; i <= suite.storesNum; i++ {
id := uint64(i)
mustPutStore(re, suite.svr, id, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
r := core.NewTestRegionInfo(id, id, []byte(fmt.Sprintf("%da", id)), []byte(fmt.Sprintf("%db", id)))
mustRegionHeartbeat(re, suite.svr, r)
}
}

func (suite *minResolvedTSTestSuite) TearDownSuite() {
Expand Down Expand Up @@ -92,9 +97,8 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
PersistInterval: interval,
})
// case4: set min resolved ts
rc := suite.svr.GetRaftCluster()
ts := uint64(233)
rc.SetMinResolvedTS(1, ts)
suite.setAllStoresMinResolvedTS(ts)
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: ts,
IsRealTime: true,
Expand All @@ -108,20 +112,77 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
IsRealTime: false,
PersistInterval: interval,
})
rc.SetMinResolvedTS(1, ts+1)
suite.setAllStoresMinResolvedTS(ts)
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: ts, // last persist value
IsRealTime: false,
PersistInterval: interval,
})
}

func (suite *minResolvedTSTestSuite) TestMinResolvedTSByStores() {
// run job.
interval := typeutil.Duration{Duration: suite.defaultInterval}
suite.setMinResolvedTSPersistenceInterval(interval)
suite.Eventually(func() bool {
return interval == suite.svr.GetRaftCluster().GetPDServerConfig().MinResolvedTSPersistenceInterval
}, time.Second*10, time.Millisecond*20)
// set min resolved ts.
rc := suite.svr.GetRaftCluster()
ts := uint64(233)

// scope is `cluster`
testStoresID := make([]string, 0)
testMap := make(map[uint64]uint64)
for i := 1; i <= suite.storesNum; i++ {
storeID := uint64(i)
testTS := ts + storeID
testMap[storeID] = testTS
rc.SetMinResolvedTS(storeID, testTS)

testStoresID = append(testStoresID, strconv.Itoa(i))
}
suite.checkMinResolvedTSByStores(&minResolvedTS{
MinResolvedTS: 234,
IsRealTime: true,
PersistInterval: interval,
StoresMinResolvedTS: testMap,
}, "cluster")

// set all stores min resolved ts.
testStoresIDStr := strings.Join(testStoresID, ",")
suite.checkMinResolvedTSByStores(&minResolvedTS{
MinResolvedTS: 234,
IsRealTime: true,
PersistInterval: interval,
StoresMinResolvedTS: testMap,
}, testStoresIDStr)

// remove last store for test.
testStoresID = testStoresID[:len(testStoresID)-1]
testStoresIDStr = strings.Join(testStoresID, ",")
delete(testMap, uint64(suite.storesNum))
suite.checkMinResolvedTSByStores(&minResolvedTS{
MinResolvedTS: 234,
IsRealTime: true,
PersistInterval: interval,
StoresMinResolvedTS: testMap,
}, testStoresIDStr)
}

func (suite *minResolvedTSTestSuite) setMinResolvedTSPersistenceInterval(duration typeutil.Duration) {
cfg := suite.svr.GetRaftCluster().GetPDServerConfig().Clone()
cfg.MinResolvedTSPersistenceInterval = duration
suite.svr.GetRaftCluster().SetPDServerConfig(cfg)
}

func (suite *minResolvedTSTestSuite) setAllStoresMinResolvedTS(ts uint64) {
rc := suite.svr.GetRaftCluster()
for i := 1; i <= suite.storesNum; i++ {
rc.SetMinResolvedTS(uint64(i), ts)
}
}

func (suite *minResolvedTSTestSuite) checkMinResolvedTS(expect *minResolvedTS) {
suite.Eventually(func() bool {
res, err := testDialClient.Get(suite.url)
Expand All @@ -130,6 +191,20 @@ func (suite *minResolvedTSTestSuite) checkMinResolvedTS(expect *minResolvedTS) {
listResp := &minResolvedTS{}
err = apiutil.ReadJSON(res.Body, listResp)
suite.NoError(err)
suite.Nil(listResp.StoresMinResolvedTS)
return reflect.DeepEqual(expect, listResp)
}, time.Second*10, time.Millisecond*20)
}

func (suite *minResolvedTSTestSuite) checkMinResolvedTSByStores(expect *minResolvedTS, scope string) {
suite.Eventually(func() bool {
url := fmt.Sprintf("%s?scope=%s", suite.url, scope)
res, err := testDialClient.Get(url)
suite.NoError(err)
defer res.Body.Close()
listResp := &minResolvedTS{}
err = apiutil.ReadJSON(res.Body, listResp)
suite.NoError(err)
return reflect.DeepEqual(expect, listResp)
}, time.Second*10, time.Millisecond*20)
}
23 changes: 21 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2548,10 +2548,29 @@ func (c *RaftCluster) GetMinResolvedTS() uint64 {
func (c *RaftCluster) GetStoreMinResolvedTS(storeID uint64) uint64 {
c.RLock()
defer c.RUnlock()
if !c.isInitialized() || !core.IsAvailableForMinResolvedTS(c.GetStore(storeID)) {
store := c.GetStore(storeID)
if store == nil {
return math.MaxUint64
}
if !c.isInitialized() || !core.IsAvailableForMinResolvedTS(store) {
return math.MaxUint64
}
return c.GetStore(storeID).GetMinResolvedTS()
return store.GetMinResolvedTS()
}

// GetMinResolvedTSByStoreIDs returns the min_resolved_ts for each store
// and returns the min_resolved_ts for all given store lists.
func (c *RaftCluster) GetMinResolvedTSByStoreIDs(ids []uint64) (uint64, map[uint64]uint64) {
minResolvedTS := uint64(math.MaxUint64)
storesMinResolvedTS := make(map[uint64]uint64)
for _, storeID := range ids {
storeTS := c.GetStoreMinResolvedTS(storeID)
storesMinResolvedTS[storeID] = storeTS
if minResolvedTS > storeTS {
minResolvedTS = storeTS
}
}
return minResolvedTS, storesMinResolvedTS
}

// GetExternalTS returns the external timestamp.
Expand Down
11 changes: 11 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,13 @@ func checkMinResolvedTS(re *require.Assertions, rc *cluster.RaftCluster, expect
}, time.Second*10, time.Millisecond*50)
}

func checkStoreMinResolvedTS(re *require.Assertions, rc *cluster.RaftCluster, expectTS, storeID uint64) {
re.Eventually(func() bool {
ts := rc.GetStoreMinResolvedTS(storeID)
return expectTS == ts
}, time.Second*10, time.Millisecond*50)
}

func checkMinResolvedTSFromStorage(re *require.Assertions, rc *cluster.RaftCluster, expect uint64) {
re.Eventually(func() bool {
ts2, err := rc.GetStorage().LoadMinResolvedTS()
Expand Down Expand Up @@ -1400,6 +1407,9 @@ func TestMinResolvedTS(t *testing.T) {
resetStoreState(re, rc, store1, metapb.StoreState_Tombstone)
checkMinResolvedTS(re, rc, store3TS)
checkMinResolvedTSFromStorage(re, rc, store3TS)
checkStoreMinResolvedTS(re, rc, store3TS, store3)
// check no-exist store
checkStoreMinResolvedTS(re, rc, math.MaxUint64, 100)

// case7: add a store with leader peer but no report min resolved ts
// min resolved ts should be no change
Expand All @@ -1419,6 +1429,7 @@ func TestMinResolvedTS(t *testing.T) {
checkMinResolvedTS(re, rc, store3TS)
setMinResolvedTSPersistenceInterval(re, rc, svr, time.Millisecond)
checkMinResolvedTS(re, rc, store5TS)
checkStoreMinResolvedTS(re, rc, store5TS, store5)
}

// See https://github.com/tikv/pd/issues/4941
Expand Down
13 changes: 12 additions & 1 deletion tools/pd-api-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,22 @@ The api bench cases we support are as follows:
-debug
> print the output of api response for debug
### Run Shell

You can run shell as follows.
```shell
go run main.go -http-cases GetRegionStatus-1+1,GetMinResolvedTS-1+1 -client 1 -debug
```

### HTTP params

You can use the following command to set the params of HTTP request:
```shell
go run main.go -http-cases GetMinResolvedTS-1+1 -params 'scope=cluster' -client 1 -debug
```
for more params, can use like `-params 'A=1&B=2&C=3'`


### TLS

You can use the following command to generate a certificate for testing TLS:
Expand All @@ -74,4 +85,4 @@ mkdir cert
go run main.go -http-cases GetRegionStatus-1+1,GetMinResolvedTS-1+1 -client 1 -debug -cacert ./cert/ca.pem -cert ./cert/pd-server.pem -key ./cert/pd-server-key.pem
./cert_opt.sh cleanup cert
rm -rf cert
```
```
Loading

0 comments on commit 54981d1

Please sign in to comment.