Skip to content

Commit

Permalink
pkg: add progress indicator (#4682)
Browse files Browse the repository at this point in the history
ref #4640

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored Apr 14, 2022
1 parent b7b785f commit 07a6bcd
Show file tree
Hide file tree
Showing 10 changed files with 590 additions and 24 deletions.
122 changes: 122 additions & 0 deletions pkg/progress/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package progress

import (
"math"
"sync"
"time"
)

// Manager is used to maintain the progresses we care about.
type Manager struct {
sync.RWMutex
progesses map[string]*progressIndicator
}

// NewManager creates a new Manager.
func NewManager() *Manager {
return &Manager{
progesses: make(map[string]*progressIndicator),
}
}

// progressIndicator reflects a specified progress.
type progressIndicator struct {
total float64
left float64
startTime time.Time
speedPerSec float64
}

// Reset resets the progress manager.
func (m *Manager) Reset() {
m.Lock()
defer m.Unlock()

m.progesses = make(map[string]*progressIndicator)
}

// AddProgress adds a progress into manager if it doesn't exist.
func (m *Manager) AddProgress(progress string, total float64) (exist bool) {
m.Lock()
defer m.Unlock()

if _, exist = m.progesses[progress]; !exist {
m.progesses[progress] = &progressIndicator{
total: total,
left: total,
startTime: time.Now(),
}
}
return
}

// UpdateProgress updates a progress into manager if it doesn't exist.
func (m *Manager) UpdateProgress(progress string, left float64) {
m.Lock()
defer m.Unlock()

if p, exist := m.progesses[progress]; exist {
p.left = left
if p.total < left {
p.total = left
}
p.speedPerSec = (p.total - p.left) / time.Since(p.startTime).Seconds()
}
}

// RemoveProgress removes a progress from manager.
func (m *Manager) RemoveProgress(progress string) (exist bool) {
m.Lock()
defer m.Unlock()

if _, exist = m.progesses[progress]; exist {
delete(m.progesses, progress)
return
}
return
}

// GetProgresses gets progresses according to the filter.
func (m *Manager) GetProgresses(filter func(p string) bool) []string {
m.Lock()
defer m.Unlock()

processes := []string{}
for p := range m.progesses {
if filter(p) {
processes = append(processes, p)
}
}
return processes
}

// Status returns the current progress status of a give name.
func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed float64) {
m.RLock()
defer m.RUnlock()
if p, exist := m.progesses[progress]; exist {
process = 1 - p.left/p.total
speedPerSec := (p.total - p.left) / time.Since(p.startTime).Seconds()
leftSeconds = p.left / speedPerSec
if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) {
leftSeconds = math.MaxFloat64
}
currentSpeed = speedPerSec
return
}
return 0, 0, 0
}
63 changes: 63 additions & 0 deletions pkg/progress/progress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package progress_test

import (
"math"
"strings"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/tikv/pd/pkg/progress"
)

func Test(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testProgressSuite{})

type testProgressSuite struct{}

func (s *testProgressSuite) Test(c *C) {
n := "test"
m := progress.NewManager()
c.Assert(m.AddProgress(n, 100), IsFalse)
p, ls, cs := m.Status(n)
c.Assert(p, Equals, 0.0)
c.Assert(ls, Equals, math.MaxFloat64)
c.Assert(cs, Equals, 0.0)
time.Sleep(time.Second)
c.Assert(m.AddProgress(n, 100), IsTrue)
m.UpdateProgress(n, 30)
p, ls, cs = m.Status(n)
c.Assert(p, Equals, 0.7)
// 30/(70/1s+) > 30/70
c.Assert(ls, Greater, 30.0/70.0)
// 70/1s+ > 70
c.Assert(cs, Less, 70.0)
ps := m.GetProgresses(func(p string) bool {
return strings.Contains(p, n)
})
c.Assert(ps, HasLen, 1)
c.Assert(ps[0], Equals, n)
ps = m.GetProgresses(func(p string) bool {
return strings.Contains(p, "a")
})
c.Assert(ps, HasLen, 0)
c.Assert(m.RemoveProgress(n), IsTrue)
c.Assert(m.RemoveProgress(n), IsFalse)
}
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/stores/limit", storesHandler.SetAllStoresLimit, setMethods("POST"), setAuditBackend(localLog))
registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.SetStoreLimitScene, setMethods("POST"), setAuditBackend(localLog))
registerFunc(clusterRouter, "/stores/limit/scene", storesHandler.GetStoreLimitScene, setMethods("GET"))
registerFunc(clusterRouter, "/stores/progress", storesHandler.GetStoresProgress, setMethods("GET"))

labelsHandler := newLabelsHandler(svr, rd)
registerFunc(clusterRouter, "/labels", labelsHandler.GetLabels, setMethods("GET"))
Expand Down
53 changes: 52 additions & 1 deletion server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func (h *storesHandler) SetStoreLimitScene(w http.ResponseWriter, r *http.Reques
// @Tags store
// @Summary Get limit scene in the cluster.
// @Produce json
// @Success 200 {string} string "Set store limit scene successfully."
// @Success 200 {string} string "Get store limit scene successfully."
// @Router /stores/limit/scene [get]
func (h *storesHandler) GetStoreLimitScene(w http.ResponseWriter, r *http.Request) {
typeName := r.URL.Query().Get("type")
Expand All @@ -610,6 +610,57 @@ func (h *storesHandler) GetStoreLimitScene(w http.ResponseWriter, r *http.Reques
h.rd.JSON(w, http.StatusOK, scene)
}

// Progress contains status about a progress.
type Progress struct {
Action string `json:"action"`
StoreID uint64 `json:"store_id,omitempty"`
Progress float64 `json:"progress"`
CurrentSpeed float64 `json:"current_speed"`
LeftSeconds float64 `json:"left_seconds"`
}

// @Tags stores
// @Summary Get store progress in the cluster.
// @Produce json
// @Success 200 {object} Progress
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores/progress [get]
func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request) {
if v := r.URL.Query().Get("id"); v != "" {
storeID, err := strconv.ParseUint(v, 10, 64)
if err != nil {
apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(err))
return
}

action, progress, leftSeconds, currentSpeed := h.Handler.GetProgressByID(v)
sp := &Progress{
StoreID: storeID,
Action: action,
Progress: progress,
CurrentSpeed: currentSpeed,
LeftSeconds: leftSeconds,
}

h.rd.JSON(w, http.StatusOK, sp)
return
}
if v := r.URL.Query().Get("action"); v != "" {
progress, leftSeconds, currentSpeed := h.Handler.GetProgressByAction(v)
sp := &Progress{
Action: v,
Progress: progress,
CurrentSpeed: currentSpeed,
LeftSeconds: leftSeconds,
}

h.rd.JSON(w, http.StatusOK, sp)
return
}
h.rd.JSON(w, http.StatusBadRequest, "need query parameters")
}

// @Tags store
// @Summary Get stores in the cluster.
// @Param state query array true "Specify accepted store states."
Expand Down
Loading

0 comments on commit 07a6bcd

Please sign in to comment.