From ed9685a79905446a4c7bf6241c2bc34348d51643 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Wed, 27 Dec 2023 16:54:27 +0800 Subject: [PATCH 1/2] resource_mananger: deep clone resource group (#7623) close tikv/pd#7206 resource_mananger: deep clone resource group Signed-off-by: nolouch Co-authored-by: tongjian <1045931706@qq.com> --- go.mod | 1 + go.sum | 2 + pkg/mcs/resourcemanager/server/manager.go | 4 +- .../resourcemanager/server/resource_group.go | 46 +++++++++++++------ .../server/resource_group_test.go | 40 +++++++++++++++- .../resourcemanager/server/token_buckets.go | 26 +++++++++-- tests/integrations/client/go.sum | 2 + tests/integrations/mcs/go.sum | 2 + tests/integrations/tso/go.sum | 2 + 9 files changed, 106 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 0def4b4b6ea..31ffccd8026 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/kms v1.20.8 github.com/aws/aws-sdk-go-v2/service/sts v1.18.7 github.com/axw/gocov v1.0.0 + github.com/brianvoe/gofakeit/v6 v6.26.3 github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/coreos/go-semver v0.3.0 diff --git a/go.sum b/go.sum index 37354b38f15..02de15a2c2a 100644 --- a/go.sum +++ b/go.sum @@ -72,6 +72,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI= +github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8= +github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index a4b49b38062..bf0ab7e9b24 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -282,7 +282,7 @@ func (m *Manager) GetResourceGroup(name string, withStats bool) *ResourceGroup { m.RLock() defer m.RUnlock() if group, ok := m.groups[name]; ok { - return group.Copy(withStats) + return group.Clone(withStats) } return nil } @@ -302,7 +302,7 @@ func (m *Manager) GetResourceGroupList(withStats bool) []*ResourceGroup { m.RLock() res := make([]*ResourceGroup, 0, len(m.groups)) for _, group := range m.groups { - res = append(res, group.Copy(withStats)) + res = append(res, group.Clone(withStats)) } m.RUnlock() sort.Slice(res, func(i, j int) bool { diff --git a/pkg/mcs/resourcemanager/server/resource_group.go b/pkg/mcs/resourcemanager/server/resource_group.go index 09f8a33de9f..cb76a98327a 100644 --- a/pkg/mcs/resourcemanager/server/resource_group.go +++ b/pkg/mcs/resourcemanager/server/resource_group.go @@ -19,6 +19,7 @@ import ( "encoding/json" "time" + "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" @@ -46,6 +47,20 @@ type RequestUnitSettings struct { RU *GroupTokenBucket `json:"r_u,omitempty"` } +// Clone returns a deep copy of the RequestUnitSettings. +func (rus *RequestUnitSettings) Clone() *RequestUnitSettings { + if rus == nil { + return nil + } + var ru *GroupTokenBucket + if rus.RU != nil { + ru = rus.RU.Clone() + } + return &RequestUnitSettings{ + RU: ru, + } +} + // NewRequestUnitSettings creates a new RequestUnitSettings with the given token bucket. func NewRequestUnitSettings(tokenBucket *rmpb.TokenBucket) *RequestUnitSettings { return &RequestUnitSettings{ @@ -62,24 +77,29 @@ func (rg *ResourceGroup) String() string { return string(res) } -// Copy copies the resource group. -func (rg *ResourceGroup) Copy(withStats bool) *ResourceGroup { - // TODO: use a better way to copy +// Clone copies the resource group. +func (rg *ResourceGroup) Clone(withStats bool) *ResourceGroup { rg.RLock() defer rg.RUnlock() - res, err := json.Marshal(rg) - if err != nil { - panic(err) + newRG := &ResourceGroup{ + Name: rg.Name, + Mode: rg.Mode, + Priority: rg.Priority, + RUSettings: rg.RUSettings.Clone(), } - var newRG ResourceGroup - err = json.Unmarshal(res, &newRG) - if err != nil { - panic(err) + if rg.Runaway != nil { + newRG.Runaway = proto.Clone(rg.Runaway).(*rmpb.RunawaySettings) } - if !withStats { - newRG.RUConsumption = nil + + if rg.Background != nil { + newRG.Background = proto.Clone(rg.Background).(*rmpb.BackgroundSettings) } - return &newRG + + if withStats && rg.RUConsumption != nil { + newRG.RUConsumption = proto.Clone(rg.RUConsumption).(*rmpb.Consumption) + } + + return newRG } func (rg *ResourceGroup) getRUToken() float64 { diff --git a/pkg/mcs/resourcemanager/server/resource_group_test.go b/pkg/mcs/resourcemanager/server/resource_group_test.go index 8df1be6dccc..da5f5c4f0e4 100644 --- a/pkg/mcs/resourcemanager/server/resource_group_test.go +++ b/pkg/mcs/resourcemanager/server/resource_group_test.go @@ -2,8 +2,10 @@ package server import ( "encoding/json" + "reflect" "testing" + "github.com/brianvoe/gofakeit/v6" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/stretchr/testify/require" ) @@ -29,8 +31,44 @@ func TestPatchResourceGroup(t *testing.T) { re.NoError(err) err = rg.PatchSettings(patch) re.NoError(err) - res, err := json.Marshal(rg.Copy(false)) + res, err := json.Marshal(rg.Clone(false)) re.NoError(err) re.Equal(ca.expectJSONString, string(res)) } } + +func resetSizeCache(obj interface{}) { + resetSizeCacheRecursive(reflect.ValueOf(obj)) +} + +func resetSizeCacheRecursive(value reflect.Value) { + if value.Kind() == reflect.Ptr { + value = value.Elem() + } + + if value.Kind() != reflect.Struct { + return + } + + for i := 0; i < value.NumField(); i++ { + fieldValue := value.Field(i) + fieldType := value.Type().Field(i) + + if fieldType.Name == "XXX_sizecache" && fieldType.Type.Kind() == reflect.Int32 { + fieldValue.SetInt(0) + } else { + resetSizeCacheRecursive(fieldValue) + } + } +} + +func TestClone(t *testing.T) { + for i := 0; i <= 10; i++ { + var rg ResourceGroup + gofakeit.Struct(&rg) + // hack to reset XXX_sizecache, gofakeit will random set this field but proto clone will not copy this field. + resetSizeCache(&rg) + rgClone := rg.Clone(true) + require.EqualValues(t, &rg, rgClone) + } +} diff --git a/pkg/mcs/resourcemanager/server/token_buckets.go b/pkg/mcs/resourcemanager/server/token_buckets.go index 05a93c32673..2819b0af421 100644 --- a/pkg/mcs/resourcemanager/server/token_buckets.go +++ b/pkg/mcs/resourcemanager/server/token_buckets.go @@ -49,6 +49,22 @@ type GroupTokenBucket struct { GroupTokenBucketState `json:"state,omitempty"` } +// Clone returns the deep copy of GroupTokenBucket +func (gtb *GroupTokenBucket) Clone() *GroupTokenBucket { + if gtb == nil { + return nil + } + var settings *rmpb.TokenLimitSettings + if gtb.Settings != nil { + settings = proto.Clone(gtb.Settings).(*rmpb.TokenLimitSettings) + } + stateClone := *gtb.GroupTokenBucketState.Clone() + return &GroupTokenBucket{ + Settings: settings, + GroupTokenBucketState: stateClone, + } +} + func (gtb *GroupTokenBucket) setState(state *GroupTokenBucketState) { gtb.Tokens = state.Tokens gtb.LastUpdate = state.LastUpdate @@ -85,10 +101,14 @@ type GroupTokenBucketState struct { // Clone returns the copy of GroupTokenBucketState func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState { - tokenSlots := make(map[uint64]*TokenSlot) - for id, tokens := range gts.tokenSlots { - tokenSlots[id] = tokens + var tokenSlots map[uint64]*TokenSlot + if gts.tokenSlots != nil { + tokenSlots = make(map[uint64]*TokenSlot) + for id, tokens := range gts.tokenSlots { + tokenSlots[id] = tokens + } } + var lastUpdate *time.Time if gts.LastUpdate != nil { newLastUpdate := *gts.LastUpdate diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 9e42dfa900c..36c0d6758ab 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -68,6 +68,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI= +github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8= +github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 6397ab5b017..d7cf59ea8c8 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -68,6 +68,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI= +github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8= +github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index a00d2941582..7070f069960 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -68,6 +68,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI= +github.com/brianvoe/gofakeit/v6 v6.26.3 h1:3ljYrjPwsUNAUFdUIr2jVg5EhKdcke/ZLop7uVg1Er8= +github.com/brianvoe/gofakeit/v6 v6.26.3/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= From cee6e63b9f0694123184825d545eea53932291ae Mon Sep 17 00:00:00 2001 From: Hu# Date: Wed, 27 Dec 2023 20:01:57 +0800 Subject: [PATCH 2/2] client: fix wrong for check (#7622) close tikv/pd#7613 fix wrong for loop check Signed-off-by: husharp --- client/go.mod | 2 +- client/http/client.go | 5 ++- client/http/client_test.go | 77 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/client/go.mod b/client/go.mod index eb49eb674d8..fcb8fd9bfe5 100644 --- a/client/go.mod +++ b/client/go.mod @@ -13,6 +13,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.1 + github.com/prometheus/client_model v0.2.0 github.com/stretchr/testify v1.8.2 go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.11 @@ -31,7 +32,6 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/client/http/client.go b/client/http/client.go index bf8e9af9bbe..38120d220ee 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -161,7 +161,10 @@ func (ci *clientInner) requestWithRetry( zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err)) } // Try to send the request to the other PD followers. - for idx := 0; idx < len(pdAddrs) && idx != leaderAddrIdx; idx++ { + for idx := 0; idx < len(pdAddrs); idx++ { + if idx == leaderAddrIdx { + continue + } addr = ci.pdAddrs[idx] err = ci.doRequest(ctx, addr, reqInfo, headerOpts...) if err == nil { diff --git a/client/http/client_test.go b/client/http/client_test.go index 3965eb42068..2f7fc68abf4 100644 --- a/client/http/client_test.go +++ b/client/http/client_test.go @@ -21,6 +21,9 @@ import ( "strings" "testing" + "github.com/pingcap/errors" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "go.uber.org/atomic" ) @@ -32,11 +35,13 @@ func TestPDAddrNormalization(t *testing.T) { re.Len(pdAddrs, 1) re.Equal(-1, leaderAddrIdx) re.Contains(pdAddrs[0], httpScheme) + c.Close() c = NewClient("test-https-pd-addr", []string{"127.0.0.1"}, WithTLSConfig(&tls.Config{})) pdAddrs, leaderAddrIdx = c.(*client).inner.getPDAddrs() re.Len(pdAddrs, 1) re.Equal(-1, leaderAddrIdx) re.Contains(pdAddrs[0], httpsScheme) + c.Close() } // requestChecker is used to check the HTTP request sent by the client. @@ -93,3 +98,75 @@ func TestCallerID(t *testing.T) { c.WithCallerID(expectedVal.Load()).GetRegions(context.Background()) c.Close() } + +func TestRedirectWithMetrics(t *testing.T) { + re := require.New(t) + + pdAddrs := []string{"127.0.0.1", "172.0.0.1", "192.0.0.1"} + metricCnt := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "check", + }, []string{"name", ""}) + + // 1. Test all followers failed, need to send all followers. + httpClient := newHTTPClientWithRequestChecker(func(req *http.Request) error { + if req.URL.Path == Schedulers { + return errors.New("mock error") + } + return nil + }) + c := NewClient("test-http-pd-redirect", pdAddrs, WithHTTPClient(httpClient), WithMetrics(metricCnt, nil)) + pdAddrs, leaderAddrIdx := c.(*client).inner.getPDAddrs() + re.Equal(-1, leaderAddrIdx) + c.CreateScheduler(context.Background(), "test", 0) + var out dto.Metric + failureCnt, err := c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, networkErrorStatus}...) + re.NoError(err) + failureCnt.Write(&out) + re.Equal(float64(3), out.Counter.GetValue()) + + // 2. Test the Leader success, just need to send to leader. + httpClient = newHTTPClientWithRequestChecker(func(req *http.Request) error { + // mock leader success. + if !strings.Contains(pdAddrs[0], req.Host) { + return errors.New("mock error") + } + return nil + }) + c.(*client).inner.cli = httpClient + // Force to update members info. + c.(*client).inner.leaderAddrIdx = 0 + c.(*client).inner.pdAddrs = pdAddrs + c.CreateScheduler(context.Background(), "test", 0) + successCnt, err := c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, ""}...) + re.NoError(err) + successCnt.Write(&out) + re.Equal(float64(1), out.Counter.GetValue()) + + // 3. Test when the leader fails, needs to be sent to the follower in order, + // and returns directly if one follower succeeds + httpClient = newHTTPClientWithRequestChecker(func(req *http.Request) error { + // mock leader failure. + if strings.Contains(pdAddrs[0], req.Host) { + return errors.New("mock error") + } + return nil + }) + c.(*client).inner.cli = httpClient + // Force to update members info. + c.(*client).inner.leaderAddrIdx = 0 + c.(*client).inner.pdAddrs = pdAddrs + c.CreateScheduler(context.Background(), "test", 0) + successCnt, err = c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, ""}...) + re.NoError(err) + successCnt.Write(&out) + // only one follower success + re.Equal(float64(2), out.Counter.GetValue()) + failureCnt, err = c.(*client).inner.requestCounter.GetMetricWithLabelValues([]string{createSchedulerName, networkErrorStatus}...) + re.NoError(err) + failureCnt.Write(&out) + // leader failure + re.Equal(float64(4), out.Counter.GetValue()) + + c.Close() +}