From 1b8f82378ee7005bd90d3b6c1bb043a7220b088e Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Wed, 16 Feb 2022 16:03:39 +0800 Subject: [PATCH] server: add `/operators/records` API to list completed operators (#4631) close tikv/pd#4630 Signed-off-by: bufferflies <1045931706@qq.com> Co-authored-by: Ti Chi Robot --- server/api/operator.go | 27 +++++++++ server/api/operator_test.go | 14 +++++ server/api/router.go | 1 + server/cluster/cluster.go | 1 - server/grpc_service.go | 4 +- server/handler.go | 13 +++++ server/schedule/operator/operator.go | 31 +++++++++++ server/schedule/operator/operator_test.go | 9 +++ server/schedule/operator_controller.go | 61 ++++++++------------- server/schedule/operator_controller_test.go | 2 +- tests/pdctl/operator/operator_test.go | 6 ++ tools/pd-ctl/pdctl/command/operator.go | 38 +++++++++++++ 12 files changed, 166 insertions(+), 41 deletions(-) diff --git a/server/api/operator.go b/server/api/operator.go index 91d1d9d2762..d65abadf6a9 100644 --- a/server/api/operator.go +++ b/server/api/operator.go @@ -17,6 +17,7 @@ package api import ( "net/http" "strconv" + "time" "github.com/gorilla/mux" "github.com/tikv/pd/pkg/apiutil" @@ -344,6 +345,32 @@ func (h *operatorHandler) Delete(w http.ResponseWriter, r *http.Request) { h.r.JSON(w, http.StatusOK, "The pending operator is canceled.") } +// @Tags operator +// @Summary lists the finished operators since the given timestamp in second. +// @Param from query integer false "From Unix timestamp" +// @Produce json +// @Success 200 {object} []operator.OpRecord +// @Failure 400 {string} string "The request is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /operators/records [get] +func (h *operatorHandler) Records(w http.ResponseWriter, r *http.Request) { + var from time.Time + if fromStr := r.URL.Query()["from"]; len(fromStr) > 0 { + fromInt, err := strconv.ParseInt(fromStr[0], 10, 64) + if err != nil { + h.r.JSON(w, http.StatusBadRequest, err.Error()) + return + } + from = time.Unix(fromInt, 0) + } + records, err := h.GetRecords(from) + if err != nil { + h.r.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.r.JSON(w, http.StatusOK, records) +} + func parseStoreIDsAndPeerRole(ids interface{}, roles interface{}) (map[uint64]placement.PeerRoleType, bool) { items, ok := ids.([]interface{}) if !ok { diff --git a/server/api/operator_test.go b/server/api/operator_test.go index f05b7e2ede2..a535af97aae 100644 --- a/server/api/operator_test.go +++ b/server/api/operator_test.go @@ -19,7 +19,9 @@ import ( "errors" "fmt" "io" + "strconv" "strings" + "time" . "github.com/pingcap/check" "github.com/pingcap/failpoint" @@ -79,6 +81,9 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) { regionURL := fmt.Sprintf("%s/operators/%d", s.urlPrefix, region.GetId()) operator := mustReadURL(c, regionURL) c.Assert(strings.Contains(operator, "operator not found"), IsTrue) + recordURL := fmt.Sprintf("%s/operators/records?from=%s", s.urlPrefix, strconv.FormatInt(time.Now().Unix(), 10)) + records := mustReadURL(c, recordURL) + c.Assert(strings.Contains(records, "operator not found"), IsTrue) mustPutStore(c, s.svr, 3, metapb.StoreState_Up, nil) err := postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"add-peer", "region_id": 1, "store_id": 3}`)) @@ -89,6 +94,8 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) { _, err = doDelete(testDialClient, regionURL) c.Assert(err, IsNil) + records = mustReadURL(c, recordURL) + c.Assert(strings.Contains(records, "admin-add-peer {add peer: store [3]}"), IsTrue) err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"remove-peer", "region_id": 1, "store_id": 2}`)) c.Assert(err, IsNil) @@ -98,6 +105,8 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) { _, err = doDelete(testDialClient, regionURL) c.Assert(err, IsNil) + records = mustReadURL(c, recordURL) + c.Assert(strings.Contains(records, "admin-remove-peer {rm peer: store [2]}"), IsTrue) mustPutStore(c, s.svr, 4, metapb.StoreState_Up, nil) err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"add-learner", "region_id": 1, "store_id": 4}`)) @@ -114,6 +123,11 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) { c.Assert(err, NotNil) err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [1, 2, 3]}`)) c.Assert(err, NotNil) + + // Fail to get operator if from is latest. + time.Sleep(time.Second) + records = mustReadURL(c, fmt.Sprintf("%s/operators/records?from=%s", s.urlPrefix, strconv.FormatInt(time.Now().Unix(), 10))) + c.Assert(strings.Contains(records, "operator not found"), IsTrue) } func (s *testOperatorSuite) TestMergeRegionOperator(c *C) { diff --git a/server/api/router.go b/server/api/router.go index 3e6543b8e9c..9c8729bb324 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -150,6 +150,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { operatorHandler := newOperatorHandler(handler, rd) registerFunc(apiRouter, "GetOperators", "/operators", operatorHandler.List, setMethods("GET")) registerFunc(apiRouter, "SetOperators", "/operators", operatorHandler.Post, setMethods("POST")) + registerFunc(apiRouter, "GetOperatorRecords", "/operators/records", operatorHandler.Records, setMethods("GET")) registerFunc(apiRouter, "GetRegionOperator", "/operators/{region_id}", operatorHandler.Get, setMethods("GET")) registerFunc(apiRouter, "DeleteRegionOperator", "/operators/{region_id}", operatorHandler.Delete, setMethods("DELETE")) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 487d908817e..0cb971096cd 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -327,7 +327,6 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) { case <-ticker.C: c.checkStores() c.collectMetrics() - c.coordinator.opController.PruneHistory() } } } diff --git a/server/grpc_service.go b/server/grpc_service.go index fec678b2820..7f8b5d999e1 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1410,8 +1410,8 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR return &pdpb.GetOperatorResponse{ Header: s.header(), RegionId: requestID, - Desc: []byte(r.Op.Desc()), - Kind: []byte(r.Op.Kind().String()), + Desc: []byte(r.Desc()), + Kind: []byte(r.Kind().String()), Status: r.Status, }, nil } diff --git a/server/handler.go b/server/handler.go index 6fb64d4df95..dc8668b3536 100644 --- a/server/handler.go +++ b/server/handler.go @@ -457,6 +457,19 @@ func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) { return c.GetHistory(start), nil } +// GetRecords returns finished operators since start. +func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) { + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + records := c.GetRecords(from) + if len(records) == 0 { + return nil, ErrOperatorNotFound + } + return records, nil +} + // SetAllStoresLimit is used to set limit of all stores. func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error { c, err := h.GetRaftCluster() diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go index 0917206f6c4..88906916e31 100644 --- a/server/schedule/operator/operator.go +++ b/server/schedule/operator/operator.go @@ -357,6 +357,37 @@ func (o *Operator) History() []OpHistory { return histories } +// OpRecord is used to log and visualize completed operators. +type OpRecord struct { + *Operator + FinishTime time.Time + duration time.Duration +} + +func (o *OpRecord) String() string { + return fmt.Sprintf("%s (finishAt:%v, duration:%v)", o.Operator.String(), o.FinishTime, o.duration) +} + +// MarshalJSON returns the status of operator as a JSON string +func (o *OpRecord) MarshalJSON() ([]byte, error) { + return []byte(`"` + o.String() + `"`), nil +} + +// Record transfers the operator to OpRecord. +func (o *Operator) Record(finishTime time.Time) *OpRecord { + step := atomic.LoadInt32(&o.currentStep) + record := &OpRecord{ + Operator: o, + FinishTime: finishTime, + } + start := o.GetStartTime() + if o.Status() != SUCCESS && 0 < step && int(step-1) < len(o.stepsTime) { + start = time.Unix(0, o.stepsTime[int(step-1)]) + } + record.duration = finishTime.Sub(start) + return record +} + // GetAdditionalInfo returns additional info with string func (o *Operator) GetAdditionalInfo() string { if len(o.AdditionalInfos) != 0 { diff --git a/server/schedule/operator/operator_test.go b/server/schedule/operator/operator_test.go index db01a047efb..791d9bdd1cd 100644 --- a/server/schedule/operator/operator_test.go +++ b/server/schedule/operator/operator_test.go @@ -418,3 +418,12 @@ func (s *testOperatorSuite) TestSchedulerKind(c *C) { c.Assert(v.op.SchedulerKind(), Equals, v.expect) } } + +func (s *testOperatorSuite) TestRecord(c *C) { + operator := s.newTestOperator(1, OpLeader, AddLearner{ToStore: 1, PeerID: 1}, RemovePeer{FromStore: 1, PeerID: 1}) + now := time.Now() + time.Sleep(time.Second) + ob := operator.Record(now) + c.Assert(ob.FinishTime, Equals, now) + c.Assert(ob.duration.Seconds(), Greater, time.Second.Seconds()) +} diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 968ed80fa07..83473c1e66d 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -16,7 +16,6 @@ package schedule import ( "container/heap" - "container/list" "context" "fmt" "strconv" @@ -45,7 +44,6 @@ const ( ) var ( - historyKeepTime = 5 * time.Minute slowNotifyInterval = 5 * time.Second fastNotifyInterval = 2 * time.Second // PushOperatorTickInterval is the interval try to push the operator. @@ -64,7 +62,6 @@ type OperatorController struct { operators map[uint64]*operator.Operator hbStreams *hbstream.HeartbeatStreams fastOperators *cache.TTLUint64 - histories *list.List counts map[operator.OpKind]uint64 opRecords *OperatorRecords wop WaitingOperator @@ -79,7 +76,6 @@ func NewOperatorController(ctx context.Context, cluster Cluster, hbStreams *hbst cluster: cluster, operators: make(map[uint64]*operator.Operator), hbStreams: hbStreams, - histories: list.New(), fastOperators: cache.NewIDTTL(ctx, time.Minute, FastOperatorFinishTime), counts: make(map[operator.OpKind]uint64), opRecords: NewOperatorRecords(ctx), @@ -114,7 +110,6 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { // The operator status should be STARTED. // Check will call CheckSuccess and CheckTimeout. step := op.Check(region) - switch op.Status() { case operator.STARTED: operatorCounter.WithLabelValues(op.Desc(), "check").Inc() @@ -123,7 +118,6 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { } oc.SendScheduleCommand(region, step, source) case operator.SUCCESS: - oc.pushHistory(op) if oc.RemoveOperator(op) { operatorWaitCounter.WithLabelValues(op.Desc(), "promote-success").Inc() oc.PromoteWaitingOperator() @@ -727,43 +721,34 @@ func addLearnerNode(id, storeID uint64) *pdpb.RegionHeartbeatResponse { } } -func (oc *OperatorController) pushHistory(op *operator.Operator) { - oc.Lock() - defer oc.Unlock() - for _, h := range op.History() { - oc.histories.PushFront(h) - } -} - func (oc *OperatorController) pushFastOperator(op *operator.Operator) { oc.fastOperators.Put(op.RegionID(), op) } -// PruneHistory prunes a part of operators' history. -func (oc *OperatorController) PruneHistory() { - oc.Lock() - defer oc.Unlock() - p := oc.histories.Back() - for p != nil && time.Since(p.Value.(operator.OpHistory).FinishTime) > historyKeepTime { - prev := p.Prev() - oc.histories.Remove(p) - p = prev +// GetRecords gets operators' records. +func (oc *OperatorController) GetRecords(from time.Time) []*operator.OpRecord { + records := make([]*operator.OpRecord, 0, oc.opRecords.ttl.Len()) + for _, id := range oc.opRecords.ttl.GetAllID() { + op := oc.opRecords.Get(id) + if op == nil || op.FinishTime.Before(from) { + continue + } + records = append(records, op.Record(op.FinishTime)) } + return records } // GetHistory gets operators' history. func (oc *OperatorController) GetHistory(start time.Time) []operator.OpHistory { - oc.RLock() - defer oc.RUnlock() - histories := make([]operator.OpHistory, 0, oc.histories.Len()) - for p := oc.histories.Front(); p != nil; p = p.Next() { - history := p.Value.(operator.OpHistory) - if history.FinishTime.Before(start) { - break + history := make([]operator.OpHistory, 0, oc.opRecords.ttl.Len()) + for _, id := range oc.opRecords.ttl.GetAllID() { + op := oc.opRecords.Get(id) + if op == nil || op.FinishTime.Before(start) { + continue } - histories = append(histories, history) + history = append(history, op.History()...) } - return histories + return history } // updateCounts updates resource counts using current pending operators. @@ -847,21 +832,23 @@ func (oc *OperatorController) SetOperator(op *operator.Operator) { // OperatorWithStatus records the operator and its status. type OperatorWithStatus struct { - Op *operator.Operator - Status pdpb.OperatorStatus + *operator.Operator + Status pdpb.OperatorStatus + FinishTime time.Time } // NewOperatorWithStatus creates an OperatorStatus from an operator. func NewOperatorWithStatus(op *operator.Operator) *OperatorWithStatus { return &OperatorWithStatus{ - Op: op, - Status: operator.OpStatusToPDPB(op.Status()), + Operator: op, + Status: operator.OpStatusToPDPB(op.Status()), + FinishTime: time.Now(), } } // MarshalJSON returns the status of operator as a JSON string func (o *OperatorWithStatus) MarshalJSON() ([]byte, error) { - return []byte(`"` + fmt.Sprintf("status: %s, operator: %s", o.Status.String(), o.Op.String()) + `"`), nil + return []byte(`"` + fmt.Sprintf("status: %s, operator: %s", o.Status.String(), o.Operator.String()) + `"`), nil } // OperatorRecords remains the operator and its status for a while. diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index e768ae68f86..294b3577011 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -608,7 +608,7 @@ func newRegionInfo(id uint64, startKey, endKey string, size, keys int64, leader func checkRemoveOperatorSuccess(c *C, oc *OperatorController, op *operator.Operator) { c.Assert(oc.RemoveOperator(op), IsTrue) c.Assert(op.IsEnd(), IsTrue) - c.Assert(oc.GetOperatorStatus(op.RegionID()).Op, DeepEquals, op) + c.Assert(oc.GetOperatorStatus(op.RegionID()).Operator, DeepEquals, op) } func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) { diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 69eca681be0..d6eec639770 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -16,6 +16,7 @@ package operator_test import ( "context" + "strconv" "strings" "testing" "time" @@ -179,8 +180,13 @@ func (s *operatorTestSuite) TestOperator(c *C) { output, e := pdctl.ExecuteCommand(cmd, testCase.show...) c.Assert(e, IsNil) c.Assert(strings.Contains(string(output), testCase.expect), IsTrue) + t := time.Now() _, e = pdctl.ExecuteCommand(cmd, testCase.reset...) c.Assert(e, IsNil) + historyCmd := []string{"-u", pdAddr, "operator", "history", strconv.FormatInt(t.Unix(), 10)} + records, e := pdctl.ExecuteCommand(cmd, historyCmd...) + c.Assert(e, IsNil) + c.Assert(strings.Contains(string(records), "admin"), IsTrue) } // operator add merge-region diff --git a/tools/pd-ctl/pdctl/command/operator.go b/tools/pd-ctl/pdctl/command/operator.go index 4971b4930b6..f4652fc78f8 100644 --- a/tools/pd-ctl/pdctl/command/operator.go +++ b/tools/pd-ctl/pdctl/command/operator.go @@ -33,6 +33,19 @@ var ( } ) +const ( + // HistoryExample history command example. + HistoryExample = ` + If the timestamp is right, the the output will be like: + + [ + "admin-remove-peer {rm peer: store [2]} (kind:admin,region, region:1(1,1), createAt:2022-02-15 15:11:14.974435 +0800 + CST m=+0.663988396, startAt:2022-02-15 15:11:14.974485 +0800 CST m=+0.664038719, currentStep:0, size:1, steps:[remove peer on store 2]) + (finishAt:2022-02-15 15:11:14.975531 +0800 CST m=+0.665084434, duration:1.045715ms)" + ] +` +) + // NewOperatorCommand returns a operator command. func NewOperatorCommand() *cobra.Command { c := &cobra.Command{ @@ -43,6 +56,7 @@ func NewOperatorCommand() *cobra.Command { c.AddCommand(NewCheckOperatorCommand()) c.AddCommand(NewAddOperatorCommand()) c.AddCommand(NewRemoveOperatorCommand()) + c.AddCommand(NewHistoryOperatorCommand()) return c } @@ -427,6 +441,30 @@ func removeOperatorCommandFunc(cmd *cobra.Command, args []string) { cmd.Println("Success!") } +// NewHistoryOperatorCommand returns a command to history finished operators. +func NewHistoryOperatorCommand() *cobra.Command { + c := &cobra.Command{ + Use: "history ", + Short: "list all finished operators since start, start is a timestamp", + Run: historyOperatorCommandFunc, + Example: HistoryExample, + } + return c +} + +func historyOperatorCommandFunc(cmd *cobra.Command, args []string) { + path := operatorsPrefix + "/" + "records" + if len(args) == 1 { + path += "?from=" + args[0] + } + records, err := doRequest(cmd, path, http.MethodGet, http.Header{}) + if err != nil { + cmd.Println(err) + return + } + cmd.Println(records) +} + func parseUint64s(args []string) ([]uint64, error) { results := make([]uint64, 0, len(args)) for _, arg := range args {