Skip to content

Commit

Permalink
Merge branch 'cgroup_mem' of github.com:bufferflies/pd into cgroup_mem
Browse files Browse the repository at this point in the history
  • Loading branch information
bufferflies committed Jan 4, 2024
2 parents 6eba446 + e3b718a commit 4bf9651
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 21 deletions.
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
Expand All @@ -31,7 +32,6 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
5 changes: 4 additions & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ func (ci *clientInner) requestWithRetry(
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err))
}
// Try to send the request to the other PD followers.
for idx := 0; idx < len(pdAddrs) && idx != leaderAddrIdx; idx++ {
for idx := 0; idx < len(pdAddrs); idx++ {
if idx == leaderAddrIdx {
continue
}
addr = ci.pdAddrs[idx]
err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil {
Expand Down
77 changes: 77 additions & 0 deletions client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"strings"
"testing"

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
Expand All @@ -32,11 +35,13 @@ func TestPDAddrNormalization(t *testing.T) {
re.Len(pdAddrs, 1)
re.Equal(-1, leaderAddrIdx)
re.Contains(pdAddrs[0], httpScheme)
c.Close()
c = NewClient("test-https-pd-addr", []string{"127.0.0.1"}, WithTLSConfig(&tls.Config{}))
pdAddrs, leaderAddrIdx = c.(*client).inner.getPDAddrs()
re.Len(pdAddrs, 1)
re.Equal(-1, leaderAddrIdx)
re.Contains(pdAddrs[0], httpsScheme)
c.Close()
}

// requestChecker is used to check the HTTP request sent by the client.
Expand Down Expand Up @@ -93,3 +98,75 @@ func TestCallerID(t *testing.T) {
c.WithCallerID(expectedVal.Load()).GetRegions(context.Background())
c.Close()
}

func TestRedirectWithMetrics(t *testing.T) {
re := require.New(t)

pdAddrs := []string{"127.0.0.1", "172.0.0.1", "192.0.0.1"}
metricCnt := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "check",
}, []string{"name", ""})

// 1. Test all followers failed, need to send all followers.
httpClient := newHTTPClientWithRequestChecker(func(req *http.Request) error {
if req.URL.Path == Schedulers {
return errors.New("mock error")
}
return nil
})
c := NewClient("test-http-pd-redirect", pdAddrs, WithHTTPClient(httpClient), WithMetrics(metricCnt, nil))
pdAddrs, leaderAddrIdx := c.(*client).inner.getPDAddrs()
re.Equal(-1, leaderAddrIdx)
c.CreateScheduler(context.Background(), "test", 0)
var out dto.Metric
failureCnt, err := c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, networkErrorStatus}...)
re.NoError(err)
failureCnt.Write(&out)
re.Equal(float64(3), out.Counter.GetValue())

// 2. Test the Leader success, just need to send to leader.
httpClient = newHTTPClientWithRequestChecker(func(req *http.Request) error {
// mock leader success.
if !strings.Contains(pdAddrs[0], req.Host) {
return errors.New("mock error")
}
return nil
})
c.(*client).inner.cli = httpClient
// Force to update members info.
c.(*client).inner.leaderAddrIdx = 0
c.(*client).inner.pdAddrs = pdAddrs
c.CreateScheduler(context.Background(), "test", 0)
successCnt, err := c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, ""}...)
re.NoError(err)
successCnt.Write(&out)
re.Equal(float64(1), out.Counter.GetValue())

// 3. Test when the leader fails, needs to be sent to the follower in order,
// and returns directly if one follower succeeds
httpClient = newHTTPClientWithRequestChecker(func(req *http.Request) error {
// mock leader failure.
if strings.Contains(pdAddrs[0], req.Host) {
return errors.New("mock error")
}
return nil
})
c.(*client).inner.cli = httpClient
// Force to update members info.
c.(*client).inner.leaderAddrIdx = 0
c.(*client).inner.pdAddrs = pdAddrs
c.CreateScheduler(context.Background(), "test", 0)
successCnt, err = c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, ""}...)
re.NoError(err)
successCnt.Write(&out)
// only one follower success
re.Equal(float64(2), out.Counter.GetValue())
failureCnt, err = c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, networkErrorStatus}...)
re.NoError(err)
failureCnt.Write(&out)
// leader failure
re.Equal(float64(4), out.Counter.GetValue())

