Skip to content

Commit

Permalink
Merge branch 'master' into optimize-lock-of-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Apr 8, 2024
2 parents bc036e4 + 11dfa15 commit 4c21000
Show file tree
Hide file tree
Showing 15 changed files with 275 additions and 131 deletions.
2 changes: 1 addition & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
var (
ErrClientListResourceGroup = errors.Normalize("get all resource group failed, %v", errors.RFCCodeText("PD:client:ErrClientListResourceGroup"))
ErrClientResourceGroupConfigUnavailable = errors.Normalize("resource group config is unavailable, %v", errors.RFCCodeText("PD:client:ErrClientResourceGroupConfigUnavailable"))
ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled"))
ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation, estimated wait time %s, ltb state is %.2f:%.2f", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled"))
)

// ErrClientGetResourceGroup is the error type for getting resource group.
Expand Down
26 changes: 15 additions & 11 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDurtion time.Duration
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDuration time.Duration
// This is the Limit at reservation time, it can change later.
limit Limit
limit Limit
remainingTokens float64
}

// OK returns whether the limiter can provide the requested number of tokens
Expand Down Expand Up @@ -359,10 +360,11 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDurtion: waitDuration,
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDuration: waitDuration,
remainingTokens: tokens,
}
if ok {
r.tokens = n
Expand All @@ -380,6 +382,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
zap.Float64("current-ltb-tokens", lim.tokens),
zap.Float64("current-ltb-rate", float64(lim.limit)),
zap.Float64("request-tokens", n),
zap.Float64("notify-threshold", lim.notifyThreshold),
zap.Bool("is-low-process", lim.isLowProcess),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
lim.last = last
Expand Down Expand Up @@ -461,7 +465,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled
return res.needWaitDuration, errs.ErrClientResourceGroupThrottled.FastGenByArgs(res.needWaitDuration, res.limit, res.remainingTokens)
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestCancel(t *testing.T) {
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(4*time.Second, d)
re.Error(err)
re.Contains(err.Error(), "estimated wait time 4s, ltb state is 1.00:-4.00")
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
cancel1()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41
github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo=
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1 h1:vDWWJKU6ztczn24XixahtLwcnJ15DOtSRIRM3jVtZNU=
github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
7 changes: 7 additions & 0 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -780,6 +781,12 @@ func (r *regionItem) IsRaftStale(origin *regionItem, u *Controller) bool {
func(a, b *regionItem) int {
return int(a.report.GetRaftState().GetHardState().GetTerm()) - int(b.report.GetRaftState().GetHardState().GetTerm())
},
// choose the peer has maximum applied index or last index.
func(a, b *regionItem) int {
maxIdxA := typeutil.MaxUint64(a.report.GetRaftState().GetLastIndex(), a.report.AppliedIndex)
maxIdxB := typeutil.MaxUint64(b.report.GetRaftState().GetLastIndex(), b.report.AppliedIndex)
return int(maxIdxA - maxIdxB)
},
func(a, b *regionItem) int {
return int(a.report.GetRaftState().GetLastIndex()) - int(b.report.GetRaftState().GetLastIndex())
},
Expand Down
102 changes: 102 additions & 0 deletions pkg/unsaferecovery/unsafe_recovery_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1856,3 +1856,105 @@ func newTestStores(n uint64, version string) []*core.StoreInfo {
func getTestDeployPath(storeID uint64) string {
return fmt.Sprintf("test/store%d", storeID)
}

func TestSelectLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := mockconfig.NewTestOptions()
cluster := mockcluster.NewCluster(ctx, opts)
coordinator := schedule.NewCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster.ID, cluster, true))
coordinator.Run()
stores := newTestStores(6, "6.0.0")
labels := []*metapb.StoreLabel{
{
Key: core.EngineKey,
Value: core.EngineTiFlash,
},
}
stores[5].IsTiFlash()
core.SetStoreLabels(labels)(stores[5])
for _, store := range stores {
cluster.PutStore(store)
}
recoveryController := NewController(cluster)

cases := []struct {
peers []*regionItem
leaderID uint64
}{
{
peers: []*regionItem{
newPeer(1, 1, 10, 5, 4),
newPeer(2, 2, 9, 9, 8),
},
leaderID: 2,
},
{
peers: []*regionItem{
newPeer(1, 1, 10, 10, 9),
newPeer(2, 1, 8, 8, 15),
newPeer(3, 1, 12, 11, 11),
},
leaderID: 2,
},
{
peers: []*regionItem{
newPeer(1, 1, 9, 9, 11),
newPeer(2, 1, 10, 8, 7),
newPeer(3, 1, 11, 7, 6),
},
leaderID: 3,
},
{
peers: []*regionItem{
newPeer(1, 1, 11, 11, 8),
newPeer(2, 1, 11, 10, 10),
newPeer(3, 1, 11, 9, 8),
},
leaderID: 1,
},
{
peers: []*regionItem{
newPeer(6, 1, 11, 11, 9),
newPeer(1, 1, 11, 11, 8),
newPeer(2, 1, 11, 10, 10),
newPeer(3, 1, 11, 9, 8),
},
leaderID: 1,
},
}

for i, c := range cases {
peersMap := map[uint64][]*regionItem{
1: c.peers,
}
region := &metapb.Region{
Id: 1,
}
leader := recoveryController.selectLeader(peersMap, region)
re.Equal(leader.Region().Id, c.leaderID, "case: %d", i)
}
}

func newPeer(storeID, term, lastIndex, committedIndex, appliedIndex uint64) *regionItem {
return &regionItem{
storeID: storeID,
report: &pdpb.PeerReport{
RaftState: &raft_serverpb.RaftLocalState{
HardState: &eraftpb.HardState{
Term: term,
Commit: committedIndex,
},
LastIndex: lastIndex,
},
RegionState: &raft_serverpb.RegionLocalState{
Region: &metapb.Region{
Id: storeID,
},
},
AppliedIndex: appliedIndex,
},
}
}
2 changes: 1 addition & 1 deletion tests/integrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/go-sql-driver/mysql v1.7.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41
github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ=
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 h1:7tDr4J6gGQ3OqBq+lZQkI9wlJIIXFitHjNK8ymU/SEo=
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1 h1:vDWWJKU6ztczn24XixahtLwcnJ15DOtSRIRM3jVtZNU=
github.com/pingcap/kvproto v0.0.0-20240403065636-c699538f7aa1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
2 changes: 2 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,8 @@ func (suite *apiTestSuite) checkStores(cluster *tests.TestCluster) {
for _, store := range stores {
tests.MustPutStore(re, cluster, store)
}
// prevent the offline store from changing to tombstone
tests.MustPutRegion(re, cluster, 3, 6, []byte("a"), []byte("b"))
// Test /stores
apiServerAddr := cluster.GetLeaderServer().GetAddr()
urlPrefix := fmt.Sprintf("%s/pd/api/v1/stores", apiServerAddr)
Expand Down
82 changes: 37 additions & 45 deletions tests/server/api/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,11 +780,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) {
},
},
}
var bundles []placement.GroupBundle
err := tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles)
re.NoError(err)
re.Len(bundles, 1)
suite.assertBundleEqual(re, bundles[0], b1)
suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1}, 1)

