Skip to content

Commit

Permalink
server: add /operators/records API to list completed operators (tik…
Browse files Browse the repository at this point in the history
…v#4631)

close tikv#4630

Signed-off-by: bufferflies <1045931706@qq.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
bufferflies and ti-chi-bot authored Feb 16, 2022
1 parent 26c6682 commit 1b8f823
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 41 deletions.
27 changes: 27 additions & 0 deletions server/api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package api
import (
"net/http"
"strconv"
"time"

"github.com/gorilla/mux"
"github.com/tikv/pd/pkg/apiutil"
Expand Down Expand Up @@ -344,6 +345,32 @@ func (h *operatorHandler) Delete(w http.ResponseWriter, r *http.Request) {
h.r.JSON(w, http.StatusOK, "The pending operator is canceled.")
}

// @Tags operator
// @Summary lists the finished operators since the given timestamp in second.
// @Param from query integer false "From Unix timestamp"
// @Produce json
// @Success 200 {object} []operator.OpRecord
// @Failure 400 {string} string "The request is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/records [get]
func (h *operatorHandler) Records(w http.ResponseWriter, r *http.Request) {
	// When the "from" query parameter is absent the lower bound stays at the
	// zero time.Time.
	var from time.Time
	if values := r.URL.Query()["from"]; len(values) != 0 {
		// Only the first "from" value is used; it must be a base-10 integer
		// and is interpreted as Unix seconds.
		seconds, parseErr := strconv.ParseInt(values[0], 10, 64)
		if parseErr != nil {
			h.r.JSON(w, http.StatusBadRequest, parseErr.Error())
			return
		}
		from = time.Unix(seconds, 0)
	}

	// Any error from GetRecords is reported as an internal server error with
	// the error text as the body.
	records, err := h.GetRecords(from)
	if err == nil {
		h.r.JSON(w, http.StatusOK, records)
		return
	}
	h.r.JSON(w, http.StatusInternalServerError, err.Error())
}

func parseStoreIDsAndPeerRole(ids interface{}, roles interface{}) (map[uint64]placement.PeerRoleType, bool) {
items, ok := ids.([]interface{})
if !ok {
Expand Down
14 changes: 14 additions & 0 deletions server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -79,6 +81,9 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {
regionURL := fmt.Sprintf("%s/operators/%d", s.urlPrefix, region.GetId())
operator := mustReadURL(c, regionURL)
c.Assert(strings.Contains(operator, "operator not found"), IsTrue)
recordURL := fmt.Sprintf("%s/operators/records?from=%s", s.urlPrefix, strconv.FormatInt(time.Now().Unix(), 10))
records := mustReadURL(c, recordURL)
c.Assert(strings.Contains(records, "operator not found"), IsTrue)

mustPutStore(c, s.svr, 3, metapb.StoreState_Up, nil)
err := postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"add-peer", "region_id": 1, "store_id": 3}`))
Expand All @@ -89,6 +94,8 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {

_, err = doDelete(testDialClient, regionURL)
c.Assert(err, IsNil)
records = mustReadURL(c, recordURL)
c.Assert(strings.Contains(records, "admin-add-peer {add peer: store [3]}"), IsTrue)

err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"remove-peer", "region_id": 1, "store_id": 2}`))
c.Assert(err, IsNil)
Expand All @@ -98,6 +105,8 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {

_, err = doDelete(testDialClient, regionURL)
c.Assert(err, IsNil)
records = mustReadURL(c, recordURL)
c.Assert(strings.Contains(records, "admin-remove-peer {rm peer: store [2]}"), IsTrue)

mustPutStore(c, s.svr, 4, metapb.StoreState_Up, nil)
err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"add-learner", "region_id": 1, "store_id": 4}`))
Expand All @@ -114,6 +123,11 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {
c.Assert(err, NotNil)
err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [1, 2, 3]}`))
c.Assert(err, NotNil)

// Expect "operator not found" when from is later than every recorded finish time.
time.Sleep(time.Second)
records = mustReadURL(c, fmt.Sprintf("%s/operators/records?from=%s", s.urlPrefix, strconv.FormatInt(time.Now().Unix(), 10)))
c.Assert(strings.Contains(records, "operator not found"), IsTrue)
}

