Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic/test #8

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
c6f7726
add one metrics
bufferflies Jul 11, 2022
efc783c
Merge branches 'master' and 'master' of github.com:tikv/pd
bufferflies Jul 13, 2022
2ac5ce1
Merge branch 'master' of github.com:tikv/pd
bufferflies Jul 14, 2022
1606d99
Merge branch 'master' of github.com:tikv/pd
bufferflies Jul 14, 2022
500e3a8
Revert "add one metrics"
bufferflies Jul 14, 2022
c56199f
Merge branch 'master' of github.com:tikv/pd
bufferflies Jul 18, 2022
af1b6b6
init store limit
bufferflies Jul 18, 2022
590cf75
add unit test
bufferflies Jul 19, 2022
5ed9032
panic
bufferflies Jul 19, 2022
e3daa3b
recv-snap-limit
bufferflies Jul 19, 2022
bfa9c11
add log
bufferflies Jul 19, 2022
c46d0ff
token
bufferflies Jul 19, 2022
defc392
add log
bufferflies Jul 26, 2022
f826930
add log
bufferflies Jul 26, 2022
14cffea
refactor snap size
bufferflies Jul 27, 2022
e3da3cd
add config for snapshot
bufferflies Jul 29, 2022
d5cfd10
add sender snapshot limit
bufferflies Aug 2, 2022
3fe5beb
raft
bufferflies Aug 8, 2022
f4f9d1d
cache influence
bufferflies Aug 15, 2022
3323cb2
update origin
bufferflies Aug 15, 2022
1d819d6
Merge branch 'feature/influence' into dynamic/test
bufferflies Aug 15, 2022
3433cdc
merge cache influence
bufferflies Aug 15, 2022
6d8e7f1
update kvproto
bufferflies Aug 16, 2022
ae53a4c
add log
bufferflies Aug 16, 2022
4a3fbea
fix recv size bug
bufferflies Aug 16, 2022
1710746
add limit
bufferflies Aug 16, 2022
c1f6ad9
report sent size
bufferflies Aug 17, 2022
51b6365
unsent
bufferflies Aug 17, 2022
fabdd34
remove log and send filter
bufferflies Aug 17, 2022
087a4a7
add sec
bufferflies Aug 22, 2022
5fcf717
rlock
bufferflies Aug 22, 2022
915a270
panic
bufferflies Aug 22, 2022
dea16d2
cost
bufferflies Aug 22, 2022
42cd811
add feedback for recv
bufferflies Aug 23, 2022
1b21d1e
log
bufferflies Aug 23, 2022
d3e29eb
panic
bufferflies Aug 23, 2022
6cda434
adjust size
bufferflies Aug 23, 2022
785312b
min vaue
bufferflies Aug 23, 2022
de11f4f
avoid inc
bufferflies Aug 23, 2022
dec2c66
adjust pi
bufferflies Aug 23, 2022
177f5e1
add generator size
bufferflies Aug 24, 2022
10150c8
feedback send
bufferflies Aug 24, 2022
db52bc3
add log
bufferflies Aug 25, 2022
2e57ee9
filter recv
bufferflies Aug 25, 2022
8afc236
add switch
bufferflies Aug 25, 2022
dd6f288
log
bufferflies Aug 25, 2022
e206710
update kvproto
bufferflies Sep 5, 2022
d674f5a
conflict
bufferflies Sep 5, 2022
2725727
uint64 to int64
bufferflies Sep 5, 2022
c569511
min tolerate sec
bufferflies Sep 6, 2022
9a2d329
use send
bufferflies Sep 7, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,5 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
)

replace github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad => github.com/bufferflies/kvproto v0.0.0-20220906102144-8a561f3d9940
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo=
github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI=
github.com/bufferflies/kvproto v0.0.0-20220906102144-8a561f3d9940 h1:r7v9B91FXFIXKk6ong7+rEkjvjAi2/fdzGRL90rFXiM=
github.com/bufferflies/kvproto v0.0.0-20220906102144-8a561f3d9940/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g=
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs=
Expand Down Expand Up @@ -417,8 +419,6 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad h1:lGKxsEwdE0pVXzHYD1SQ1vfa3t/bFVU/latrQz8b/w0=
github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down
67 changes: 67 additions & 0 deletions pkg/controller/pi_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

const DEFAULT_ERROR_BUFFER = 100

// PIController
type PIController struct {
inflight *inflight
proportion float64
integral float64
lastSum float64
}

// NewPIController
func NewPIController(proportion, integral float64) *PIController {
return &PIController{
inflight: newInflight(DEFAULT_ERROR_BUFFER),
lastSum: 0.0,
proportion: proportion,
integral: integral,
}
}

// AddError
func (p *PIController) AddError(err float64) float64 {
//old := p.inflight.Add(err)
p.lastSum = p.lastSum + err
return p.proportion*err + p.integral*p.lastSum
}

type inflight struct {
array []float64
start int
size int
}

func newInflight(size int) *inflight {
return &inflight{
array: make([]float64, size),
size: size,
}
}

func (f *inflight) Add(element float64) float64 {
idx := f.index()
old := f.array[idx]
f.array[idx] = element
f.start++
return old
}