// Set
b2 := placement.GroupBundle{
Expand All @@ -801,27 +797,17 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) {
re.NoError(err)

// Get
var bundle placement.GroupBundle
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/foo", &bundle)
re.NoError(err)
suite.assertBundleEqual(re, bundle, b2)
suite.assertBundleEqual(re, urlPrefix+"/placement-rule/foo", b2)

// GetAll again
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles)
re.NoError(err)
re.Len(bundles, 2)
suite.assertBundleEqual(re, bundles[0], b1)
suite.assertBundleEqual(re, bundles[1], b2)
suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1, b2}, 2)

// Delete
err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/pd", tu.StatusOK(re))
re.NoError(err)

// GetAll again
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles)
re.NoError(err)
re.Len(bundles, 1)
suite.assertBundleEqual(re, bundles[0], b2)
suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b2}, 1)

// SetAll
b2.Rules = append(b2.Rules, &placement.Rule{GroupID: "foo", ID: "baz", Index: 2, Role: placement.Follower, Count: 1})
Expand All @@ -833,22 +819,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) {
re.NoError(err)

// GetAll again
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles)
re.NoError(err)
re.Len(bundles, 3)
suite.assertBundleEqual(re, bundles[0], b2)
suite.assertBundleEqual(re, bundles[1], b1)
suite.assertBundleEqual(re, bundles[2], b3)
suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1, b2, b3}, 3)