c.Close()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/kms v1.20.8
github.com/aws/aws-sdk-go-v2/service/sts v1.18.7
github.com/axw/gocov v1.0.0
github.com/brianvoe/gofakeit/v6 v6.26.3
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/coreos/go-semver v0.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI=
github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8=
github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (m *Manager) GetResourceGroup(name string, withStats bool) *ResourceGroup {
m.RLock()
defer m.RUnlock()
if group, ok := m.groups[name]; ok {
return group.Copy(withStats)
return group.Clone(withStats)
}
return nil
}
Expand All @@ -302,7 +302,7 @@ func (m *Manager) GetResourceGroupList(withStats bool) []*ResourceGroup {
m.RLock()
res := make([]*ResourceGroup, 0, len(m.groups))
for _, group := range m.groups {
res = append(res, group.Copy(withStats))
res = append(res, group.Clone(withStats))
}
m.RUnlock()
sort.Slice(res, func(i, j int) bool {
Expand Down
46 changes: 33 additions & 13 deletions pkg/mcs/resourcemanager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
Expand Down Expand Up @@ -46,6 +47,20 @@ type RequestUnitSettings struct {
RU *GroupTokenBucket `json:"r_u,omitempty"`
}

// Clone returns a deep copy of the RequestUnitSettings.
func (rus *RequestUnitSettings) Clone() *RequestUnitSettings {
if rus == nil {
return nil
}
var ru *GroupTokenBucket
if rus.RU != nil {
ru = rus.RU.Clone()
}
return &RequestUnitSettings{
RU: ru,
}
}

// NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket.
func NewRequestUnitSettings(tokenBucket *rmpb.TokenBucket) *RequestUnitSettings {
return &RequestUnitSettings{
Expand All @@ -62,24 +77,29 @@ func (rg *ResourceGroup) String() string {
return string(res)
}

// Copy copies the resource group.
func (rg *ResourceGroup) Copy(withStats bool) *ResourceGroup {
// TODO: use a better way to copy
// Clone copies the resource group.
func (rg *ResourceGroup) Clone(withStats bool) *ResourceGroup {
rg.RLock()
defer rg.RUnlock()
res, err := json.Marshal(rg)
if err != nil {
panic(err)
newRG := &ResourceGroup{
Name: rg.Name,
Mode: rg.Mode,
Priority: rg.Priority,
RUSettings: rg.RUSettings.Clone(),
}
var newRG ResourceGroup
err = json.Unmarshal(res, &newRG)
if err != nil {
panic(err)
if rg.Runaway != nil {
newRG.Runaway = proto.Clone(rg.Runaway).(*rmpb.RunawaySettings)
}
if !withStats {
newRG.RUConsumption = nil

if rg.Background != nil {
newRG.Background = proto.Clone(rg.Background).(*rmpb.BackgroundSettings)
}
return &newRG

if withStats && rg.RUConsumption != nil {
newRG.RUConsumption = proto.Clone(rg.RUConsumption).(*rmpb.Consumption)
}

return newRG
}

func (rg *ResourceGroup) getRUToken() float64 {
Expand Down
40 changes: 39 additions & 1 deletion pkg/mcs/resourcemanager/server/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package server

import (
"encoding/json"
"reflect"
"testing"

"github.com/brianvoe/gofakeit/v6"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
)
Expand All @@ -29,8 +31,44 @@ func TestPatchResourceGroup(t *testing.T) {
re.NoError(err)
err = rg.PatchSettings(patch)
re.NoError(err)
res, err := json.Marshal(rg.Copy(false))
res, err := json.Marshal(rg.Clone(false))
re.NoError(err)
re.Equal(ca.expectJSONString, string(res))
}
}

func resetSizeCache(obj interface{}) {
resetSizeCacheRecursive(reflect.ValueOf(obj))
}

func resetSizeCacheRecursive(value reflect.Value) {
if value.Kind() == reflect.Ptr {
value = value.Elem()
}

if value.Kind() != reflect.Struct {
return
}

for i := 0; i < value.NumField(); i++ {
fieldValue := value.Field(i)
fieldType := value.Type().Field(i)

if fieldType.Name == "XXX_sizecache" && fieldType.Type.Kind() == reflect.Int32 {
fieldValue.SetInt(0)
} else {
resetSizeCacheRecursive(fieldValue)
}
}
}

func TestClone(t *testing.T) {
for i := 0; i <= 10; i++ {
var rg ResourceGroup
gofakeit.Struct(&rg)
// hack to reset XXX_sizecache, gofakeit will random set this field but proto clone will not copy this field.
resetSizeCache(&rg)
rgClone := rg.Clone(true)
require.EqualValues(t, &rg, rgClone)
}
}
26 changes: 23 additions & 3 deletions pkg/mcs/resourcemanager/server/token_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ type GroupTokenBucket struct {
GroupTokenBucketState `json:"state,omitempty"`
}

// Clone returns the deep copy of GroupTokenBucket
func (gtb *GroupTokenBucket) Clone() *GroupTokenBucket {
if gtb == nil {
return nil
}
var settings *rmpb.TokenLimitSettings
if gtb.Settings != nil {
settings = proto.Clone(gtb.Settings).(*rmpb.TokenLimitSettings)
}
stateClone := *gtb.GroupTokenBucketState.Clone()
return &GroupTokenBucket{
Settings: settings,
GroupTokenBucketState: stateClone,
}
}

func (gtb *GroupTokenBucket) setState(state *GroupTokenBucketState) {
gtb.Tokens = state.Tokens
gtb.LastUpdate = state.LastUpdate
Expand Down Expand Up @@ -85,10 +101,14 @@ type GroupTokenBucketState struct {

// Clone returns the copy of GroupTokenBucketState
func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState {
tokenSlots := make(map[uint64]*TokenSlot)
for id, tokens := range gts.tokenSlots {
tokenSlots[id] = tokens
var tokenSlots map[uint64]*TokenSlot
if gts.tokenSlots != nil {
tokenSlots = make(map[uint64]*TokenSlot)
for id, tokens := range gts.tokenSlots {
tokenSlots[id] = tokens
}
}

var lastUpdate *time.Time
if gts.LastUpdate != nil {
newLastUpdate := *gts.LastUpdate
Expand Down
2 changes: 2 additions & 0 deletions tests/integrations/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI=
github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8=
github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
Expand Down
2 changes: 2 additions & 0 deletions tests/integrations/mcs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI=
github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8=
github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
Expand Down
2 changes: 2 additions & 0 deletions tests/integrations/tso/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI=
github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8=
github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
Expand Down

0 comments on commit 4bf9651

Please sign in to comment.