func (f *inflight) index() int {
return f.start % f.size
}
31 changes: 31 additions & 0 deletions pkg/controller/pi_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestInflight(t *testing.T) {
re := assert.New(t)
in := newInflight(3)
re.Equal(in.Add(1), float64(0))
re.Equal(in.Add(2), float64(0))
re.Equal(in.Add(3), float64(0))
re.Equal(in.Add(4), float64(1))
re.Equal(in.Add(5), float64(2))
}
27 changes: 27 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const (
persistLimitWaitTime = 100 * time.Millisecond
removingAction = "removing"
preparingAction = "preparing"
minTolerateDurationSec = 5
)

// Server is the interface for cluster.
Expand Down Expand Up @@ -731,6 +732,28 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
peerInfo := core.NewPeerInfo(peer, loads, interval)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
}

for _, stat := range stats.GetSnapshotStats() {
dur := stat.GetSendDurationSec() + stat.GetGenerateDurationSec()
if dur < minTolerateDurationSec {
dur = minTolerateDurationSec
}
e := int64(dur)*2 - int64(stat.GetTotalDurationSec())
log.Info("snapshot complete",
zap.Uint64("store-id", stats.GetStoreId()),
zap.Uint64("region-id", stat.GetRegionId()),
zap.Uint64("generate-snapshot-sec", stat.GetGenerateDurationSec()),
zap.Uint64("send-snapshot-sec", stat.GetSendDurationSec()),
zap.Uint64("takes", stat.GetTotalDurationSec()),
zap.Uint64("transport-size", stat.GetTransportSize()),
zap.Stringer("default-limit", storelimit.DefaultSnapLimit),
zap.Int64("error", e),
)

store.Feedback(float64(e), storelimit.DefaultSnapLimit)
storeErrorGauge.WithLabelValues(strconv.FormatUint(store.GetID(), 10)).Add(float64(e))

}
// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
return nil
Expand Down Expand Up @@ -2093,6 +2116,10 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) {
for _, limitType := range storelimit.TypeNameValue {
c.core.ResetStoreLimit(storeID, limitType)
}

for _, snapType := range storelimit.SnapTypeNameValue {
c.core.ResetSnapLimit(storeID, snapType)
}
delete(cfg.StoreLimit, storeID)
c.opt.SetScheduleConfig(cfg)
var err error
Expand Down
6 changes: 6 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cluster
import (
"bytes"
"context"
"github.com/tikv/pd/server/core/storelimit"
"net/http"
"strconv"
"sync"
Expand Down Expand Up @@ -150,6 +151,11 @@ func (c *coordinator) patrolRegions() {
continue
}

store := c.cluster.GetStore(region.GetLeader().GetStoreId())
if store == nil || !store.IsAvailableSnap(storelimit.SendSnapShot) {
continue
}

ops := c.checkers.CheckRegion(region)

key = region.GetEndKey()
Expand Down
18 changes: 18 additions & 0 deletions server/cluster/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,22 @@ var (
Name: "store_sync",
Help: "The state of store sync config",
}, []string{"address", "state"})

storeSnapShotSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "snapshot",
Name: "size",
Help: "Indicate the snapshot report size",
}, []string{"store", "type"})

storeErrorGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "store",
Name: "error",
Help: "the error of store",
}, []string{"store"})
)

func init() {
Expand All @@ -135,4 +151,6 @@ func init() {
prometheus.MustRegister(storesSpeedGauge)
prometheus.MustRegister(storesETAGauge)
prometheus.MustRegister(storeSyncConfigEvent)
prometheus.MustRegister(storeSnapShotSizeGauge)
prometheus.MustRegister(storeErrorGauge)
}
8 changes: 8 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ const (
defaultLogFormat = "text"

defaultMaxMovableHotPeerSize = int64(512)

defaultSendSnapshotSize = int64(1000)
)

// Special keys for Labels
Expand Down Expand Up @@ -764,6 +766,8 @@ type ScheduleConfig struct {
// MaxMovableHotPeerSize is the threshold of region size for balance hot region and split bucket scheduler.
// Hot region must be split before moved if it's region size is greater than MaxMovableHotPeerSize.
MaxMovableHotPeerSize int64 `toml:"max-movable-hot-peer-size" json:"max-movable-hot-peer-size,omitempty"`

SendSnapshotSize int64 `toml:"send-snapshot-size" json:"send-snapshot-size"`
}

// Clone returns a cloned scheduling configuration.
Expand Down Expand Up @@ -865,6 +869,10 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
if !meta.IsDefined("enable-cross-table-merge") {
c.EnableCrossTableMerge = defaultEnableCrossTableMerge
}

if !meta.IsDefined("send-snapshot-size") {
adjustInt64(&c.SendSnapshotSize, defaultSendSnapshotSize)
}
adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)

Expand Down
9 changes: 9 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,15 @@ func (o *PersistOptions) IsLocationReplacementEnabled() bool {
return o.GetScheduleConfig().EnableLocationReplacement
}

// GetSendSnapshotSize returns the send snapshot size.
func (o *PersistOptions) GetSendSnapshotSize() int64 {
size := o.GetScheduleConfig().SendSnapshotSize
if size <= 0 {
size = defaultSendSnapshotSize
}
return size
}

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistOptions) GetMaxMovableHotPeerSize() int64 {
size := o.GetScheduleConfig().MaxMovableHotPeerSize
Expand Down
7 changes: 7 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Typ
bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...)
}

// ResetSnapLimit resets the snapshot limit for the given store.
func (bc *BasicCluster) ResetSnapLimit(storeID uint64, limitType storelimit.SnapType, cap ...int64) {
bc.Lock()
defer bc.Unlock()
bc.Stores.ResetSnapLimit(storeID, limitType, cap...)
}

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64) {
bc.Lock()
Expand Down
Loading