// Delete using regexp
err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/"+url.PathEscape("foo.*")+"?regexp", tu.StatusOK(re))
re.NoError(err)

// GetAll again
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles)
re.NoError(err)
re.Len(bundles, 1)
suite.assertBundleEqual(re, bundles[0], b1)
suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1}, 1)

// Set
id := "rule-without-group-id"
Expand All @@ -865,18 +843,11 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) {

b4.ID = id
b4.Rules[0].GroupID = b4.ID

// Get
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/"+id, &bundle)
re.NoError(err)
suite.assertBundleEqual(re, bundle, b4)
suite.assertBundleEqual(re, urlPrefix+"/placement-rule/"+id, b4)

// GetAll again
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles)
re.NoError(err)
re.Len(bundles, 2)
suite.assertBundleEqual(re, bundles[0], b1)
suite.assertBundleEqual(re, bundles[1], b4)
suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1, b4}, 2)

// SetAll
b5 := placement.GroupBundle{
Expand All @@ -894,12 +865,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) {
b5.Rules[0].GroupID = b5.ID

// GetAll again
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles)
re.NoError(err)
re.Len(bundles, 3)
suite.assertBundleEqual(re, bundles[0], b1)
suite.assertBundleEqual(re, bundles[1], b4)
suite.assertBundleEqual(re, bundles[2], b5)
suite.assertBundlesEqual(re, urlPrefix+"/placement-rule", []placement.GroupBundle{b1, b4, b5}, 3)
}

func (suite *ruleTestSuite) TestBundleBadRequest() {
Expand Down Expand Up @@ -1228,9 +1194,35 @@ func (suite *ruleTestSuite) checkLargeRules(cluster *tests.TestCluster) {
suite.postAndCheckRuleBundle(urlPrefix, genBundlesWithRulesNum(etcdutil.MaxEtcdTxnOps*2))
}

func (suite *ruleTestSuite) assertBundleEqual(re *require.Assertions, b1, b2 placement.GroupBundle) {
func (suite *ruleTestSuite) assertBundleEqual(re *require.Assertions, url string, expectedBundle placement.GroupBundle) {
var bundle placement.GroupBundle
tu.Eventually(re, func() bool {
err := tu.ReadGetJSON(re, testDialClient, url, &bundle)
if err != nil {
return false
}
return suite.compareBundle(bundle, expectedBundle)
})
}

func (suite *ruleTestSuite) assertBundlesEqual(re *require.Assertions, url string, expectedBundles []placement.GroupBundle, expectedLen int) {
var bundles []placement.GroupBundle
tu.Eventually(re, func() bool {
return suite.compareBundle(b1, b2)
err := tu.ReadGetJSON(re, testDialClient, url, &bundles)
if err != nil {
return false
}
if len(bundles) != expectedLen {
return false
}
sort.Slice(bundles, func(i, j int) bool { return bundles[i].ID < bundles[j].ID })
sort.Slice(expectedBundles, func(i, j int) bool { return expectedBundles[i].ID < expectedBundles[j].ID })
for i := range bundles {
if !suite.compareBundle(bundles[i], expectedBundles[i]) {
return false
}
}
return true
})
}

Expand Down
Loading

0 comments on commit 4c21000

Please sign in to comment.