From 39fb207563d5eb4f6e15cfc71e1236f449f20b8e Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 22 Dec 2021 14:57:47 +0800 Subject: [PATCH 01/19] api: Revert for showing State in StoreInfo (#4485) * Revert "fix #3816: recovered pdctl/store_test && add comment to function onlu used by test" This reverts commit acc531f55207446a4160905973aef76cffe91569. Signed-off-by: Cabinfever_B * Revert "fix #3816: fix code format problem" This reverts commit 0f0f65a17461904b5933bfb971a0e8be0a909654. Signed-off-by: Cabinfever_B * Revert "fix #3816: fix code format problem" This reverts commit b36673f52d2ba9e1861ecdab60f043df468902af. Signed-off-by: Cabinfever_B * Revert "fix #3816: fix code format problem" This reverts commit 8c281746bfcbbf9ff2807449c0d9612761d8e540. Signed-off-by: Cabinfever_B * Revert "Fix #3816 delete unnecessary field and fix unit test" This reverts commit acc942e948114939f88fd740160e270cd98fd47b. Signed-off-by: Cabinfever_B * revert #4334 Signed-off-by: Cabinfever_B * revert #4334: empty commit for add signed-off-by Signed-off-by: Cabinfever_B Co-authored-by: JmPotato --- server/api/store.go | 57 ++++------------------------- server/api/store_test.go | 63 +++++---------------------------- server/api/trend.go | 4 +-- tests/cluster.go | 2 +- tests/pdctl/helper.go | 3 +- tests/pdctl/store/store_test.go | 6 ++-- 6 files changed, 22 insertions(+), 113 deletions(-) diff --git a/server/api/store.go b/server/api/store.go index 50d9297f376..b3caca88643 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -36,56 +36,10 @@ import ( "github.com/unrolled/render" ) -// MetaStore contains meta information about a store which needed to show. +// MetaStore contains meta information about a store. type MetaStore struct { - StoreID uint64 `json:"id,omitempty"` - Address string `json:"address,omitempty"` - Labels []*metapb.StoreLabel `json:"labels,omitempty"` - Version string `json:"version,omitempty"` - PeerAddress string `json:"peer_address,omitempty"` - StatusAddress string `json:"status_address,omitempty"` - GitHash string `json:"git_hash,omitempty"` - StartTimestamp int64 `json:"start_timestamp,omitempty"` - DeployPath string `json:"deploy_path,omitempty"` - LastHeartbeat int64 `json:"last_heartbeat,omitempty"` - PhysicallyDestroyed bool `json:"physically_destroyed,omitempty"` - StateName string `json:"state_name"` -} - -// NewMetaStore convert metapb.Store to MetaStore without State -func NewMetaStore(store *metapb.Store, stateName string) *MetaStore { - metaStore := &MetaStore{StateName: stateName} - metaStore.StoreID = store.GetId() - metaStore.Address = store.GetAddress() - metaStore.Labels = store.GetLabels() - metaStore.Version = store.GetVersion() - metaStore.PeerAddress = store.GetPeerAddress() - metaStore.StatusAddress = store.GetStatusAddress() - metaStore.GitHash = store.GetGitHash() - metaStore.StartTimestamp = store.GetStartTimestamp() - metaStore.DeployPath = store.GetDeployPath() - metaStore.LastHeartbeat = store.GetLastHeartbeat() - metaStore.PhysicallyDestroyed = store.GetPhysicallyDestroyed() - return metaStore -} - -// ConvertToMetapbStore convert to metapb.Store -// For test only. -func (m *MetaStore) ConvertToMetapbStore() *metapb.Store { - metapbStore := &metapb.Store{ - Id: m.StoreID, - Address: m.Address, - State: metapb.StoreState(metapb.StoreState_value[m.StateName]), - Labels: m.Labels, - Version: m.Version, - PeerAddress: m.PeerAddress, - StatusAddress: m.StatusAddress, - GitHash: m.GitHash, - StartTimestamp: m.StartTimestamp, - DeployPath: m.DeployPath, - LastHeartbeat: m.LastHeartbeat, - } - return metapbStore + *metapb.Store + StateName string `json:"state_name"` } // StoreStatus contains status about a store. @@ -123,7 +77,10 @@ const ( func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo { s := &StoreInfo{ - Store: NewMetaStore(store.GetMeta(), store.GetState().String()), + Store: &MetaStore{ + Store: store.GetMeta(), + StateName: store.GetState().String(), + }, Status: &StoreStatus{ Capacity: typeutil.ByteSize(store.GetCapacity()), Available: typeutil.ByteSize(store.GetAvailable()), diff --git a/server/api/store_test.go b/server/api/store_test.go index c59d7dbfcc6..80d741ceecf 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" - "github.com/tikv/pd/server/versioninfo" ) var _ = Suite(&testStoreSuite{}) @@ -115,8 +114,7 @@ func checkStoresInfo(c *C, ss []*StoreInfo, want []*metapb.Store) { } } for _, s := range ss { - metapbStore := s.Store.ConvertToMetapbStore() - obtained := proto.Clone(metapbStore).(*metapb.Store) + obtained := proto.Clone(s.Store.Store).(*metapb.Store) expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store) // Ignore lastHeartbeat obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0 @@ -168,51 +166,6 @@ func (s *testStoreSuite) TestStoreGet(c *C) { checkStoresInfo(c, []*StoreInfo{info}, s.stores[:1]) } -func (s *testStoreSuite) TestStoreInfoGet(c *C) { - timeStamp := time.Now().Unix() - url := fmt.Sprintf("%s/store/1112", s.urlPrefix) - _, errPut := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{ - Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, - Store: &metapb.Store{ - Id: 1112, - Address: fmt.Sprintf("tikv%d", 1112), - State: 1, - Labels: nil, - Version: versioninfo.MinSupportedVersion(versioninfo.Version5_0).String(), - StatusAddress: fmt.Sprintf("tikv%d", 1112), - GitHash: "45ce5b9584d618bc777877bea77cb94f61b8410", - StartTimestamp: timeStamp, - DeployPath: "/home/test", - LastHeartbeat: timeStamp, - }, - }) - c.Assert(errPut, IsNil) - - info := new(StoreInfo) - - err := readJSON(testDialClient, url, info) - c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String()) - c.Assert(info.Store.StoreID, Equals, uint64(1112)) - c.Assert(info.Store.Address, Equals, "tikv1112") - c.Assert(info.Store.Version, Equals, versioninfo.MinSupportedVersion(versioninfo.Version5_0).String()) - c.Assert(info.Store.StatusAddress, Equals, fmt.Sprintf("tikv%d", 1112)) - c.Assert(info.Store.GitHash, Equals, "45ce5b9584d618bc777877bea77cb94f61b8410") - c.Assert(info.Store.StartTimestamp, Equals, timeStamp) - c.Assert(info.Store.DeployPath, Equals, "/home/test") - c.Assert(info.Store.LastHeartbeat, Equals, timeStamp) - - resp, err := testDialClient.Get(url) - c.Assert(err, IsNil) - defer resp.Body.Close() - b, err := io.ReadAll(resp.Body) - c.Assert(err, IsNil) - str := string(b) - c.Assert(strings.Contains(str, "\"state\""), Equals, false) - s.cleanup() - s.SetUpSuite(c) -} - func (s *testStoreSuite) TestStoreLabel(c *C) { url := fmt.Sprintf("%s/store/1", s.urlPrefix) var info StoreInfo @@ -309,7 +262,7 @@ func (s *testStoreSuite) TestStoreDelete(c *C) { err := readJSON(testDialClient, url, store) c.Assert(err, IsNil) c.Assert(store.Store.PhysicallyDestroyed, IsFalse) - c.Assert(store.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(store.Store.State, Equals, metapb.StoreState_Offline) // up store success because it is offline but not physically destroyed status := requestStatusBody(c, testDialClient, http.MethodPost, fmt.Sprintf("%s/state?state=Up", url)) @@ -320,7 +273,7 @@ func (s *testStoreSuite) TestStoreDelete(c *C) { store = new(StoreInfo) err = readJSON(testDialClient, url, store) c.Assert(err, IsNil) - c.Assert(store.Store.StateName, Equals, metapb.StoreState_Up.String()) + c.Assert(store.Store.State, Equals, metapb.StoreState_Up) c.Assert(store.Store.PhysicallyDestroyed, IsFalse) // offline store with physically destroyed @@ -328,7 +281,7 @@ func (s *testStoreSuite) TestStoreDelete(c *C) { c.Assert(status, Equals, http.StatusOK) err = readJSON(testDialClient, url, store) c.Assert(err, IsNil) - c.Assert(store.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(store.Store.State, Equals, metapb.StoreState_Offline) c.Assert(store.Store.PhysicallyDestroyed, IsTrue) // try to up store again failed because it is physically destroyed @@ -344,7 +297,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) { info := StoreInfo{} err := readJSON(testDialClient, url, &info) c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Up.String()) + c.Assert(info.Store.State, Equals, metapb.StoreState_Up) // Set to Offline. info = StoreInfo{} @@ -352,7 +305,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) { c.Assert(err, IsNil) err = readJSON(testDialClient, url, &info) c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(info.Store.State, Equals, metapb.StoreState_Offline) // store not found info = StoreInfo{} @@ -367,7 +320,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) { c.Assert(err, NotNil) err = readJSON(testDialClient, url, &info) c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(info.Store.State, Equals, metapb.StoreState_Offline) } // Set back to Up. @@ -376,7 +329,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) { c.Assert(err, IsNil) err = readJSON(testDialClient, url, &info) c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Up.String()) + c.Assert(info.Store.State, Equals, metapb.StoreState_Up) } func (s *testStoreSuite) TestUrlStoreFilter(c *C) { diff --git a/server/api/trend.go b/server/api/trend.go index 564319af163..542d2d9e966 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -130,8 +130,8 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { for _, store := range stores { info := newStoreInfo(h.svr.GetScheduleConfig(), store) s := trendStore{ - ID: info.Store.StoreID, - Address: info.Store.Address, + ID: info.Store.GetId(), + Address: info.Store.GetAddress(), StateName: info.Store.StateName, Capacity: uint64(info.Status.Capacity), Available: uint64(info.Status.Available), diff --git a/tests/cluster.go b/tests/cluster.go index c0768bec9d8..4d42ee38806 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -361,7 +361,7 @@ func (s *TestServer) GetStoreRegions(storeID uint64) []*core.RegionInfo { func (s *TestServer) BootstrapCluster() error { bootstrapReq := &pdpb.BootstrapRequest{ Header: &pdpb.RequestHeader{ClusterId: s.GetClusterID()}, - Store: &metapb.Store{Id: 1, Address: "mock://1", LastHeartbeat: time.Now().UnixNano()}, + Store: &metapb.Store{Id: 1, Address: "mock://1"}, Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, } _, err := s.grpcServer.Bootstrap(context.Background(), bootstrapReq) diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 67d991871b1..66e675edb7f 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -51,8 +51,7 @@ func CheckStoresInfo(c *check.C, stores []*api.StoreInfo, want []*metapb.Store) } } for _, s := range stores { - metapbStore := s.Store.ConvertToMetapbStore() - obtained := proto.Clone(metapbStore).(*metapb.Store) + obtained := proto.Clone(s.Store.Store).(*metapb.Store) expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store) // Ignore lastHeartbeat obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0 diff --git a/tests/pdctl/store/store_test.go b/tests/pdctl/store/store_test.go index 59b9927bb93..e89115ef2c6 100644 --- a/tests/pdctl/store/store_test.go +++ b/tests/pdctl/store/store_test.go @@ -248,7 +248,7 @@ func (s *storeTestSuite) TestStore(c *C) { c.Assert(ok, IsFalse) // store delete command - c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Up.String()) + c.Assert(storeInfo.Store.State, Equals, metapb.StoreState_Up) args = []string{"-u", pdAddr, "store", "delete", "1"} _, err = pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) @@ -256,7 +256,7 @@ func (s *storeTestSuite) TestStore(c *C) { output, err = pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) c.Assert(json.Unmarshal(output, &storeInfo), IsNil) - c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(storeInfo.Store.State, Equals, metapb.StoreState_Offline) // store check status args = []string{"-u", pdAddr, "store", "check", "Offline"} @@ -285,7 +285,7 @@ func (s *storeTestSuite) TestStore(c *C) { output, err = pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) c.Assert(json.Unmarshal(output, &storeInfo), IsNil) - c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(storeInfo.Store.State, Equals, metapb.StoreState_Offline) // store remove-tombstone args = []string{"-u", pdAddr, "store", "remove-tombstone"} From d6daf97305a4cf2dbfc7831e7313833e6eb90ab8 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 22 Dec 2021 18:15:08 +0800 Subject: [PATCH 02/19] close #4490 Signed-off-by: Cabinfever_B --- pkg/apiutil/apiutil.go | 12 +++++ pkg/apiutil/serverapi/middleware.go | 26 ++++++++++ server/api/server.go | 1 + server/self_protection.go | 80 +++++++++++++++++++++++++++++ server/server.go | 8 +++ server/server_test.go | 42 +++++++++++++++ tests/pdctl/global.go | 59 +++++++++++++++++++++ tests/server/api/api_test.go | 32 ++++++++++++ 8 files changed, 260 insertions(+) create mode 100644 server/self_protection.go create mode 100644 tests/pdctl/global.go diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index 2c61ed45f28..89c89f39e8d 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -21,6 +21,7 @@ import ( "net/http" "strconv" + "github.com/gorilla/mux" "github.com/pingcap/errcode" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -127,3 +128,14 @@ func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) { rd.JSON(w, http.StatusInternalServerError, err.Error()) } } + +// GetHTTPRouteName return mux route name registered for ServiceName +func GetHTTPRouteName(req *http.Request) (string, bool) { + route := mux.CurrentRoute(req) + if route != nil { + if route.GetName() != "" { + return route.GetName(), true + } + } + return "", false +} diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index 973775d1214..971e1a5d78c 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -20,6 +20,7 @@ import ( "net/url" "strings" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" @@ -79,6 +80,31 @@ func IsServiceAllowed(s *server.Server, group server.ServiceGroup) bool { return false } +type selfProtector struct { + s *server.Server +} + +// NewSelfProtector handle self-protection +func NewSelfProtector(s *server.Server) negroni.Handler { + return &selfProtector{s: s} +} + +func (protector *selfProtector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { + handler := protector.s.GetSelfProtectionHandler() + + failpoint.Inject("addSelfProtectionHTTPHeader", func() { + w.Header().Add("self-protection", "ok") + }) + + if handler == nil || handler.HandleHTTPSelfProtection(r) { + next(w, r) + } else { + // current plan will only deny request when over the speed limit + // todo: support more HTTP Status code + http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests) + } +} + type redirector struct { s *server.Server } diff --git a/server/api/server.go b/server/api/server.go index 528c20a08f3..df2ed9ddf56 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -37,6 +37,7 @@ func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.S router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr), + serverapi.NewSelfProtector(svr), negroni.Wrap(r)), ) diff --git a/server/self_protection.go b/server/self_protection.go new file mode 100644 index 00000000000..ceffdd9fdbe --- /dev/null +++ b/server/self_protection.go @@ -0,0 +1,80 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net/http" + + "github.com/tikv/pd/pkg/apiutil" +) + +// SelfProtectionHandler is a framework to handle self protection mechanism +// Self-protection granularity is a logical service +type SelfProtectionHandler struct { + // ServiceHandlers is a map to store handler owned by different services + ServiceHandlers map[string]*serviceSelfProtectionHandler +} + +// NewSelfProtectionHandler returns a new SelfProtectionHandler with config +func NewSelfProtectionHandler(server *Server) *SelfProtectionHandler { + handler := &SelfProtectionHandler{ + ServiceHandlers: make(map[string]*serviceSelfProtectionHandler), + } + return handler +} + +// HandleHTTPSelfProtection is used to handle http api self protection +func (h *SelfProtectionHandler) HandleHTTPSelfProtection(req *http.Request) bool { + serviceName, findName := apiutil.GetHTTPRouteName(req) + // if path is not registered in router, go on process + if !findName { + return true + } + + serviceHandler, ok := h.ServiceHandlers[serviceName] + // if there is no service handler, go on process + if !ok { + return true + } + + httpHandler := &HTTPServiceSelfProtectionHandler{ + req: req, + handler: serviceHandler, + } + return httpHandler.Handle() +} + +// ServiceSelfProtectionHandler is a interface for define self-protection handler by service granularity +type ServiceSelfProtectionHandler interface { + Handle() bool +} + +// HTTPServiceSelfProtectionHandler implement ServiceSelfProtectionHandler to handle http +type HTTPServiceSelfProtectionHandler struct { + req *http.Request + handler *serviceSelfProtectionHandler +} + +// Handle implement ServiceSelfProtectionHandler defined function +func (h *HTTPServiceSelfProtectionHandler) Handle() bool { + // to be implemented + return true +} + +// serviceSelfProtectionHandler is a handler which is independent communication mode +type serviceSelfProtectionHandler struct { + // todo APIRateLimiter + // todo AuditLogger +} diff --git a/server/server.go b/server/server.go index b4aec089f27..7156f5dd829 100644 --- a/server/server.go +++ b/server/server.go @@ -149,6 +149,8 @@ type Server struct { // tsoDispatcher is used to dispatch different TSO requests to // the corresponding forwarding TSO channel. tsoDispatcher sync.Map /* Store as map[string]chan *tsoRequest */ + // SelfProtectionHandler is used for PD self-pretection + selfProtectionHandler *SelfProtectionHandler } // HandlerBuilder builds a server HTTP handler. @@ -238,6 +240,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha } s.handler = newHandler(s) + s.selfProtectionHandler = NewSelfProtectionHandler(s) // Adjust etcd config. etcdCfg, err := s.cfg.GenEmbedEtcdConfig() @@ -668,6 +671,11 @@ func (s *Server) stopRaftCluster() { s.cluster.Stop() } +// GetSelfProtectionHandler returns the selfProt ectionHandler +func (s *Server) GetSelfProtectionHandler() *SelfProtectionHandler { + return s.selfProtectionHandler +} + // GetAddr returns the server urls for clients. func (s *Server) GetAddr() string { return s.cfg.AdvertiseClientUrls diff --git a/server/server_test.go b/server/server_test.go index 25f63f87ee3..ae71e54efa4 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -21,11 +21,14 @@ import ( "net/http" "testing" + "github.com/gorilla/mux" . "github.com/pingcap/check" + "github.com/tikv/pd/pkg/apiutil" "github.com/tikv/pd/pkg/assertutil" "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/server/config" + "github.com/urfave/negroni" "go.etcd.io/etcd/embed" "go.etcd.io/etcd/pkg/types" "go.uber.org/goleak" @@ -235,3 +238,42 @@ func (s *testServerHandlerSuite) TestRegisterServerHandler(c *C) { bodyString := string(bodyBytes) c.Assert(bodyString, Equals, "Hello World\n") } + +func (s *testServerHandlerSuite) TestMuxRouterName(c *C) { + handler := func(ctx context.Context, s *Server) (http.Handler, ServiceGroup, error) { + r := mux.NewRouter() + r.HandleFunc("/pd/apis/mok/v1/router", func(w http.ResponseWriter, r *http.Request) { + RouterName, _ := apiutil.GetHTTPRouteName(r) + fmt.Fprintln(w, RouterName) + }).Name("Mux Router") + info := ServiceGroup{ + Name: "mok", + Version: "v1", + } + router := mux.NewRouter() + router.PathPrefix("/pd").Handler(negroni.New( + negroni.Wrap(r)), + ) + return router, info, nil + } + cfg := NewTestSingleConfig(checkerWithNilAssert(c)) + ctx, cancel := context.WithCancel(context.Background()) + svr, err := CreateServer(ctx, cfg, handler) + c.Assert(err, IsNil) + defer func() { + cancel() + svr.Close() + testutil.CleanServer(svr.cfg.DataDir) + }() + err = svr.Run() + c.Assert(err, IsNil) + resp, err := http.Get(fmt.Sprintf("%s/pd/apis/mok/v1/router", svr.GetAddr())) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + c.Assert(err, IsNil) + bodyBytes, err := io.ReadAll(resp.Body) + resp.Body.Close() + c.Assert(err, IsNil) + bodyString := string(bodyBytes) + c.Assert(bodyString, Equals, "Mux Router\n") +} diff --git a/tests/pdctl/global.go b/tests/pdctl/global.go new file mode 100644 index 00000000000..4e2c15f2647 --- /dev/null +++ b/tests/pdctl/global.go @@ -0,0 +1,59 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdctl + +import ( + "context" + "encoding/json" + "testing" + + . "github.com/pingcap/check" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/api" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tools/pd-ctl/pdctl" + cmd "github.com/tikv/pd/tools/pd-ctl/pdctl" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&globalTestSuite{}) + +type globalTestSuite struct{} + +func (s *globalTestSuite) SetUpSuite(c *C) { + server.EnableZap = true +} + +func (s *globalTestSuite) TestHealth(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1) + c.Assert(err, IsNil) + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + pdAddr := cluster.GetConfig().GetClientURL() + cmd := cmd.GetRootCmd() + + args := []string{"-u", pdAddr, "health"} + output, err := pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + h := make([]api.Health, len(healths)) + c.Assert(json.Unmarshal(output, &h), IsNil) + c.Assert(err, IsNil) + c.Assert(h, DeepEquals, healths) +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 736063ae0a5..ea8fe8ecd7f 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -22,6 +22,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/tikv/pd/pkg/apiutil/serverapi" "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/pkg/typeutil" @@ -110,6 +111,37 @@ func (s *serverTestSuite) TestReconnect(c *C) { } } +var _ = Suite(&testSelfProtectorSuite{}) + +type testSelfProtectorSuite struct { + cleanup func() + cluster *tests.TestCluster +} + +func (s *testSelfProtectorSuite) SetUpSuite(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + server.EnableZap = true + s.cleanup = cancel + cluster, err := tests.NewTestCluster(ctx, 3) + c.Assert(err, IsNil) + c.Assert(cluster.RunInitialServers(), IsNil) + c.Assert(cluster.WaitLeader(), Not(HasLen), 0) + s.cluster = cluster +} + +func (s *testSelfProtectorSuite) TearDownSuite(c *C) { + s.cleanup() + s.cluster.Destroy() +} + +func (s *testSelfProtectorSuite) TestSelfProtect(c *C) { + c.Assert(failpoint.Enable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader", "return(true)"), IsNil) + leader := s.cluster.GetServer(s.cluster.GetLeader()) + header := mustRequestSuccess(c, leader.GetServer()) + c.Assert(header.Get("self-protection"), Equals, "ok") + c.Assert(failpoint.Disable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader"), IsNil) +} + var _ = Suite(&testRedirectorSuite{}) type testRedirectorSuite struct { From 43245bc21f0baaa7b2b4b65f6a056ea8c274999d Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 22 Dec 2021 18:18:40 +0800 Subject: [PATCH 03/19] Revert "close #4483: fix failpoint" This reverts commit 9fef69a1a693e8a4de0a44736db4490ef9a3d7e2. Signed-off-by: Cabinfever_B --- pkg/apiutil/serverapi/middleware.go | 9 ++++----- tests/server/api/api_test.go | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index 971e1a5d78c..46952310c72 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -91,12 +91,11 @@ func NewSelfProtector(s *server.Server) negroni.Handler { func (protector *selfProtector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { handler := protector.s.GetSelfProtectionHandler() - - failpoint.Inject("addSelfProtectionHTTPHeader", func() { - w.Header().Add("self-protection", "ok") - }) - if handler == nil || handler.HandleHTTPSelfProtection(r) { + failpoint.Inject("addSelfProtectionHTTPHeader", func() { + w.Header().Add("self-protection", "ok") + }) + next(w, r) } else { // current plan will only deny request when over the speed limit diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index ea8fe8ecd7f..b06f70fc6a8 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -135,9 +135,9 @@ func (s *testSelfProtectorSuite) TearDownSuite(c *C) { } func (s *testSelfProtectorSuite) TestSelfProtect(c *C) { - c.Assert(failpoint.Enable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader", "return(true)"), IsNil) leader := s.cluster.GetServer(s.cluster.GetLeader()) header := mustRequestSuccess(c, leader.GetServer()) + c.Assert(failpoint.Enable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader", `return(true)`), IsNil) c.Assert(header.Get("self-protection"), Equals, "ok") c.Assert(failpoint.Disable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader"), IsNil) } From 765c431eeb5e0247b78cfd2847e97d48f33fd278 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 22 Dec 2021 18:18:43 +0800 Subject: [PATCH 04/19] Revert "close #4483" This reverts commit dd14b0b699d2535280a5aecbfb0e688fe4d70a83. Signed-off-by: Cabinfever_B --- pkg/apiutil/serverapi/middleware.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index 46952310c72..a2f0e546e29 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -92,6 +92,7 @@ func NewSelfProtector(s *server.Server) negroni.Handler { func (protector *selfProtector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { handler := protector.s.GetSelfProtectionHandler() if handler == nil || handler.HandleHTTPSelfProtection(r) { + failpoint.Inject("addSelfProtectionHTTPHeader", func() { w.Header().Add("self-protection", "ok") }) From c25d78a747316eee76d59b2bb101663b7bd0d635 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 22 Dec 2021 18:18:44 +0800 Subject: [PATCH 05/19] Revert "close #4483: add failpoint" This reverts commit 810f1f5fa17c71c660b3df3f269c8e9a2fabffab. Signed-off-by: Cabinfever_B --- pkg/apiutil/serverapi/middleware.go | 9 ++------- server/self_protection.go | 2 +- server/server.go | 2 +- tests/server/api/api_test.go | 8 ++++---- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index a2f0e546e29..f208477904f 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -20,7 +20,6 @@ import ( "net/url" "strings" - "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" @@ -92,15 +91,11 @@ func NewSelfProtector(s *server.Server) negroni.Handler { func (protector *selfProtector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { handler := protector.s.GetSelfProtectionHandler() if handler == nil || handler.HandleHTTPSelfProtection(r) { - - failpoint.Inject("addSelfProtectionHTTPHeader", func() { - w.Header().Add("self-protection", "ok") - }) + // One line below is used for middleware testing only + w.Header().Add("self-protection", "ok") next(w, r) } else { - // current plan will only deny request when over the speed limit - // todo: support more HTTP Status code http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests) } } diff --git a/server/self_protection.go b/server/self_protection.go index ceffdd9fdbe..06f16f2d9aa 100644 --- a/server/self_protection.go +++ b/server/self_protection.go @@ -23,7 +23,7 @@ import ( // SelfProtectionHandler is a framework to handle self protection mechanism // Self-protection granularity is a logical service type SelfProtectionHandler struct { - // ServiceHandlers is a map to store handler owned by different services + // ServiceHandlers a ServiceHandlers map[string]*serviceSelfProtectionHandler } diff --git a/server/server.go b/server/server.go index 7156f5dd829..531fd6fd55a 100644 --- a/server/server.go +++ b/server/server.go @@ -671,7 +671,7 @@ func (s *Server) stopRaftCluster() { s.cluster.Stop() } -// GetSelfProtectionHandler returns the selfProt ectionHandler +// GetAddr returns the server urls for clients. func (s *Server) GetSelfProtectionHandler() *SelfProtectionHandler { return s.selfProtectionHandler } diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index b06f70fc6a8..76d98c80724 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -22,7 +22,6 @@ import ( "time" . "github.com/pingcap/check" - "github.com/pingcap/failpoint" "github.com/tikv/pd/pkg/apiutil/serverapi" "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/pkg/typeutil" @@ -122,7 +121,10 @@ func (s *testSelfProtectorSuite) SetUpSuite(c *C) { ctx, cancel := context.WithCancel(context.Background()) server.EnableZap = true s.cleanup = cancel - cluster, err := tests.NewTestCluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.TickInterval = typeutil.Duration{Duration: 50 * time.Millisecond} + conf.ElectionInterval = typeutil.Duration{Duration: 250 * time.Millisecond} + }) c.Assert(err, IsNil) c.Assert(cluster.RunInitialServers(), IsNil) c.Assert(cluster.WaitLeader(), Not(HasLen), 0) @@ -137,9 +139,7 @@ func (s *testSelfProtectorSuite) TearDownSuite(c *C) { func (s *testSelfProtectorSuite) TestSelfProtect(c *C) { leader := s.cluster.GetServer(s.cluster.GetLeader()) header := mustRequestSuccess(c, leader.GetServer()) - c.Assert(failpoint.Enable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader", `return(true)`), IsNil) c.Assert(header.Get("self-protection"), Equals, "ok") - c.Assert(failpoint.Disable("github.com/tikv/pd/pkg/apiutil/serverapi/addSelfProtectionHTTPHeader"), IsNil) } var _ = Suite(&testRedirectorSuite{}) From 20682521e70d2eabfcc35bd16d1109bfe4bce3d2 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 22 Dec 2021 18:18:45 +0800 Subject: [PATCH 06/19] Revert "close #4483: add integration test" This reverts commit 974deb553c2fbbcc03aa50947e639012a3f408f4. Signed-off-by: Cabinfever_B --- pkg/apiutil/serverapi/middleware.go | 3 --- tests/server/api/api_test.go | 32 ----------------------------- 2 files changed, 35 deletions(-) diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index f208477904f..8f82b061948 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -91,9 +91,6 @@ func NewSelfProtector(s *server.Server) negroni.Handler { func (protector *selfProtector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { handler := protector.s.GetSelfProtectionHandler() if handler == nil || handler.HandleHTTPSelfProtection(r) { - // One line below is used for middleware testing only - w.Header().Add("self-protection", "ok") - next(w, r) } else { http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests) diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 76d98c80724..736063ae0a5 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -110,38 +110,6 @@ func (s *serverTestSuite) TestReconnect(c *C) { } } -var _ = Suite(&testSelfProtectorSuite{}) - -type testSelfProtectorSuite struct { - cleanup func() - cluster *tests.TestCluster -} - -func (s *testSelfProtectorSuite) SetUpSuite(c *C) { - ctx, cancel := context.WithCancel(context.Background()) - server.EnableZap = true - s.cleanup = cancel - cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { - conf.TickInterval = typeutil.Duration{Duration: 50 * time.Millisecond} - conf.ElectionInterval = typeutil.Duration{Duration: 250 * time.Millisecond} - }) - c.Assert(err, IsNil) - c.Assert(cluster.RunInitialServers(), IsNil) - c.Assert(cluster.WaitLeader(), Not(HasLen), 0) - s.cluster = cluster -} - -func (s *testSelfProtectorSuite) TearDownSuite(c *C) { - s.cleanup() - s.cluster.Destroy() -} - -func (s *testSelfProtectorSuite) TestSelfProtect(c *C) { - leader := s.cluster.GetServer(s.cluster.GetLeader()) - header := mustRequestSuccess(c, leader.GetServer()) - c.Assert(header.Get("self-protection"), Equals, "ok") -} - var _ = Suite(&testRedirectorSuite{}) type testRedirectorSuite struct { From 45b95d820c4ee4fe337f95309572eddf261feb29 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 22 Dec 2021 18:18:46 +0800 Subject: [PATCH 07/19] Revert "close #4373 : add framework" This reverts commit 01fb757473ea7b2ce342dd2bca66c2a76266b89b. Signed-off-by: Cabinfever_B --- pkg/apiutil/apiutil.go | 12 ----- pkg/apiutil/serverapi/middleware.go | 18 ------- server/api/server.go | 1 - server/self_protection.go | 80 ----------------------------- server/server.go | 8 --- server/server_test.go | 42 --------------- 6 files changed, 161 deletions(-) delete mode 100644 server/self_protection.go diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index 89c89f39e8d..2c61ed45f28 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -21,7 +21,6 @@ import ( "net/http" "strconv" - "github.com/gorilla/mux" "github.com/pingcap/errcode" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -128,14 +127,3 @@ func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) { rd.JSON(w, http.StatusInternalServerError, err.Error()) } } - -// GetHTTPRouteName return mux route name registered for ServiceName -func GetHTTPRouteName(req *http.Request) (string, bool) { - route := mux.CurrentRoute(req) - if route != nil { - if route.GetName() != "" { - return route.GetName(), true - } - } - return "", false -} diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index 8f82b061948..973775d1214 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -79,24 +79,6 @@ func IsServiceAllowed(s *server.Server, group server.ServiceGroup) bool { return false } -type selfProtector struct { - s *server.Server -} - -// NewSelfProtector handle self-protection -func NewSelfProtector(s *server.Server) negroni.Handler { - return &selfProtector{s: s} -} - -func (protector *selfProtector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { - handler := protector.s.GetSelfProtectionHandler() - if handler == nil || handler.HandleHTTPSelfProtection(r) { - next(w, r) - } else { - http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests) - } -} - type redirector struct { s *server.Server } diff --git a/server/api/server.go b/server/api/server.go index df2ed9ddf56..528c20a08f3 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -37,7 +37,6 @@ func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.S router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr), - serverapi.NewSelfProtector(svr), negroni.Wrap(r)), ) diff --git a/server/self_protection.go b/server/self_protection.go deleted file mode 100644 index 06f16f2d9aa..00000000000 --- a/server/self_protection.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2021 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "net/http" - - "github.com/tikv/pd/pkg/apiutil" -) - -// SelfProtectionHandler is a framework to handle self protection mechanism -// Self-protection granularity is a logical service -type SelfProtectionHandler struct { - // ServiceHandlers a - ServiceHandlers map[string]*serviceSelfProtectionHandler -} - -// NewSelfProtectionHandler returns a new SelfProtectionHandler with config -func NewSelfProtectionHandler(server *Server) *SelfProtectionHandler { - handler := &SelfProtectionHandler{ - ServiceHandlers: make(map[string]*serviceSelfProtectionHandler), - } - return handler -} - -// HandleHTTPSelfProtection is used to handle http api self protection -func (h *SelfProtectionHandler) HandleHTTPSelfProtection(req *http.Request) bool { - serviceName, findName := apiutil.GetHTTPRouteName(req) - // if path is not registered in router, go on process - if !findName { - return true - } - - serviceHandler, ok := h.ServiceHandlers[serviceName] - // if there is no service handler, go on process - if !ok { - return true - } - - httpHandler := &HTTPServiceSelfProtectionHandler{ - req: req, - handler: serviceHandler, - } - return httpHandler.Handle() -} - -// ServiceSelfProtectionHandler is a interface for define self-protection handler by service granularity -type ServiceSelfProtectionHandler interface { - Handle() bool -} - -// HTTPServiceSelfProtectionHandler implement ServiceSelfProtectionHandler to handle http -type HTTPServiceSelfProtectionHandler struct { - req *http.Request - handler *serviceSelfProtectionHandler -} - -// Handle implement ServiceSelfProtectionHandler defined function -func (h *HTTPServiceSelfProtectionHandler) Handle() bool { - // to be implemented - return true -} - -// serviceSelfProtectionHandler is a handler which is independent communication mode -type serviceSelfProtectionHandler struct { - // todo APIRateLimiter - // todo AuditLogger -} diff --git a/server/server.go b/server/server.go index 531fd6fd55a..b4aec089f27 100644 --- a/server/server.go +++ b/server/server.go @@ -149,8 +149,6 @@ type Server struct { // tsoDispatcher is used to dispatch different TSO requests to // the corresponding forwarding TSO channel. tsoDispatcher sync.Map /* Store as map[string]chan *tsoRequest */ - // SelfProtectionHandler is used for PD self-pretection - selfProtectionHandler *SelfProtectionHandler } // HandlerBuilder builds a server HTTP handler. @@ -240,7 +238,6 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha } s.handler = newHandler(s) - s.selfProtectionHandler = NewSelfProtectionHandler(s) // Adjust etcd config. etcdCfg, err := s.cfg.GenEmbedEtcdConfig() @@ -671,11 +668,6 @@ func (s *Server) stopRaftCluster() { s.cluster.Stop() } -// GetAddr returns the server urls for clients. -func (s *Server) GetSelfProtectionHandler() *SelfProtectionHandler { - return s.selfProtectionHandler -} - // GetAddr returns the server urls for clients. func (s *Server) GetAddr() string { return s.cfg.AdvertiseClientUrls diff --git a/server/server_test.go b/server/server_test.go index ae71e54efa4..25f63f87ee3 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -21,14 +21,11 @@ import ( "net/http" "testing" - "github.com/gorilla/mux" . "github.com/pingcap/check" - "github.com/tikv/pd/pkg/apiutil" "github.com/tikv/pd/pkg/assertutil" "github.com/tikv/pd/pkg/etcdutil" "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/server/config" - "github.com/urfave/negroni" "go.etcd.io/etcd/embed" "go.etcd.io/etcd/pkg/types" "go.uber.org/goleak" @@ -238,42 +235,3 @@ func (s *testServerHandlerSuite) TestRegisterServerHandler(c *C) { bodyString := string(bodyBytes) c.Assert(bodyString, Equals, "Hello World\n") } - -func (s *testServerHandlerSuite) TestMuxRouterName(c *C) { - handler := func(ctx context.Context, s *Server) (http.Handler, ServiceGroup, error) { - r := mux.NewRouter() - r.HandleFunc("/pd/apis/mok/v1/router", func(w http.ResponseWriter, r *http.Request) { - RouterName, _ := apiutil.GetHTTPRouteName(r) - fmt.Fprintln(w, RouterName) - }).Name("Mux Router") - info := ServiceGroup{ - Name: "mok", - Version: "v1", - } - router := mux.NewRouter() - router.PathPrefix("/pd").Handler(negroni.New( - negroni.Wrap(r)), - ) - return router, info, nil - } - cfg := NewTestSingleConfig(checkerWithNilAssert(c)) - ctx, cancel := context.WithCancel(context.Background()) - svr, err := CreateServer(ctx, cfg, handler) - c.Assert(err, IsNil) - defer func() { - cancel() - svr.Close() - testutil.CleanServer(svr.cfg.DataDir) - }() - err = svr.Run() - c.Assert(err, IsNil) - resp, err := http.Get(fmt.Sprintf("%s/pd/apis/mok/v1/router", svr.GetAddr())) - c.Assert(err, IsNil) - c.Assert(resp.StatusCode, Equals, http.StatusOK) - c.Assert(err, IsNil) - bodyBytes, err := io.ReadAll(resp.Body) - resp.Body.Close() - c.Assert(err, IsNil) - bodyString := string(bodyBytes) - c.Assert(bodyString, Equals, "Mux Router\n") -} From baa5163b347c80494c7622f1bf30e1e542ee7d64 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 23 Dec 2021 03:26:00 +0800 Subject: [PATCH 08/19] close #4490: add teset Signed-off-by: Cabinfever_B --- pkg/apiutil/apiutil.go | 30 ++++++++++++++ tests/pdctl/{global.go => global_test.go} | 50 ++++++++++++++++------- tests/pdctl/helper.go | 9 ++++ tools/pd-ctl/pdctl/command/global.go | 16 ++------ 4 files changed, 78 insertions(+), 27 deletions(-) rename tests/pdctl/{global.go => global_test.go} (50%) diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index 2c61ed45f28..f8eac33ce12 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -27,6 +27,14 @@ import ( "github.com/unrolled/render" ) +var ( + // ComponentSignatureKey is used for http request header key + // to identify component signature + ComponentSignatureKey = "ti-component" + // ComponentAnonymousValue identify anonymous request source + ComponentAnonymousValue = "anonymous" +) + // DeferClose captures the error returned from closing (if an error occurs). // This is designed to be used in a defer statement. func DeferClose(c io.Closer, err *error) { @@ -127,3 +135,25 @@ func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) { rd.JSON(w, http.StatusInternalServerError, err.Error()) } } + +// GetComponentNameOnHTTP return component name from Request Header +func GetComponentNameOnHTTP(r *http.Request) string { + componentName := r.Header.Get(ComponentSignatureKey) + if componentName == "" { + componentName = ComponentAnonymousValue + } + return componentName +} + +type UserSignatureRoundTripper struct { + Proxied http.RoundTripper + Component string +} + +// RoundTrip is used to implement RoundTripper +func (rt *UserSignatureRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { + req.Header.Add(ComponentSignatureKey, rt.Component) + // Send the request, get the response and the error + resp, err = rt.Proxied.RoundTrip(req) + return +} diff --git a/tests/pdctl/global.go b/tests/pdctl/global_test.go similarity index 50% rename from tests/pdctl/global.go rename to tests/pdctl/global_test.go index 4e2c15f2647..bb14eeafac2 100644 --- a/tests/pdctl/global.go +++ b/tests/pdctl/global_test.go @@ -16,15 +16,17 @@ package pdctl import ( "context" - "encoding/json" + "fmt" + "net/http" "testing" . "github.com/pingcap/check" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/apiutil" + "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/server" - "github.com/tikv/pd/server/api" - "github.com/tikv/pd/tests" - "github.com/tikv/pd/tools/pd-ctl/pdctl" cmd "github.com/tikv/pd/tools/pd-ctl/pdctl" + "go.uber.org/zap" ) func Test(t *testing.T) { @@ -39,21 +41,39 @@ func (s *globalTestSuite) SetUpSuite(c *C) { server.EnableZap = true } -func (s *globalTestSuite) TestHealth(c *C) { +func (s *globalTestSuite) TestSendAndGetComponent(c *C) { + handler := func(ctx context.Context, s *server.Server) (http.Handler, server.ServiceGroup, error) { + mux := http.NewServeMux() + mux.HandleFunc("/pd/api/v1/health", func(w http.ResponseWriter, r *http.Request) { + component := apiutil.GetComponentNameOnHTTP(r) + for k := range r.Header { + log.Info("header", zap.String("key", k)) + } + log.Info("component", zap.String("component", component)) + c.Assert(component, Equals, "pdctl") + fmt.Fprint(w, component) + }) + info := server.ServiceGroup{ + IsCore: true, + } + return mux, info, nil + } + cfg := server.NewTestSingleConfig(checkerWithNilAssert(c)) ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) + svr, err := server.CreateServer(ctx, cfg, handler) c.Assert(err, IsNil) - err = cluster.RunInitialServers() + err = svr.Run() c.Assert(err, IsNil) - pdAddr := cluster.GetConfig().GetClientURL() - cmd := cmd.GetRootCmd() + pdAddr := svr.GetAddr() + defer func() { + cancel() + svr.Close() + testutil.CleanServer(svr.GetConfig().DataDir) + }() + cmd := cmd.GetRootCmd() args := []string{"-u", pdAddr, "health"} - output, err := pdctl.ExecuteCommand(cmd, args...) - c.Assert(err, IsNil) - h := make([]api.Health, len(healths)) - c.Assert(json.Unmarshal(output, &h), IsNil) + output, err := ExecuteCommand(cmd, args...) c.Assert(err, IsNil) - c.Assert(h, DeepEquals, healths) + c.Assert(string(output), Equals, "pdctl\n") } diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 66e675edb7f..75e7d19c288 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/spf13/cobra" + "github.com/tikv/pd/pkg/assertutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/api" "github.com/tikv/pd/server/core" @@ -113,3 +114,11 @@ func MustPutRegion(c *check.C, cluster *tests.TestCluster, regionID, storeID uin c.Assert(err, check.IsNil) return r } + +func checkerWithNilAssert(c *check.C) *assertutil.Checker { + checker := assertutil.NewChecker(c.FailNow) + checker.IsNil = func(obtained interface{}) { + c.Assert(obtained, check.IsNil) + } + return checker +} diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index bc6dcaa82ea..2e886fd590c 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -25,7 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/spf13/cobra" - "go.etcd.io/etcd/pkg/transport" + "github.com/tikv/pd/pkg/apiutil" ) var ( @@ -35,19 +35,11 @@ var ( // InitHTTPSClient creates https client with ca file func InitHTTPSClient(caPath, certPath, keyPath string) error { - tlsInfo := transport.TLSInfo{ - CertFile: certPath, - KeyFile: keyPath, - TrustedCAFile: caPath, - } - tlsConfig, err := tlsInfo.ClientConfig() - if err != nil { - return errors.WithStack(err) - } dialClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsConfig, + Transport: &apiutil.UserSignatureRoundTripper{ + Component: "pdctl", + Proxied: http.DefaultTransport, }, } From 03d80301b0c891e1329316a5a5a442fe96756e34 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 23 Dec 2021 03:29:05 +0800 Subject: [PATCH 09/19] close #4490: add teset Signed-off-by: Cabinfever_B --- tools/pd-ctl/pdctl/command/global.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index 2e886fd590c..aae5fad0fcd 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/spf13/cobra" "github.com/tikv/pd/pkg/apiutil" + "go.etcd.io/etcd/pkg/transport" ) var ( @@ -35,11 +36,21 @@ var ( // InitHTTPSClient creates https client with ca file func InitHTTPSClient(caPath, certPath, keyPath string) error { + tlsInfo := transport.TLSInfo{ + CertFile: certPath, + KeyFile: keyPath, + TrustedCAFile: caPath, + } + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + return errors.WithStack(err) + } dialClient = &http.Client{ Transport: &apiutil.UserSignatureRoundTripper{ Component: "pdctl", - Proxied: http.DefaultTransport, + Proxied: &http.Transport{ + TLSClientConfig: tlsConfig}, }, } From 377c854cf644ba47002acec53dc5a186ba4e6412 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 23 Dec 2021 16:19:12 +0800 Subject: [PATCH 10/19] close #4490: fix transport and change component key Signed-off-by: Cabinfever_B --- pkg/apiutil/apiutil.go | 2 +- tools/pd-ctl/pdctl/command/global.go | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index f8eac33ce12..98f9491642c 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -30,7 +30,7 @@ import ( var ( // ComponentSignatureKey is used for http request header key // to identify component signature - ComponentSignatureKey = "ti-component" + ComponentSignatureKey = "component" // ComponentAnonymousValue identify anonymous request source ComponentAnonymousValue = "anonymous" ) diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index aae5fad0fcd..50f3021be56 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -30,7 +30,12 @@ import ( ) var ( - dialClient = &http.Client{} + dialClient = &http.Client{ + Transport: &apiutil.UserSignatureRoundTripper{ + Component: "pdctl", + Proxied: http.DefaultTransport, + }, + } pingPrefix = "pd/api/v1/ping" ) From d4c21796cbd955564c5aa814874b6401ebd0eeb8 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 23 Dec 2021 16:26:09 +0800 Subject: [PATCH 11/19] close #4490: add comment Signed-off-by: Cabinfever_B --- pkg/apiutil/apiutil.go | 5 +++-- tools/pd-ctl/pdctl/command/global.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index 98f9491642c..28c78c2af09 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -145,13 +145,14 @@ func GetComponentNameOnHTTP(r *http.Request) string { return componentName } -type UserSignatureRoundTripper struct { +// ComponentSignatureRoundTripper is used to add component signature in HTTP header +type ComponentSignatureRoundTripper struct { Proxied http.RoundTripper Component string } // RoundTrip is used to implement RoundTripper -func (rt *UserSignatureRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { +func (rt *ComponentSignatureRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { req.Header.Add(ComponentSignatureKey, rt.Component) // Send the request, get the response and the error resp, err = rt.Proxied.RoundTrip(req) diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index 50f3021be56..a60a221a59b 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -31,7 +31,7 @@ import ( var ( dialClient = &http.Client{ - Transport: &apiutil.UserSignatureRoundTripper{ + Transport: &apiutil.ComponentSignatureRoundTripper{ Component: "pdctl", Proxied: http.DefaultTransport, }, @@ -52,7 +52,7 @@ func InitHTTPSClient(caPath, certPath, keyPath string) error { } dialClient = &http.Client{ - Transport: &apiutil.UserSignatureRoundTripper{ + Transport: &apiutil.ComponentSignatureRoundTripper{ Component: "pdctl", Proxied: &http.Transport{ TLSClientConfig: tlsConfig}, From 09f4d282f02c03d549b0bb2fb5a922449f5f52a8 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 23 Dec 2021 18:09:48 +0800 Subject: [PATCH 12/19] config: remove unused nextRetryDelay (#4492) close #4493 Signed-off-by: Ryan Leung --- server/config/config.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index da850fbf7f1..fb63683cabd 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -149,12 +149,9 @@ type Config struct { // For all warnings during parsing. WarningMsgs []string - // Only test can change them. - nextRetryDelay time.Duration DisableStrictReconfigCheck bool HeartbeatStreamBindInterval typeutil.Duration - LeaderPriorityCheckInterval typeutil.Duration logger *zap.Logger @@ -201,7 +198,6 @@ func NewConfig() *Config { const ( defaultLeaderLease = int64(3) - defaultNextRetryDelay = time.Second defaultCompactionMode = "periodic" defaultAutoCompactionRetention = "1h" defaultQuotaBackendBytes = typeutil.ByteSize(8 * 1024 * 1024 * 1024) // 8GB @@ -550,10 +546,6 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { c.Labels = make(map[string]string) } - if c.nextRetryDelay == 0 { - c.nextRetryDelay = defaultNextRetryDelay - } - adjustString(&c.AutoCompactionMode, defaultCompactionMode) adjustString(&c.AutoCompactionRetention, defaultAutoCompactionRetention) if !configMetaData.IsDefined("quota-backend-bytes") { @@ -788,7 +780,7 @@ const ( defaultEnableJointConsensus = true defaultEnableCrossTableMerge = true defaultHotRegionsWriteInterval = 10 * time.Minute - defaultHotRegionsResevervedDays = 0 + defaultHotRegionsReservedDays = 0 ) func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error { @@ -875,7 +867,7 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error { } if !meta.IsDefined("hot-regions-reserved-days") { - adjustInt64(&c.HotRegionsReservedDays, defaultHotRegionsResevervedDays) + adjustInt64(&c.HotRegionsReservedDays, defaultHotRegionsReservedDays) } return c.Validate() @@ -928,7 +920,7 @@ func (c *ScheduleConfig) MigrateDeprecatedFlags() { // Validate is used to validate if some scheduling configurations are right. func (c *ScheduleConfig) Validate() error { if c.TolerantSizeRatio < 0 { - return errors.New("tolerant-size-ratio should be nonnegative") + return errors.New("tolerant-size-ratio should be non-negative") } if c.LowSpaceRatio < 0 || c.LowSpaceRatio > 1 { return errors.New("low-space-ratio should between 0 and 1") From 8d35e9bd146c2cabd0bc5ae9fd2cf874b49c2571 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 24 Dec 2021 13:33:47 +0800 Subject: [PATCH 13/19] config: expose max request bytes (#4482) ref #4429 Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- server/config/config.go | 7 +++++++ server/config/config_test.go | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/server/config/config.go b/server/config/config.go index fb63683cabd..b02ab0e74b3 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -140,6 +140,8 @@ type Config struct { // an election, thus minimizing disruptions. PreVote bool `toml:"enable-prevote"` + MaxRequestBytes uint `toml:"max-request-bytes" json:"max-request-bytes"` + Security SecurityConfig `toml:"security" json:"security"` LabelProperty LabelPropertyConfig `toml:"label-property" json:"label-property"` @@ -201,6 +203,7 @@ const ( defaultCompactionMode = "periodic" defaultAutoCompactionRetention = "1h" defaultQuotaBackendBytes = typeutil.ByteSize(8 * 1024 * 1024 * 1024) // 8GB + defaultMaxRequestBytes = uint(1.5 * 1024 * 1024) // 1.5MB defaultName = "pd" defaultClientUrls = "http://127.0.0.1:2379" @@ -551,6 +554,9 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { if !configMetaData.IsDefined("quota-backend-bytes") { c.QuotaBackendBytes = defaultQuotaBackendBytes } + if !configMetaData.IsDefined("max-request-bytes") { + c.MaxRequestBytes = defaultMaxRequestBytes + } adjustDuration(&c.TickInterval, defaultTickInterval) adjustDuration(&c.ElectionInterval, defaultElectionInterval) @@ -1254,6 +1260,7 @@ func (c *Config) GenEmbedEtcdConfig() (*embed.Config, error) { cfg.AutoCompactionMode = c.AutoCompactionMode cfg.AutoCompactionRetention = c.AutoCompactionRetention cfg.QuotaBackendBytes = int64(c.QuotaBackendBytes) + cfg.MaxRequestBytes = c.MaxRequestBytes allowedCN, serr := c.Security.GetOneAllowedCN() if serr != nil { diff --git a/server/config/config_test.go b/server/config/config_test.go index f1f2785dd79..a0353828689 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -158,12 +158,15 @@ func (s *testConfigSuite) TestValidation(c *C) { c.Assert(cfg.Schedule.Validate(), NotNil) // check quota c.Assert(cfg.QuotaBackendBytes, Equals, defaultQuotaBackendBytes) + // check request bytes + c.Assert(cfg.MaxRequestBytes, Equals, defaultMaxRequestBytes) } func (s *testConfigSuite) TestAdjust(c *C) { cfgData := ` name = "" lease = 0 +max-request-bytes = 20000000 [pd-server] metric-storage = "http://127.0.0.1:9090" @@ -184,6 +187,7 @@ leader-schedule-limit = 0 c.Assert(err, IsNil) c.Assert(cfg.Name, Equals, fmt.Sprintf("%s-%s", defaultName, host)) c.Assert(cfg.LeaderLease, Equals, defaultLeaderLease) + c.Assert(cfg.MaxRequestBytes, Equals, uint(20000000)) // When defined, use values from config file. c.Assert(cfg.Schedule.MaxMergeRegionSize, Equals, uint64(0)) c.Assert(cfg.Schedule.EnableOneWayMerge, IsTrue) From 175d925bfee79f47fd5d5d897b132743fedaf804 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Mon, 27 Dec 2021 14:55:21 +0800 Subject: [PATCH 14/19] Update pkg/apiutil/apiutil.go Co-authored-by: disksing --- pkg/apiutil/apiutil.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index 28c78c2af09..bbd4264d155 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -31,7 +31,7 @@ var ( // ComponentSignatureKey is used for http request header key // to identify component signature ComponentSignatureKey = "component" - // ComponentAnonymousValue identify anonymous request source + // ComponentAnonymousValue identifies anonymous request source ComponentAnonymousValue = "anonymous" ) From ae434af6b5d712f7c6356ed6668e393b3c2e2f5a Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Mon, 27 Dec 2021 15:20:04 +0800 Subject: [PATCH 15/19] Update pkg/apiutil/apiutil.go Co-authored-by: disksing --- pkg/apiutil/apiutil.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index bbd4264d155..a5ed3bc0256 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -136,7 +136,7 @@ func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) { } } -// GetComponentNameOnHTTP return component name from Request Header +// GetComponentNameOnHTTP returns component name from Request Header func GetComponentNameOnHTTP(r *http.Request) string { componentName := r.Header.Get(ComponentSignatureKey) if componentName == "" { From e904c838a700cd8a1f8983c4042baa369adac2ce Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 27 Dec 2021 16:14:46 +0800 Subject: [PATCH 16/19] close #4490: fix exported var problem Signed-off-by: Cabinfever_B --- pkg/apiutil/apiutil.go | 28 ++++++++++++++++++---------- tools/pd-ctl/pdctl/command/global.go | 15 +++++---------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index a5ed3bc0256..83c2c3b75ec 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -28,11 +28,11 @@ import ( ) var ( - // ComponentSignatureKey is used for http request header key + // componentSignatureKey is used for http request header key // to identify component signature - ComponentSignatureKey = "component" - // ComponentAnonymousValue identifies anonymous request source - ComponentAnonymousValue = "anonymous" + componentSignatureKey = "component" + // componentAnonymousValue identifies anonymous request source + componentAnonymousValue = "anonymous" ) // DeferClose captures the error returned from closing (if an error occurs). @@ -138,23 +138,31 @@ func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) { // GetComponentNameOnHTTP returns component name from Request Header func GetComponentNameOnHTTP(r *http.Request) string { - componentName := r.Header.Get(ComponentSignatureKey) + componentName := r.Header.Get(componentSignatureKey) if componentName == "" { - componentName = ComponentAnonymousValue + componentName = componentAnonymousValue } return componentName } // ComponentSignatureRoundTripper is used to add component signature in HTTP header type ComponentSignatureRoundTripper struct { - Proxied http.RoundTripper - Component string + proxied http.RoundTripper + component *string +} + +// NewComponentSignatureRoundTripper returns a new ComponentSignatureRoundTripper. +func NewComponentSignatureRoundTripper(roundTripper http.RoundTripper, componentName *string) *ComponentSignatureRoundTripper { + return &ComponentSignatureRoundTripper{ + proxied: roundTripper, + component: componentName, + } } // RoundTrip is used to implement RoundTripper func (rt *ComponentSignatureRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { - req.Header.Add(ComponentSignatureKey, rt.Component) + req.Header.Add(componentSignatureKey, *rt.component) // Send the request, get the response and the error - resp, err = rt.Proxied.RoundTrip(req) + resp, err = rt.proxied.RoundTrip(req) return } diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index a60a221a59b..21a0f3d4ec8 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -30,11 +30,9 @@ import ( ) var ( - dialClient = &http.Client{ - Transport: &apiutil.ComponentSignatureRoundTripper{ - Component: "pdctl", - Proxied: http.DefaultTransport, - }, + pdControllerComponentName = "pdctl" + dialClient = &http.Client{ + Transport: apiutil.NewComponentSignatureRoundTripper(http.DefaultTransport, &pdControllerComponentName), } pingPrefix = "pd/api/v1/ping" ) @@ -52,11 +50,8 @@ func InitHTTPSClient(caPath, certPath, keyPath string) error { } dialClient = &http.Client{ - Transport: &apiutil.ComponentSignatureRoundTripper{ - Component: "pdctl", - Proxied: &http.Transport{ - TLSClientConfig: tlsConfig}, - }, + Transport: apiutil.NewComponentSignatureRoundTripper( + &http.Transport{TLSClientConfig: tlsConfig}, &pdControllerComponentName), } return nil From a706d8fb083848b96e50ddee725c6bfa55c85558 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 27 Dec 2021 16:21:50 +0800 Subject: [PATCH 17/19] api, pd-ctl: fix the region range hole end key check and add some help info (#4498) * Fix the region range hole end key check and add some help info (close #4496 #4216) Signed-off-by: JmPotato * Fix the typo Signed-off-by: JmPotato --- server/api/region_test.go | 1 + server/core/region.go | 4 +++ tests/pdctl/region/region_test.go | 1 + tools/pd-ctl/pdctl/command/region_command.go | 27 +++++++++++++++++--- 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/server/api/region_test.go b/server/api/region_test.go index b57553bc49c..d903afa8601 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -536,6 +536,7 @@ func (s *testGetRegionRangeHolesSuite) TestRegionRangeHoles(c *C) { {"", core.HexRegionKeyStr(r1.GetStartKey())}, {core.HexRegionKeyStr(r1.GetEndKey()), core.HexRegionKeyStr(r3.GetStartKey())}, {core.HexRegionKeyStr(r4.GetEndKey()), core.HexRegionKeyStr(r6.GetStartKey())}, + {core.HexRegionKeyStr(r6.GetEndKey()), ""}, }) } diff --git a/server/core/region.go b/server/core/region.go index aa403dc7667..381aa50c6b8 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -1143,6 +1143,10 @@ func (r *RegionsInfo) GetRangeHoles() [][]string { lastEndKey = region.GetEndKey() return true }) + // If the last end key is not empty, it means there is a range hole at the end. + if len(lastEndKey) > 0 { + rangeHoles = append(rangeHoles, []string{HexRegionKeyStr(lastEndKey), ""}) + } return rangeHoles } diff --git a/tests/pdctl/region/region_test.go b/tests/pdctl/region/region_test.go index 6423b97e81e..507094cadce 100644 --- a/tests/pdctl/region/region_test.go +++ b/tests/pdctl/region/region_test.go @@ -201,5 +201,6 @@ func (s *regionTestSuite) TestRegion(c *C) { c.Assert(*rangeHoles, DeepEquals, [][]string{ {"", core.HexRegionKeyStr(r1.GetStartKey())}, {core.HexRegionKeyStr(r4.GetEndKey()), core.HexRegionKeyStr(r5.GetStartKey())}, + {core.HexRegionKeyStr(r5.GetEndKey()), ""}, }) } diff --git a/tools/pd-ctl/pdctl/command/region_command.go b/tools/pd-ctl/pdctl/command/region_command.go index c16dc983df9..600a6ebaa88 100644 --- a/tools/pd-ctl/pdctl/command/region_command.go +++ b/tools/pd-ctl/pdctl/command/region_command.go @@ -519,12 +519,33 @@ func showRegionWithStoreCommandFunc(cmd *cobra.Command, args []string) { cmd.Println(r) } +const ( + rangeHolesLongDesc = `There are some cases that the region range is not continuous, for example, the region doesn't send the heartbeat to PD after a splitting. +This command will output all empty ranges without any region info.` + rangeHolesExample = ` + If PD now holds the region ranges info like ["", "a"], ["b", "x"], ["x", "z"]. The the output will be like: + + [ + [ + "a", + "b" + ], + [ + "z", + "" + ], + ] +` +) + // NewRangesWithRangeHolesCommand returns ranges with range-holes subcommand of regionCmd func NewRangesWithRangeHolesCommand() *cobra.Command { r := &cobra.Command{ - Use: "range-holes", - Short: "show all empty ranges without any region info", - Run: showRangesWithRangeHolesCommandFunc, + Use: "range-holes", + Short: "show all empty ranges without any region info.", + Long: rangeHolesLongDesc, + Example: rangeHolesExample, + Run: showRangesWithRangeHolesCommandFunc, } return r } From ea57c6d6a0c9ad081aded76545f3d97bc465635f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 27 Dec 2021 20:41:49 +0800 Subject: [PATCH 18/19] *: move `parseUrls` to util (#4506) * move parse url to util close #4505 Signed-off-by: Ryan Leung * address the comment Signed-off-by: Ryan Leung --- pkg/apiutil/serverapi/middleware.go | 15 +++++++++------ server/config/config.go | 25 ++++--------------------- server/config/util.go | 18 ++++++++++++++++++ 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index 973775d1214..1fea0e1b9d0 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -18,12 +18,10 @@ import ( "io" "net/http" "net/url" - "strings" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" - "github.com/tikv/pd/server/config" "github.com/urfave/negroni" "go.uber.org/zap" ) @@ -113,11 +111,16 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http http.Error(w, "no leader", http.StatusServiceUnavailable) return } + clientUrls := leader.GetClientUrls() + urls := make([]url.URL, 0, len(clientUrls)) + for _, item := range clientUrls { + u, err := url.Parse(item) + if err != nil { + http.Error(w, errs.ErrURLParse.Wrap(err).GenWithStackByCause().Error(), http.StatusInternalServerError) + return + } - urls, err := config.ParseUrls(strings.Join(leader.GetClientUrls(), ",")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + urls = append(urls, *u) } client := h.s.GetHTTPClient() NewCustomReverseProxies(client, urls).ServeHTTP(w, r) diff --git a/server/config/config.go b/server/config/config.go index b02ab0e74b3..930d459b0f3 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -1199,23 +1199,6 @@ func (c LabelPropertyConfig) Clone() LabelPropertyConfig { return m } -// ParseUrls parse a string into multiple urls. -// Export for api. -func ParseUrls(s string) ([]url.URL, error) { - items := strings.Split(s, ",") - urls := make([]url.URL, 0, len(items)) - for _, item := range items { - u, err := url.Parse(item) - if err != nil { - return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause() - } - - urls = append(urls, *u) - } - - return urls, nil -} - // SetupLogger setup the logger. func (c *Config) SetupLogger() error { lg, p, err := log.InitLogger(&c.Log, zap.AddStacktrace(zapcore.FatalLevel)) @@ -1283,22 +1266,22 @@ func (c *Config) GenEmbedEtcdConfig() (*embed.Config, error) { cfg.Logger = "zap" var err error - cfg.LPUrls, err = ParseUrls(c.PeerUrls) + cfg.LPUrls, err = parseUrls(c.PeerUrls) if err != nil { return nil, err } - cfg.APUrls, err = ParseUrls(c.AdvertisePeerUrls) + cfg.APUrls, err = parseUrls(c.AdvertisePeerUrls) if err != nil { return nil, err } - cfg.LCUrls, err = ParseUrls(c.ClientUrls) + cfg.LCUrls, err = parseUrls(c.ClientUrls) if err != nil { return nil, err } - cfg.ACUrls, err = ParseUrls(c.AdvertiseClientUrls) + cfg.ACUrls, err = parseUrls(c.AdvertiseClientUrls) if err != nil { return nil, err } diff --git a/server/config/util.go b/server/config/util.go index 7683f0dde0f..af11a7d8fbe 100644 --- a/server/config/util.go +++ b/server/config/util.go @@ -17,9 +17,11 @@ package config import ( "net/url" "regexp" + "strings" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/errs" ) const ( @@ -87,3 +89,19 @@ func NewTestOptions() *PersistOptions { c.Adjust(nil, false) return NewPersistOptions(c) } + +// parseUrls parse a string into multiple urls. +func parseUrls(s string) ([]url.URL, error) { + items := strings.Split(s, ",") + urls := make([]url.URL, 0, len(items)) + for _, item := range items { + u, err := url.Parse(item) + if err != nil { + return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause() + } + + urls = append(urls, *u) + } + + return urls, nil +} From 909f2fdebb3f06b704dfc4498b498708efe9ee32 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 28 Dec 2021 13:41:49 +0800 Subject: [PATCH 19/19] statistics: small refactor of hot statistics (#4461) * small refactor of hot statistics close #4463 Signed-off-by: Ryan Leung * address the comment Signed-off-by: Ryan Leung * address the comment Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- pkg/mock/mockcluster/mockcluster.go | 10 +- server/cluster/cluster.go | 6 +- server/handler.go | 2 +- server/schedulers/hot_region_test.go | 6 +- server/statistics/hot_cache.go | 102 +++++++++------------ server/statistics/hot_cache_task.go | 54 +++++------ server/statistics/hot_peer.go | 60 ++++++------ server/statistics/hot_peer_cache.go | 111 ++++++++++++----------- server/statistics/hot_peer_cache_test.go | 70 +++++++------- server/statistics/kind.go | 22 +++++ 10 files changed, 225 insertions(+), 218 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index ce8c79652ab..0864e9e525c 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -130,7 +130,7 @@ func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { // HotRegionsFromStore picks hot regions in specify store. func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*core.RegionInfo { - stats := mc.HotCache.HotRegionsFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold()) + stats := hotRegionsFromStore(mc.HotCache, store, kind, mc.GetHotRegionCacheHitsThreshold()) regions := make([]*core.RegionInfo, 0, len(stats)) for _, stat := range stats { region := mc.GetRegion(stat.RegionID) @@ -141,6 +141,14 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []* return regions } +// hotRegionsFromStore picks hot region in specify store. +func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics.RWType, minHotDegree int) []*statistics.HotPeerStat { + if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { + return stats + } + return nil +} + // AllocPeer allocs a new peer on a store. func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { peerID, err := mc.AllocID() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index fa9fce926ef..c6738b916fb 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -594,11 +594,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { c.limiter.Collect(newStore.GetStoreStats()) } - regionIDs := make(map[uint64]struct{}, len(stats.GetPeerStats())) + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) for _, peerStat := range stats.GetPeerStats() { regionID := peerStat.GetRegionId() - regionIDs[regionID] = struct{}{} region := c.GetRegion(regionID) + regions[regionID] = region if region == nil { log.Warn("discard hot peer stat for unknown region", zap.Uint64("region-id", regionID), @@ -624,7 +624,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { peerInfo := core.NewPeerInfo(peer, loads, interval) c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) } - c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regionIDs, interval)) + c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) return nil } diff --git a/server/handler.go b/server/handler.go index df3285120b6..46fc5c98455 100644 --- a/server/handler.go +++ b/server/handler.go @@ -999,7 +999,7 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotR } } stat := core.HistoryHotRegion{ - // store in ms. + // store in ms. UpdateTime: hotPeerStat.LastUpdateTime.UnixNano() / int64(time.Millisecond), RegionID: hotPeerStat.RegionID, StoreID: hotPeerStat.StoreID, diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index dd7192cba55..39328ceac74 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -1552,7 +1552,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) { c.Check(len(items), Greater, 0) for _, item := range items { if item.StoreID == 3 { - c.Check(item.IsNeedDelete(), IsTrue) + c.Check(item.GetActionType(), Equals, statistics.Remove) continue } c.Check(item.HotDegree, Equals, testcase.DegreeAfterTransferLeader+2) @@ -1586,9 +1586,9 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) { items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1) for _, item := range items { if item.StoreID < 4 { - c.Check(item.IsNeedDelete(), IsTrue) + c.Check(item.GetActionType(), Equals, statistics.Remove) } else { - c.Check(item.IsNeedDelete(), IsFalse) + c.Check(item.GetActionType(), Equals, statistics.Update) } } } diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index 125b5f6206e..ece8932c959 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -28,43 +28,39 @@ const queueCap = 20000 // HotCache is a cache hold hot regions. type HotCache struct { - ctx context.Context - readFlowQueue chan FlowItemTask - writeFlowQueue chan FlowItemTask - writeFlow *hotPeerCache - readFlow *hotPeerCache + ctx context.Context + writeCache *hotPeerCache + readCache *hotPeerCache } // NewHotCache creates a new hot spot cache. func NewHotCache(ctx context.Context) *HotCache { w := &HotCache{ - ctx: ctx, - readFlowQueue: make(chan FlowItemTask, queueCap), - writeFlowQueue: make(chan FlowItemTask, queueCap), - writeFlow: NewHotPeerCache(Write), - readFlow: NewHotPeerCache(Read), + ctx: ctx, + writeCache: NewHotPeerCache(Write), + readCache: NewHotPeerCache(Read), } - go w.updateItems(w.readFlowQueue, w.runReadTask) - go w.updateItems(w.writeFlowQueue, w.runWriteTask) + go w.updateItems(w.readCache.taskQueue, w.runReadTask) + go w.updateItems(w.writeCache.taskQueue, w.runWriteTask) return w } // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster. func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.writeFlow.CheckPeerFlow(peer, region) + return w.writeCache.checkPeerFlow(peer, region) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster. func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.readFlow.CheckPeerFlow(peer, region) + return w.readCache.checkPeerFlow(peer, region) } // CheckWriteAsync puts the flowItem into queue, and check it asynchronously -func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool { +func (w *HotCache) CheckWriteAsync(task flowItemTask) bool { select { - case w.writeFlowQueue <- task: + case w.writeCache.taskQueue <- task: return true default: return false @@ -72,9 +68,9 @@ func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool { } // CheckReadAsync puts the flowItem into queue, and check it asynchronously -func (w *HotCache) CheckReadAsync(task FlowItemTask) bool { +func (w *HotCache) CheckReadAsync(task flowItemTask) bool { select { - case w.readFlowQueue <- task: + case w.readCache.taskQueue <- task: return true default: return false @@ -86,39 +82,26 @@ func (w *HotCache) CheckReadAsync(task FlowItemTask) bool { func (w *HotCache) Update(item *HotPeerStat) { switch item.Kind { case Write: - update(item, w.writeFlow) + updateStat(w.writeCache, item) case Read: - update(item, w.readFlow) + updateStat(w.readCache, item) } } // RegionStats returns hot items according to kind func (w *HotCache) RegionStats(kind RWType, minHotDegree int) map[uint64][]*HotPeerStat { + task := newCollectRegionStatsTask(minHotDegree) + var succ bool switch kind { case Write: - task := newCollectRegionStatsTask(minHotDegree) - succ := w.CheckWriteAsync(task) - if !succ { - return nil - } - return task.waitRet(w.ctx) + succ = w.CheckWriteAsync(task) case Read: - task := newCollectRegionStatsTask(minHotDegree) - succ := w.CheckReadAsync(task) - if !succ { - return nil - } - return task.waitRet(w.ctx) + succ = w.CheckReadAsync(task) } - return nil -} - -// HotRegionsFromStore picks hot region in specify store. -func (w *HotCache) HotRegionsFromStore(storeID uint64, kind RWType, minHotDegree int) []*HotPeerStat { - if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { - return stats + if !succ { + return nil } - return nil + return task.waitRet(w.ctx) } // IsRegionHot checks if the region is hot. @@ -149,13 +132,13 @@ func (w *HotCache) ResetMetrics() { // ExpiredReadItems returns the read items which are already expired. // This is used for mockcluster. func (w *HotCache) ExpiredReadItems(region *core.RegionInfo) []*HotPeerStat { - return w.readFlow.CollectExpiredItems(region) + return w.readCache.collectExpiredItems(region) } // ExpiredWriteItems returns the write items which are already expired. // This is used for mockcluster. func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat { - return w.writeFlow.CollectExpiredItems(region) + return w.writeCache.collectExpiredItems(region) } func incMetrics(name string, storeID uint64, kind RWType) { @@ -172,14 +155,14 @@ func incMetrics(name string, storeID uint64, kind RWType) { func (w *HotCache) GetFilledPeriod(kind RWType) int { switch kind { case Write: - return w.writeFlow.getDefaultTimeMedian().GetFilledPeriod() + return w.writeCache.getDefaultTimeMedian().GetFilledPeriod() case Read: - return w.readFlow.getDefaultTimeMedian().GetFilledPeriod() + return w.readCache.getDefaultTimeMedian().GetFilledPeriod() } return 0 } -func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task FlowItemTask)) { +func (w *HotCache) updateItems(queue <-chan flowItemTask, runTask func(task flowItemTask)) { for { select { case <-w.ctx.Done(): @@ -190,29 +173,30 @@ func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task Flow } } -func (w *HotCache) runReadTask(task FlowItemTask) { +func (w *HotCache) runReadTask(task flowItemTask) { if task != nil { - // TODO: do we need a run-task timeout to protect the queue won't be stucked by a task? - task.runTask(w.readFlow) - hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String()).Set(float64(len(w.readFlowQueue))) + // TODO: do we need a run-task timeout to protect the queue won't be stuck by a task? + task.runTask(w.readCache) + hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String()).Set(float64(len(w.readCache.taskQueue))) } } -func (w *HotCache) runWriteTask(task FlowItemTask) { +func (w *HotCache) runWriteTask(task flowItemTask) { if task != nil { - // TODO: do we need a run-task timeout to protect the queue won't be stucked by a task? - task.runTask(w.writeFlow) - hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String()).Set(float64(len(w.writeFlowQueue))) + // TODO: do we need a run-task timeout to protect the queue won't be stuck by a task? + task.runTask(w.writeCache) + hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String()).Set(float64(len(w.writeCache.taskQueue))) } } -func update(item *HotPeerStat, flow *hotPeerCache) { - flow.Update(item) - if item.IsNeedDelete() { - incMetrics("remove_item", item.StoreID, item.Kind) - } else if item.IsNew() { +func updateStat(cache *hotPeerCache, item *HotPeerStat) { + cache.update(item) + switch item.actionType { + case Add: incMetrics("add_item", item.StoreID, item.Kind) - } else { + case Remove: + incMetrics("remove_item", item.StoreID, item.Kind) + case Update: incMetrics("update_item", item.StoreID, item.Kind) } } diff --git a/server/statistics/hot_cache_task.go b/server/statistics/hot_cache_task.go index 672306b35d0..2f71c5ecee5 100644 --- a/server/statistics/hot_cache_task.go +++ b/server/statistics/hot_cache_task.go @@ -31,10 +31,10 @@ const ( collectMetricsTaskType ) -// FlowItemTask indicates the task in flowItem queue -type FlowItemTask interface { +// flowItemTask indicates the task in flowItem queue +type flowItemTask interface { taskType() flowItemTaskKind - runTask(flow *hotPeerCache) + runTask(cache *hotPeerCache) } type checkPeerTask struct { @@ -43,7 +43,7 @@ type checkPeerTask struct { } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask { +func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) flowItemTask { return &checkPeerTask{ peerInfo: peerInfo, regionInfo: regionInfo, @@ -54,10 +54,10 @@ func (t *checkPeerTask) taskType() flowItemTaskKind { return checkPeerTaskType } -func (t *checkPeerTask) runTask(flow *hotPeerCache) { - stat := flow.CheckPeerFlow(t.peerInfo, t.regionInfo) +func (t *checkPeerTask) runTask(cache *hotPeerCache) { + stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo) if stat != nil { - update(stat, flow) + updateStat(cache, stat) } } @@ -66,7 +66,7 @@ type checkExpiredTask struct { } // NewCheckExpiredItemTask creates task to collect expired items -func NewCheckExpiredItemTask(region *core.RegionInfo) FlowItemTask { +func NewCheckExpiredItemTask(region *core.RegionInfo) flowItemTask { return &checkExpiredTask{ region: region, } @@ -76,25 +76,25 @@ func (t *checkExpiredTask) taskType() flowItemTaskKind { return checkExpiredTaskType } -func (t *checkExpiredTask) runTask(flow *hotPeerCache) { - expiredStats := flow.CollectExpiredItems(t.region) +func (t *checkExpiredTask) runTask(cache *hotPeerCache) { + expiredStats := cache.collectExpiredItems(t.region) for _, stat := range expiredStats { - update(stat, flow) + updateStat(cache, stat) } } type collectUnReportedPeerTask struct { - storeID uint64 - regionIDs map[uint64]struct{} - interval uint64 + storeID uint64 + regions map[uint64]*core.RegionInfo + interval uint64 } // NewCollectUnReportedPeerTask creates task to collect unreported peers -func NewCollectUnReportedPeerTask(storeID uint64, regionIDs map[uint64]struct{}, interval uint64) FlowItemTask { +func NewCollectUnReportedPeerTask(storeID uint64, regions map[uint64]*core.RegionInfo, interval uint64) flowItemTask { return &collectUnReportedPeerTask{ - storeID: storeID, - regionIDs: regionIDs, - interval: interval, + storeID: storeID, + regions: regions, + interval: interval, } } @@ -102,10 +102,10 @@ func (t *collectUnReportedPeerTask) taskType() flowItemTaskKind { return collectUnReportedPeerTaskType } -func (t *collectUnReportedPeerTask) runTask(flow *hotPeerCache) { - stats := flow.CheckColdPeer(t.storeID, t.regionIDs, t.interval) +func (t *collectUnReportedPeerTask) runTask(cache *hotPeerCache) { + stats := cache.checkColdPeer(t.storeID, t.regions, t.interval) for _, stat := range stats { - update(stat, flow) + updateStat(cache, stat) } } @@ -125,8 +125,8 @@ func (t *collectRegionStatsTask) taskType() flowItemTaskKind { return collectRegionStatsTaskType } -func (t *collectRegionStatsTask) runTask(flow *hotPeerCache) { - t.ret <- flow.RegionStats(t.minDegree) +func (t *collectRegionStatsTask) runTask(cache *hotPeerCache) { + t.ret <- cache.RegionStats(t.minDegree) } // TODO: do we need a wait-return timeout? @@ -157,8 +157,8 @@ func (t *isRegionHotTask) taskType() flowItemTaskKind { return isRegionHotTaskType } -func (t *isRegionHotTask) runTask(flow *hotPeerCache) { - t.ret <- flow.isRegionHotWithAnyPeers(t.region, t.minHotDegree) +func (t *isRegionHotTask) runTask(cache *hotPeerCache) { + t.ret <- cache.isRegionHotWithAnyPeers(t.region, t.minHotDegree) } // TODO: do we need a wait-return timeout? @@ -185,6 +185,6 @@ func (t *collectMetricsTask) taskType() flowItemTaskKind { return collectMetricsTaskType } -func (t *collectMetricsTask) runTask(flow *hotPeerCache) { - flow.CollectMetrics(t.typ) +func (t *collectMetricsTask) runTask(cache *hotPeerCache) { + cache.collectMetrics(t.typ) } diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index e7055a38af3..0bebaaa7a5f 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -33,48 +33,48 @@ const ( type dimStat struct { typ RegionStatKind - Rolling *movingaverage.TimeMedian // it's used to statistic hot degree and average speed. - LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed. + rolling *movingaverage.TimeMedian // it's used to statistic hot degree and average speed. + lastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed. } func newDimStat(typ RegionStatKind, reportInterval time.Duration) *dimStat { return &dimStat{ typ: typ, - Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval), - LastAverage: movingaverage.NewAvgOverTime(reportInterval), + rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval), + lastAverage: movingaverage.NewAvgOverTime(reportInterval), } } func (d *dimStat) Add(delta float64, interval time.Duration) { - d.LastAverage.Add(delta, interval) - d.Rolling.Add(delta, interval) + d.lastAverage.Add(delta, interval) + d.rolling.Add(delta, interval) } func (d *dimStat) isLastAverageHot(threshold float64) bool { - return d.LastAverage.Get() >= threshold + return d.lastAverage.Get() >= threshold } func (d *dimStat) isHot(threshold float64) bool { - return d.Rolling.Get() >= threshold + return d.rolling.Get() >= threshold } func (d *dimStat) isFull() bool { - return d.LastAverage.IsFull() + return d.lastAverage.IsFull() } func (d *dimStat) clearLastAverage() { - d.LastAverage.Clear() + d.lastAverage.Clear() } func (d *dimStat) Get() float64 { - return d.Rolling.Get() + return d.rolling.Get() } func (d *dimStat) Clone() *dimStat { return &dimStat{ typ: d.typ, - Rolling: d.Rolling.Clone(), - LastAverage: d.LastAverage.Clone(), + rolling: d.rolling.Clone(), + lastAverage: d.lastAverage.Clone(), } } @@ -97,11 +97,8 @@ type HotPeerStat struct { // LastUpdateTime used to calculate average write LastUpdateTime time.Time `json:"last_update_time"` - needDelete bool - isLeader bool - isNew bool - // TODO: remove it when we send peer stat by store info - justTransferLeader bool + actionType ActionType + isLeader bool interval uint64 thresholds []float64 peers []uint64 @@ -140,10 +137,9 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi zap.Int("hot-degree", stat.HotDegree), zap.Int("hot-anti-count", stat.AntiCount), zap.Duration("sum-interval", stat.getIntervalSum()), - zap.Bool("need-delete", stat.IsNeedDelete()), zap.String("source", stat.source.String()), zap.Bool("allow-adopt", stat.allowAdopt), - zap.Bool("just-transfer-leader", stat.justTransferLeader), + zap.String("action-type", stat.actionType.String()), zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime)) } @@ -152,22 +148,17 @@ func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool { return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*stat.hotStatReportInterval()) } -// IsNeedDelete to delete the item in cache. -func (stat *HotPeerStat) IsNeedDelete() bool { - return stat.needDelete -} - // IsLeader indicates the item belong to the leader. func (stat *HotPeerStat) IsLeader() bool { return stat.isLeader } -// IsNew indicates the item is first update in the cache of the region. -func (stat *HotPeerStat) IsNew() bool { - return stat.isNew +// GetActionType returns the item action type. +func (stat *HotPeerStat) GetActionType() ActionType { + return stat.actionType } -// GetLoad returns denoised load if possible. +// GetLoad returns denoising load if possible. func (stat *HotPeerStat) GetLoad(k RegionStatKind) float64 { if len(stat.rollingLoads) > int(k) { return math.Round(stat.rollingLoads[int(k)].Get()) @@ -175,7 +166,7 @@ func (stat *HotPeerStat) GetLoad(k RegionStatKind) float64 { return math.Round(stat.Loads[int(k)]) } -// GetLoads returns denoised load if possible. +// GetLoads returns denoising load if possible. func (stat *HotPeerStat) GetLoads() []float64 { regionStats := stat.Kind.RegionStats() loads := make([]float64, len(regionStats)) @@ -185,7 +176,8 @@ func (stat *HotPeerStat) GetLoads() []float64 { return loads } -// GetThresholds returns thresholds +// GetThresholds returns thresholds. +// Only for test purpose. func (stat *HotPeerStat) GetThresholds() []float64 { return stat.thresholds } @@ -201,9 +193,9 @@ func (stat *HotPeerStat) Clone() *HotPeerStat { return &ret } -func (stat *HotPeerStat) isFullAndHot() bool { +func (stat *HotPeerStat) isHot() bool { return slice.AnyOf(stat.rollingLoads, func(i int) bool { - return stat.rollingLoads[i].isFull() && stat.rollingLoads[i].isLastAverageHot(stat.thresholds[i]) + return stat.rollingLoads[i].isLastAverageHot(stat.thresholds[i]) }) } @@ -224,5 +216,5 @@ func (stat *HotPeerStat) getIntervalSum() time.Duration { if len(stat.rollingLoads) == 0 || stat.rollingLoads[0] == nil { return 0 } - return stat.rollingLoads[0].LastAverage.GetIntervalSum() + return stat.rollingLoads[0].lastAverage.GetIntervalSum() } diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index c5a08db5dd8..7de9a8ceaca 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -61,6 +61,7 @@ type hotPeerCache struct { inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat topNTTL time.Duration reportIntervalSecs int + taskQueue chan flowItemTask } // NewHotPeerCache creates a hotPeerCache @@ -71,6 +72,7 @@ func NewHotPeerCache(kind RWType) *hotPeerCache { storesOfRegion: make(map[uint64]map[uint64]struct{}), regionsOfStore: make(map[uint64]map[uint64]struct{}), inheritItem: make(map[uint64]*HotPeerStat), + taskQueue: make(chan flowItemTask, queueCap), } if kind == Write { c.reportIntervalSecs = WriteReportInterval @@ -98,14 +100,14 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { return res } -// Update updates the items in statistics. -func (f *hotPeerCache) Update(item *HotPeerStat) { - if item.IsNeedDelete() { +// update updates the items in statistics. +func (f *hotPeerCache) update(item *HotPeerStat) { + if item.actionType == Remove { if item.AntiCount > 0 { // means it's deleted because expired rather than cold f.putInheritItem(item) } f.removeItem(item) - item.Log("region heartbeat delete from cache", log.Debug) + item.Log("region heartbeat remove from cache", log.Debug) } else { f.putItem(item) item.Log("region heartbeat update", log.Debug) @@ -136,15 +138,15 @@ func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) { } } -// CollectExpiredItems collects expired items, mark them as needDelete and puts them into inherit items -func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerStat { +// collectExpiredItems collects expired items, mark them as needDelete and puts them into inherit items +func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerStat { regionID := region.GetID() items := make([]*HotPeerStat, 0) for _, storeID := range f.getAllStoreIDs(region) { if region.GetStorePeer(storeID) == nil { item := f.getOldHotPeerStat(regionID, storeID) if item != nil { - item.needDelete = true + item.actionType = Remove items = append(items, item) } } @@ -152,10 +154,10 @@ func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerSt return items } -// CheckPeerFlow checks the flow information of a peer. -// Notice: CheckPeerFlow couldn't be used concurrently. -// CheckPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. -func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { +// checkPeerFlow checks the flow information of a peer. +// Notice: checkPeerFlow couldn't be used concurrently. +// checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. +func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { interval := peer.GetInterval() if Denoising && interval < HotRegionReportMinInterval { return nil @@ -167,7 +169,6 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf for i := range deltaLoads { loads[i] = deltaLoads[i] / float64(interval) } - justTransferLeader := f.justTransferLeader(region) oldItem := f.getOldHotPeerStat(region.GetID(), storeID) thresholds := f.calcHotThresholds(storeID) regionPeers := region.GetPeers() @@ -176,18 +177,17 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf peers = append(peers, peer.StoreId) } newItem := &HotPeerStat{ - StoreID: storeID, - RegionID: region.GetID(), - Kind: f.kind, - Loads: loads, - LastUpdateTime: time.Now(), - needDelete: false, - isLeader: region.GetLeader().GetStoreId() == storeID, - justTransferLeader: justTransferLeader, - interval: interval, - peers: peers, - thresholds: thresholds, - source: direct, + StoreID: storeID, + RegionID: region.GetID(), + Kind: f.kind, + Loads: loads, + LastUpdateTime: time.Now(), + isLeader: region.GetLeader().GetStoreId() == storeID, + interval: interval, + peers: peers, + actionType: Update, + thresholds: thresholds, + source: direct, } if oldItem == nil { @@ -205,11 +205,11 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf } } } - return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) + return f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) } -// CheckColdPeer checks the collect the un-heartbeat peer and maintain it. -func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]struct{}, interval uint64) (ret []*HotPeerStat) { +// checkColdPeer checks the collect the un-heartbeat peer and maintain it. +func (f *hotPeerCache) checkColdPeer(storeID uint64, reportRegions map[uint64]*core.RegionInfo, interval uint64) (ret []*HotPeerStat) { if Denoising && interval < HotRegionReportMinInterval { return } @@ -218,7 +218,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st return } for regionID := range previousHotStat { - if _, ok := reportRegions[regionID]; !ok { + if region, ok := reportRegions[regionID]; !ok { oldItem := f.getOldHotPeerStat(regionID, storeID) if oldItem == nil { continue @@ -228,21 +228,20 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st RegionID: regionID, Kind: f.kind, // use oldItem.thresholds to make the newItem won't affect the threshold - Loads: oldItem.thresholds, - LastUpdateTime: time.Now(), - needDelete: false, - isLeader: oldItem.isLeader, - justTransferLeader: oldItem.justTransferLeader, - interval: interval, - peers: oldItem.peers, - thresholds: oldItem.thresholds, - inCold: true, + Loads: oldItem.thresholds, + LastUpdateTime: time.Now(), + isLeader: oldItem.isLeader, + interval: interval, + peers: oldItem.peers, + actionType: Update, + thresholds: oldItem.thresholds, + inCold: true, } deltaLoads := make([]float64, RegionStatCount) for i, loads := range oldItem.thresholds { deltaLoads[i] = loads * float64(interval) } - stat := f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) + stat := f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) if stat != nil { ret = append(ret, stat) } @@ -251,7 +250,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st return } -func (f *hotPeerCache) CollectMetrics(typ string) { +func (f *hotPeerCache) collectMetrics(typ string) { for storeID, peers := range f.peersOfStore { store := storeTag(storeID) thresholds := f.calcHotThresholds(storeID) @@ -338,6 +337,9 @@ func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool } func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo) bool { + if region == nil { + return false + } ids, ok := f.storesOfRegion[region.GetID()] if ok { for storeID := range ids { @@ -379,10 +381,10 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian { return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second) } -func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { +func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { regionStats := f.kind.RegionStats() if oldItem == nil { - return f.updateNewHotPeerStat(newItem, deltaLoads, interval) + return f.updateNewHotPeerStat(regionStats, newItem, deltaLoads, interval) } if newItem.source == adopt { @@ -395,14 +397,14 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa newItem.allowAdopt = oldItem.allowAdopt } - if newItem.justTransferLeader { + if f.justTransferLeader(region) { newItem.lastTransferLeaderTime = time.Now() // skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate // maintain anticount and hotdegree to avoid store threshold and hot peer are unstable. // For write stat, as the stat is send by region heartbeat, the first heartbeat will be skipped. // For read stat, as the stat is send by store heartbeat, the first heartbeat won't be skipped. if newItem.Kind == Write { - inheritItemDegree(newItem, oldItem) + inheritItem(newItem, oldItem) return newItem } } else { @@ -416,7 +418,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa isFull := newItem.rollingLoads[0].isFull() // The intervals of dims are the same, so it is only necessary to determine whether any of them if !isFull { // not update hot degree and anti count - inheritItemDegree(newItem, oldItem) + inheritItem(newItem, oldItem) } else { // If item is inCold, it means the pd didn't recv this item in the store heartbeat, // thus we make it colder @@ -424,13 +426,13 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa coldItem(newItem, oldItem) } else { if f.isOldColdPeer(oldItem, newItem.StoreID) { - if newItem.isFullAndHot() { - initItemDegree(newItem) + if newItem.isHot() { + initItem(newItem) } else { - newItem.needDelete = true + newItem.actionType = Remove } } else { - if newItem.isFullAndHot() { + if newItem.isHot() { hotItem(newItem, oldItem) } else { coldItem(newItem, oldItem) @@ -442,8 +444,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa return newItem } -func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { - regionStats := f.kind.RegionStats() +func (f *hotPeerCache) updateNewHotPeerStat(regionStats []RegionStatKind, newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { if interval == 0 { return nil } @@ -454,9 +455,9 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f return nil } if interval.Seconds() >= float64(f.reportIntervalSecs) { - initItemDegree(newItem) + initItem(newItem) } - newItem.isNew = true + newItem.actionType = Add newItem.rollingLoads = make([]*dimStat, len(regionStats)) for i, k := range regionStats { ds := newDimStat(k, time.Duration(newItem.hotStatReportInterval())*time.Second) @@ -522,7 +523,7 @@ func coldItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree - 1 newItem.AntiCount = oldItem.AntiCount - 1 if newItem.AntiCount <= 0 { - newItem.needDelete = true + newItem.actionType = Remove } else { newItem.allowAdopt = true } @@ -537,7 +538,7 @@ func hotItem(newItem, oldItem *HotPeerStat) { } } -func initItemDegree(item *HotPeerStat) { +func initItem(item *HotPeerStat) { item.HotDegree = 1 item.AntiCount = hotRegionAntiCount item.allowAdopt = true @@ -546,7 +547,7 @@ func initItemDegree(item *HotPeerStat) { } } -func inheritItemDegree(newItem, oldItem *HotPeerStat) { +func inheritItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree newItem.AntiCount = oldItem.AntiCount } diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index c3ce13f5fb5..3ef1c03ed12 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -72,17 +72,17 @@ type testCacheCase struct { kind RWType operator operator expect int - needDelete bool + actionType ActionType } func (t *testHotPeerCache) TestCache(c *C) { tests := []*testCacheCase{ - {Read, transferLeader, 3, false}, - {Read, movePeer, 4, true}, - {Read, addReplica, 4, false}, - {Write, transferLeader, 3, true}, - {Write, movePeer, 4, true}, - {Write, addReplica, 4, true}, + {Read, transferLeader, 3, Update}, + {Read, movePeer, 4, Remove}, + {Read, addReplica, 4, Update}, + {Write, transferLeader, 3, Remove}, + {Write, movePeer, 4, Remove}, + {Write, addReplica, 4, Remove}, } for _, t := range tests { testCache(c, t) @@ -97,13 +97,13 @@ func testCache(c *C, t *testCacheCase) { cache := NewHotPeerCache(t.kind) region := buildRegion(t.kind, 3, 60) checkAndUpdate(c, cache, region, defaultSize[t.kind]) - checkHit(c, cache, region, t.kind, false) // all peers are new + checkHit(c, cache, region, t.kind, Add) // all peers are new srcStore, region := schedule(c, t.operator, region, 10) res := checkAndUpdate(c, cache, region, t.expect) - checkHit(c, cache, region, t.kind, true) // hit cache + checkHit(c, cache, region, t.kind, Update) // hit cache if t.expect != defaultSize[t.kind] { - checkNeedDelete(c, res, srcStore, t.needDelete) + checkOp(c, res, srcStore, t.actionType) } } @@ -122,10 +122,10 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - res = append(res, cache.CollectExpiredItems(region)...) + res = append(res, cache.collectExpiredItems(region)...) for _, peer := range peers { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := cache.CheckPeerFlow(peerInfo, region) + item := cache.checkPeerFlow(peerInfo, region) if item != nil { res = append(res, item) } @@ -135,7 +135,7 @@ func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Pee func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { for _, p := range res { - cache.Update(p) + cache.update(p) } return res } @@ -168,7 +168,7 @@ func checkAndUpdateSkipOne(c *C, cache *hotPeerCache, region *core.RegionInfo, e return updateFlow(cache, res) } -func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, isHit bool) { +func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, actionType ActionType) { var peers []*metapb.Peer if kind == Read { peers = []*metapb.Peer{region.GetLeader()} @@ -178,14 +178,14 @@ func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, i for _, peer := range peers { item := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) c.Assert(item, NotNil) - c.Assert(item.isNew, Equals, !isHit) + c.Assert(item.actionType, Equals, actionType) } } -func checkNeedDelete(c *C, ret []*HotPeerStat, storeID uint64, needDelete bool) { +func checkOp(c *C, ret []*HotPeerStat, storeID uint64, actionType ActionType) { for _, item := range ret { if item.StoreID == storeID { - c.Assert(item.needDelete, Equals, needDelete) + c.Assert(item.actionType, Equals, actionType) return } } @@ -296,55 +296,55 @@ func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) { m := RegionHeartBeatReportInterval / StoreHeartBeatReportInterval // skip interval=0 - newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 0) + newItem := &HotPeerStat{actionType: Update, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{0.0, 0.0, 0.0}, 0) c.Check(newItem, IsNil) // new peer, interval is larger than report interval, but no hot - newItem = &HotPeerStat{needDelete: false, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second) + newItem = &HotPeerStat{actionType: Update, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second) c.Check(newItem, IsNil) // new peer, interval is less than report interval - newItem = &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = &HotPeerStat{actionType: Update, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem, NotNil) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is less than report interval oldItem := newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is less than report interval oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 2) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and cold oldItem = newItem newItem.thresholds = []float64{10.0, 10.0, 10.0} - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m-1) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) } c.Check(newItem.HotDegree, Less, 0) c.Check(newItem.AntiCount, Equals, 0) - c.Check(newItem.needDelete, IsTrue) + c.Check(newItem.actionType, Equals, Remove) } func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) { @@ -369,7 +369,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold Kind: cache.kind, StoreID: storeID, RegionID: i, - needDelete: false, + actionType: Update, thresholds: thresholds, Loads: make([]float64, DimLen), } @@ -379,8 +379,8 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold if oldItem != nil && oldItem.rollingLoads[RegionReadBytes].isHot(thresholds[RegionReadBytes]) == true { break } - item := cache.updateHotPeerStat(newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) - cache.Update(item) + item := cache.updateHotPeerStat(nil, newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) + cache.update(item) } thresholds := cache.calcHotThresholds(storeID) if i < TopNN { @@ -493,13 +493,13 @@ func BenchmarkCheckRegionFlow(b *testing.B) { for i := 0; i < b.N; i++ { items := make([]*HotPeerStat, 0) for _, peerInfo := range peerInfos { - item := cache.CheckPeerFlow(peerInfo, region) + item := cache.checkPeerFlow(peerInfo, region) if item != nil { items = append(items, item) } } for _, ret := range items { - cache.Update(ret) + cache.update(ret) } } } diff --git a/server/statistics/kind.go b/server/statistics/kind.go index e5965fcbfcb..65ec300f7ac 100644 --- a/server/statistics/kind.go +++ b/server/statistics/kind.go @@ -147,3 +147,25 @@ func (k RWType) RegionStats() []RegionStatKind { } return nil } + +// ActionType indicates the action type for the stat item. +type ActionType int + +// Flags for action type. +const ( + Add ActionType = iota + Remove + Update +) + +func (t ActionType) String() string { + switch t { + case Add: + return "add" + case Remove: + return "remove" + case Update: + return "update" + } + return "unimplemented" +}