Skip to content

Commit

Permalink
add trend
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 10, 2024
1 parent 8b8c78a commit e6a5e29
Show file tree
Hide file tree
Showing 8 changed files with 582 additions and 4 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,11 @@ error = '''
leader is nil
'''

["PD:server:ErrLeaderUnhealthy"]
error = '''
leader %s unhealthy, leader-key is [%s]
'''

["PD:server:ErrRateLimitExceeded"]
error = '''
rate limit exceeded
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ var (
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
ErrLeaderFrequentlyChange = errors.Normalize("leader %s frequently changed, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderFrequentlyChange"))
ErrLeaderUnhealthy = errors.Normalize("leader %s unhealthy, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderUnhealthy"))
)

// logutil errors
Expand Down
29 changes: 25 additions & 4 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/server/trend"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
Expand Down Expand Up @@ -63,15 +64,26 @@ type EmbeddedEtcdMember struct {
memberValue string
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value

healthyTrend *trend.Trend
}

// NewMember create a new Member.
func NewMember(etcd *embed.Etcd, client *clientv3.Client, id uint64) *EmbeddedEtcdMember {
return &EmbeddedEtcdMember{
etcd: etcd,
client: client,
id: id,
etcd: etcd,
client: client,
id: id,
healthyTrend: trend.NewTrend(10 * time.Second),
}
}

// GetHealthyTrend is used to get the healthy trend of the leader.
func (m *EmbeddedEtcdMember) GetHealthyTrend() *trend.Trend {
if m == nil {
return nil

Check warning on line 84 in pkg/member/member.go

View check run for this annotation

Codecov / codecov/patch

pkg/member/member.go#L84

Added line #L84 was not covered by tests
}
return m.healthyTrend
}

// ID returns the unique etcd ID for this server in etcd cluster.
Expand Down Expand Up @@ -186,11 +198,20 @@ func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout in
failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue()))
})

avg := m.GetHealthyTrend().AvgRate()
if avg > 1 {
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err

Check warning on line 204 in pkg/member/member.go

View check run for this annotation

Codecov / codecov/patch

pkg/member/member.go#L203-L204

Added lines #L203 - L204 were not covered by tests
}
log.Warn("pd leader is unhealthy", zap.Float64("healthy-trend", avg))
return errs.ErrLeaderUnhealthy.FastGenByArgs(m.Name(), m.GetLeaderPath())

Check warning on line 207 in pkg/member/member.go

View check run for this annotation

Codecov / codecov/patch

pkg/member/member.go#L206-L207

Added lines #L206 - L207 were not covered by tests
}

if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
m.leadership.ResetCampaignTimes()
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err
}
m.leadership.ResetCampaignTimes()
return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath())
}

Expand Down
18 changes: 18 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,30 @@ var (
Name: "forward_fail_total",
Help: "Counter of forward fail.",
}, []string{"request", "type"})

leaderTrendRecordTime = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "server",
Name: "leader_trend_record_time",
Help: "The trend time of PD server healthy.",
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
leaderTrendAvgRate = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "server",
Name: "leader_trend_avg_rate",
Help: "The trend rate of PD server healthy.",
})
)

func init() {
prometheus.MustRegister(timeJumpBackCounter)
prometheus.MustRegister(regionHeartbeatCounter)
prometheus.MustRegister(regionHeartbeatLatency)
prometheus.MustRegister(leaderTrendRecordTime)
prometheus.MustRegister(leaderTrendAvgRate)
prometheus.MustRegister(metadataGauge)
prometheus.MustRegister(etcdStateGauge)
prometheus.MustRegister(tsoProxyHandleDuration)
Expand Down
19 changes: 19 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import (
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/trend"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/mvcc/mvccpb"
Expand All @@ -90,6 +91,8 @@ import (

const (
serverMetricsInterval = time.Minute

serverHealthyCheckInterval = time.Second
// pdRootPath for all pd servers.
pdRootPath = "/pd"
pdAPIPrefix = "/pd/"
Expand Down Expand Up @@ -654,10 +657,26 @@ func (s *Server) serverMetricsLoop() {
defer cancel()
ticker := time.NewTicker(serverMetricsInterval)
defer ticker.Stop()
healthyCheckTicker := time.NewTicker(serverHealthyCheckInterval)
defer healthyCheckTicker.Stop()
val := trend.NewAsyncFromEtcd()

for {
select {
case <-ticker.C:
s.collectEtcdStateMetrics()
case <-healthyCheckTicker.C:
// record fsync value
if s.client != nil && len(s.client.Endpoints()) > 0 {
if val, err := val.GetVal(s.client.Endpoints()[0]); err == nil {
start := time.Now()
s.member.GetHealthyTrend().Record(val, start)
leaderTrendRecordTime.Observe(val)
// TODO: remove it, just for test now.
avg := s.member.GetHealthyTrend().AvgRate()
leaderTrendAvgRate.Set(avg)
}
}
case <-ctx.Done():
log.Info("server is closed, exit metrics loop")
return
Expand Down
172 changes: 172 additions & 0 deletions server/trend/trend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trend

import (
"container/list"
"go.uber.org/zap"
"math"
"sync"
"time"

"github.com/pingcap/log"
)

// SampleValue is a sample value.
type SampleValue struct {
value float64
time time.Time
}

// SampleWindow is a window of sample values.
type SampleWindow struct {
sum float64
values *list.List
duration time.Duration
}

// NewSampleWindow creates a new instance of SampleWindow.
func NewSampleWindow(duration time.Duration) *SampleWindow {
return &SampleWindow{
sum: 0,
values: list.New(),
duration: duration,
}
}

// Record adds a new sample value and maintains the window constraints.
func (sw *SampleWindow) Record(value float64, now time.Time) {
sw.values.PushBack(SampleValue{value: value, time: now})
sw.sum += value

for sw.values.Len() > 0 {
front := sw.values.Front().Value.(SampleValue)
if now.Sub(front.time) >= sw.duration {
sw.values.Remove(sw.values.Front())
sw.sum -= front.value
} else {
break
}
}
}

// Avg calculates the average of the values in the window.
func (sw *SampleWindow) Avg() float64 {
if sw.values.Len() == 0 {
return 0.0

Check warning on line 68 in server/trend/trend.go

View check run for this annotation

Codecov / codecov/patch

server/trend/trend.go#L68

Added line #L68 was not covered by tests
}
log.Info("sw.sum", zap.Float64("sw.sum", sw.sum), zap.Int("sw.values.Len()", sw.values.Len()))
return sw.sum / float64(sw.values.Len())
}

func (sw *SampleWindow) reset(capacity int64) {
sw.sum = 0
sw.values.Init()
sw.duration = time.Duration(capacity)

Check warning on line 77 in server/trend/trend.go

View check run for this annotation

Codecov / codecov/patch

server/trend/trend.go#L75-L77

Added lines #L75 - L77 were not covered by tests
}

// Trend is a trend of a value.
type Trend struct {
sync.RWMutex

smallWindow *SampleWindow
bigWindow *SampleWindow

changed bool
cache float64
}

// NewTrend creates a new instance of Trend.
func NewTrend(dur time.Duration) *Trend {
return &Trend{
smallWindow: NewSampleWindow(dur),
bigWindow: NewSampleWindow(dur * 2),
changed: false,
cache: 0.0,
}
}

// Record adds a new sample value and maintains the window constraints.
func (t *Trend) Record(value float64, now time.Time) {
t.Lock()
defer t.Unlock()

t.changed = true
t.smallWindow.Record(value, now)
t.bigWindow.Record(value, now)
}

// TODO: unSensitiveCause reduce influence of spike
const unSensitiveCause = 0.0

// AvgRate calculates the average rate of the values in the window.
func (t *Trend) AvgRate() float64 {
t.Lock()
defer t.Unlock()

if !t.changed {
return t.cache
}

t.cache = laLbRate(t.smallWindow.Avg(), t.bigWindow.Avg(), unSensitiveCause)
log.Info("trend", zap.Float64("t.cache", t.cache), zap.Float64("smallWindow.Avg()", t.smallWindow.Avg()), zap.Float64("bigWindow.Avg()", t.bigWindow.Avg()))
return t.cache
}

// Reset resets the trend.
func (t *Trend) Reset() {
t.Lock()
defer t.Unlock()

Check warning on line 131 in server/trend/trend.go

View check run for this annotation

Codecov / codecov/patch

server/trend/trend.go#L130-L131

Added lines #L130 - L131 were not covered by tests

t.smallWindow.reset(10)
t.bigWindow.reset(30)
t.changed = false
t.cache = 0.0

Check warning on line 136 in server/trend/trend.go

View check run for this annotation

Codecov / codecov/patch

server/trend/trend.go#L133-L136

Added lines #L133 - L136 were not covered by tests
}

// laLbRateSimple is a simple version of LaLbRate.
// func laLbRateSimple(laAvg float64, lbAvg float64) float64 {
// return (laAvg - lbAvg) / laAvg
// }

func laLbRate(laAvg float64, lbAvg float64, marginError float64) float64 {
if lbAvg < math.SmallestNonzeroFloat64 {
return 0.0
}
increased := laAvg - lbAvg
if math.Abs(increased) < math.SmallestNonzeroFloat64 {
return 0.0
}
if laAvg < lbAvg {
if -increased > marginError {
increased = -increased - marginError
} else {
increased = 0.0

Check warning on line 156 in server/trend/trend.go

View check run for this annotation

Codecov / codecov/patch

server/trend/trend.go#L156

Added line #L156 was not covered by tests
}
} else if increased > marginError {
increased -= marginError
} else {
increased = 0.0

Check warning on line 161 in server/trend/trend.go

View check run for this annotation

Codecov / codecov/patch

server/trend/trend.go#L161

Added line #L161 was not covered by tests
}
incSq := increased * increased
if laAvg < lbAvg {
incSq = -incSq
}
res := laAvg * incSq / math.Sqrt(lbAvg)
if laAvg < lbAvg {
return -math.Sqrt(-res)
}
return math.Sqrt(res)
}
Loading

0 comments on commit e6a5e29

Please sign in to comment.