func (s *testOperatorSuite) TestMergeRegionOperator(c *C) {
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
operatorHandler := newOperatorHandler(handler, rd)
registerFunc(apiRouter, "GetOperators", "/operators", operatorHandler.List, setMethods("GET"))
registerFunc(apiRouter, "SetOperators", "/operators", operatorHandler.Post, setMethods("POST"))
registerFunc(apiRouter, "GetOperatorRecords", "/operators/records", operatorHandler.Records, setMethods("GET"))
registerFunc(apiRouter, "GetRegionOperator", "/operators/{region_id}", operatorHandler.Get, setMethods("GET"))
registerFunc(apiRouter, "DeleteRegionOperator", "/operators/{region_id}", operatorHandler.Delete, setMethods("DELETE"))

Expand Down
1 change: 0 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
case <-ticker.C:
c.checkStores()
c.collectMetrics()
c.coordinator.opController.PruneHistory()
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1410,8 +1410,8 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR
return &pdpb.GetOperatorResponse{
Header: s.header(),
RegionId: requestID,
Desc: []byte(r.Op.Desc()),
Kind: []byte(r.Op.Kind().String()),
Desc: []byte(r.Desc()),
Kind: []byte(r.Kind().String()),
Status: r.Status,
}, nil
}
Expand Down
13 changes: 13 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,19 @@ func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) {
return c.GetHistory(start), nil
}

// GetRecords returns the finished operators reported by the operator
// controller for the given lower bound from.
//
// It returns the error from GetOperatorController when the controller is not
// available, and ErrOperatorNotFound when the controller reports no records.
func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) {
	controller, err := h.GetOperatorController()
	if err != nil {
		return nil, err
	}
	// An empty result is surfaced as an error rather than an empty slice.
	if found := controller.GetRecords(from); len(found) > 0 {
		return found, nil
	}
	return nil, ErrOperatorNotFound
}

// SetAllStoresLimit is used to set limit of all stores.
func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error {
c, err := h.GetRaftCluster()
Expand Down
31 changes: 31 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,37 @@ func (o *Operator) History() []OpHistory {
return histories
}

// OpRecord is used to log and visualize completed operators.
// It pairs an operator with a finish time and a duration, and is built by
// Operator.Record.
type OpRecord struct {
// Operator is the embedded operator this record describes.
*Operator
// FinishTime is the finish time passed to Operator.Record.
FinishTime time.Time
// duration is computed by Operator.Record as FinishTime minus the chosen
// start point. It is unexported and only shown through String.
duration time.Duration
}

// String renders the embedded operator's own string form followed by the
// record's finish time and duration.
func (o *OpRecord) String() string {
	operatorText := o.Operator.String()
	return fmt.Sprintf("%s (finishAt:%v, duration:%v)", operatorText, o.FinishTime, o.duration)
}

// MarshalJSON encodes the record as a single JSON string whose content is the
// output of String. The returned error is always nil.
//
// NOTE(review): the text is wrapped in double quotes without any escaping, so
// this assumes String never produces characters that need JSON escaping
// (quotes, backslashes, control characters) — confirm.
func (o *OpRecord) MarshalJSON() ([]byte, error) {
	text := o.String()
	quoted := make([]byte, 0, len(text)+2)
	quoted = append(quoted, '"')
	quoted = append(quoted, text...)
	quoted = append(quoted, '"')
	return quoted, nil
}

// Record builds an OpRecord for the operator with the given finish time.
//
// The record's duration is finishTime minus a start point. The start point is
// GetStartTime() by default. When the operator's status is not SUCCESS and
// currentStep is at least 1 (and currentStep-1 indexes into stepsTime), the
// start point is instead stepsTime[currentStep-1], read as Unix nanoseconds.
func (o *Operator) Record(finishTime time.Time) *OpRecord {
	current := atomic.LoadInt32(&o.currentStep)
	begin := o.GetStartTime()
	// Index of the step preceding the current one; negative when no step
	// precedes it.
	prev := int(current) - 1
	if o.Status() != SUCCESS && prev >= 0 && prev < len(o.stepsTime) {
		begin = time.Unix(0, o.stepsTime[prev])
	}
	return &OpRecord{
		Operator:   o,
		FinishTime: finishTime,
		duration:   finishTime.Sub(begin),
	}
}

// GetAdditionalInfo returns additional info with string
func (o *Operator) GetAdditionalInfo() string {
if len(o.AdditionalInfos) != 0 {
Expand Down
9 changes: 9 additions & 0 deletions server/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,12 @@ func (s *testOperatorSuite) TestSchedulerKind(c *C) {
c.Assert(v.op.SchedulerKind(), Equals, v.expect)
}
}

// TestRecord checks that Record stores the supplied finish time unchanged and
// reports a duration longer than one second for a freshly built test operator.
func (s *testOperatorSuite) TestRecord(c *C) {
	op := s.newTestOperator(1, OpLeader, AddLearner{ToStore: 1, PeerID: 1}, RemovePeer{FromStore: 1, PeerID: 1})
	finishedAt := time.Now()
	// NOTE(review): finishedAt is captured before this sleep and is the value
	// handed to Record, so the sleep does not appear to influence the
	// asserted values — confirm whether it is still needed.
	time.Sleep(time.Second)
	record := op.Record(finishedAt)
	c.Assert(record.FinishTime, Equals, finishedAt)
	c.Assert(record.duration.Seconds(), Greater, time.Second.Seconds())
}
61 changes: 24 additions & 37 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package schedule

import (
"container/heap"
"container/list"
"context"
"fmt"
"strconv"
Expand Down Expand Up @@ -45,7 +44,6 @@ const (
)

var (
historyKeepTime = 5 * time.Minute
slowNotifyInterval = 5 * time.Second
fastNotifyInterval = 2 * time.Second
// PushOperatorTickInterval is the interval try to push the operator.
Expand All @@ -64,7 +62,6 @@ type OperatorController struct {
operators map[uint64]*operator.Operator
hbStreams *hbstream.HeartbeatStreams
fastOperators *cache.TTLUint64
histories *list.List
counts map[operator.OpKind]uint64
opRecords *OperatorRecords
wop WaitingOperator
Expand All @@ -79,7 +76,6 @@ func NewOperatorController(ctx context.Context, cluster Cluster, hbStreams *hbst
cluster: cluster,
operators: make(map[uint64]*operator.Operator),
hbStreams: hbStreams,
histories: list.New(),
fastOperators: cache.NewIDTTL(ctx, time.Minute, FastOperatorFinishTime),
counts: make(map[operator.OpKind]uint64),
opRecords: NewOperatorRecords(ctx),
Expand Down Expand Up @@ -114,7 +110,6 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
// The operator status should be STARTED.
// Check will call CheckSuccess and CheckTimeout.
step := op.Check(region)

switch op.Status() {
case operator.STARTED:
operatorCounter.WithLabelValues(op.Desc(), "check").Inc()
Expand All @@ -123,7 +118,6 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
}
oc.SendScheduleCommand(region, step, source)
case operator.SUCCESS:
oc.pushHistory(op)
if oc.RemoveOperator(op) {
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-success").Inc()
oc.PromoteWaitingOperator()
Expand Down Expand Up @@ -727,43 +721,34 @@ func addLearnerNode(id, storeID uint64) *pdpb.RegionHeartbeatResponse {
}
}

func (oc *OperatorController) pushHistory(op *operator.Operator) {
oc.Lock()
defer oc.Unlock()
for _, h := range op.History() {
oc.histories.PushFront(h)
}
}

func (oc *OperatorController) pushFastOperator(op *operator.Operator) {
oc.fastOperators.Put(op.RegionID(), op)
}

// PruneHistory prunes a part of operators' history.
func (oc *OperatorController) PruneHistory() {
oc.Lock()
defer oc.Unlock()
p := oc.histories.Back()
for p != nil && time.Since(p.Value.(operator.OpHistory).FinishTime) > historyKeepTime {
prev := p.Prev()
oc.histories.Remove(p)
p = prev
// GetRecords returns one OpRecord for every entry currently held in opRecords
// whose FinishTime is not before from. Each record is built with the entry's
// own FinishTime. The result is an empty, non-nil slice when nothing matches.
func (oc *OperatorController) GetRecords(from time.Time) []*operator.OpRecord {
	result := make([]*operator.OpRecord, 0, oc.opRecords.ttl.Len())
	for _, key := range oc.opRecords.ttl.GetAllID() {
		finished := oc.opRecords.Get(key)
		// Get may return nil for a listed ID (presumably the entry expired in
		// between — confirm); such entries are skipped.
		if finished != nil && !finished.FinishTime.Before(from) {
			result = append(result, finished.Record(finished.FinishTime))
		}
	}
	return result
}

// GetHistory gets operators' history.
func (oc *OperatorController) GetHistory(start time.Time) []operator.OpHistory {
oc.RLock()
defer oc.RUnlock()
histories := make([]operator.OpHistory, 0, oc.histories.Len())
for p := oc.histories.Front(); p != nil; p = p.Next() {
history := p.Value.(operator.OpHistory)
if history.FinishTime.Before(start) {
break
history := make([]operator.OpHistory, 0, oc.opRecords.ttl.Len())
for _, id := range oc.opRecords.ttl.GetAllID() {
op := oc.opRecords.Get(id)
if op == nil || op.FinishTime.Before(start) {
continue
}
histories = append(histories, history)
history = append(history, op.History()...)
}
return histories
return history
}

// updateCounts updates resource counts using current pending operators.
Expand Down Expand Up @@ -847,21 +832,23 @@ func (oc *OperatorController) SetOperator(op *operator.Operator) {

// OperatorWithStatus records the operator and its status.
type OperatorWithStatus struct {
Op *operator.Operator
Status pdpb.OperatorStatus
*operator.Operator
Status pdpb.OperatorStatus
FinishTime time.Time
}

// NewOperatorWithStatus creates an OperatorStatus from an operator.
func NewOperatorWithStatus(op *operator.Operator) *OperatorWithStatus {
return &OperatorWithStatus{
Op: op,
Status: operator.OpStatusToPDPB(op.Status()),
Operator: op,
Status: operator.OpStatusToPDPB(op.Status()),
FinishTime: time.Now(),
}
}

// MarshalJSON returns the status of operator as a JSON string
func (o *OperatorWithStatus) MarshalJSON() ([]byte, error) {
return []byte(`"` + fmt.Sprintf("status: %s, operator: %s", o.Status.String(), o.Op.String()) + `"`), nil
return []byte(`"` + fmt.Sprintf("status: %s, operator: %s", o.Status.String(), o.Operator.String()) + `"`), nil
}

// OperatorRecords remains the operator and its status for a while.
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func newRegionInfo(id uint64, startKey, endKey string, size, keys int64, leader
func checkRemoveOperatorSuccess(c *C, oc *OperatorController, op *operator.Operator) {
c.Assert(oc.RemoveOperator(op), IsTrue)
c.Assert(op.IsEnd(), IsTrue)
c.Assert(oc.GetOperatorStatus(op.RegionID()).Op, DeepEquals, op)
c.Assert(oc.GetOperatorStatus(op.RegionID()).Operator, DeepEquals, op)
}

func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) {
Expand Down
6 changes: 6 additions & 0 deletions tests/pdctl/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package operator_test

import (
"context"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -179,8 +180,13 @@ func (s *operatorTestSuite) TestOperator(c *C) {
output, e := pdctl.ExecuteCommand(cmd, testCase.show...)
c.Assert(e, IsNil)
c.Assert(strings.Contains(string(output), testCase.expect), IsTrue)
t := time.Now()
_, e = pdctl.ExecuteCommand(cmd, testCase.reset...)
c.Assert(e, IsNil)
historyCmd := []string{"-u", pdAddr, "operator", "history", strconv.FormatInt(t.Unix(), 10)}
records, e := pdctl.ExecuteCommand(cmd, historyCmd...)
c.Assert(e, IsNil)
c.Assert(strings.Contains(string(records), "admin"), IsTrue)
}

// operator add merge-region <source_region_id> <target_region_id>
Expand Down
Loading

0 comments on commit 1b8f823

Please sign in to comment.