Skip to content

Commit

Permalink
add trend
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 9, 2024
1 parent dd8df25 commit bb948c4
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 5 deletions.
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ cannot set invalid configuration

["PD:server:ErrLeaderFrequentlyChange"]
error = '''
leader %s frequently changed, leader-key is [%s]
leader %s unhealthy, leader-key is [%s]
'''

["PD:server:ErrLeaderNil"]
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ var (
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
ErrLeaderFrequentlyChange = errors.Normalize("leader %s frequently changed, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderFrequentlyChange"))
// ErrLeaderUnhealthy carries its own RFC code so that it cannot be confused with
// ErrLeaderFrequentlyChange when errors are matched or documented by code.
// NOTE(review): errors.toml presumably needs a matching "PD:server:ErrLeaderUnhealthy" entry — confirm.
ErrLeaderUnhealthy = errors.Normalize("leader %s unhealthy, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderUnhealthy"))
)

// logutil errors
Expand Down
30 changes: 26 additions & 4 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package member
import (
"context"
"fmt"
"math"
"math/rand"
"os"
"path"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/server/trend"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
Expand Down Expand Up @@ -63,15 +65,26 @@ type EmbeddedEtcdMember struct {
memberValue string
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value

healthyTrend *trend.Trend
}

// NewMember creates a new EmbeddedEtcdMember for the given embedded etcd
// server, etcd client and member id. The member's healthy trend is created
// with a 10-second base duration.
func NewMember(etcd *embed.Etcd, client *clientv3.Client, id uint64) *EmbeddedEtcdMember {
	// Each field is listed exactly once; repeating a key in a composite
	// literal is a compile error.
	return &EmbeddedEtcdMember{
		etcd:         etcd,
		client:       client,
		id:           id,
		healthyTrend: trend.NewTrend(10 * time.Second),
	}
}

// GetHealthyTrend returns the healthy trend held by this member.
// It is safe to call on a nil receiver, in which case nil is returned.
func (m *EmbeddedEtcdMember) GetHealthyTrend() *trend.Trend {
	if m != nil {
		return m.healthyTrend
	}
	return nil
}

// ID returns the unique etcd ID for this server in etcd cluster.
Expand Down Expand Up @@ -186,11 +199,20 @@ func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout in
failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue()))
})

avg := m.GetHealthyTrend().AvgRate()
if math.Abs(avg) > 0.1 {
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err
}
log.Warn("pd leader is unhealthy", zap.Float64("healthy-trend", avg))
return errs.ErrLeaderUnhealthy.FastGenByArgs(m.Name(), m.GetLeaderPath())
}

if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
m.leadership.ResetCampaignTimes()
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err
}
m.leadership.ResetCampaignTimes()
return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath())
}

Expand Down
13 changes: 13 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
errorspkg "errors"
"fmt"
"github.com/tikv/pd/server/trend"
"math/rand"
"net/http"
"os"
Expand Down Expand Up @@ -90,6 +91,8 @@ import (

const (
serverMetricsInterval = time.Minute

serverHealthyCheckInterval = time.Second
// pdRootPath for all pd servers.
pdRootPath = "/pd"
pdAPIPrefix = "/pd/"
Expand Down Expand Up @@ -654,10 +657,19 @@ func (s *Server) serverMetricsLoop() {
defer cancel()
ticker := time.NewTicker(serverMetricsInterval)
defer ticker.Stop()
healthyCheckTicker := time.NewTicker(serverHealthyCheckInterval)
defer healthyCheckTicker.Stop()
for {
select {
case <-ticker.C:
s.collectEtcdStateMetrics()
case <-healthyCheckTicker.C:
// record fsync value
if s.client != nil && len(s.client.Endpoints()) > 0 {
if val, err := trend.GetAsyncFromEtcd(s.client.Endpoints()[0]); err == nil {
s.member.GetHealthyTrend().Record(val*1000, time.Now())
}
}
case <-ctx.Done():
log.Info("server is closed, exit metrics loop")
return
Expand Down Expand Up @@ -1680,6 +1692,7 @@ func (s *Server) campaignLeader() {

// maintain the PD leadership, after this, TSO can be service.
s.member.KeepLeader(ctx)
s.member.GetHealthyTrend().Reset()
log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name()))

if !s.IsAPIServiceMode() {
Expand Down
166 changes: 166 additions & 0 deletions server/trend/trend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trend

import (
"container/list"
"math"
"sync"
"time"
)

// SampleValue is one observation together with the instant it was recorded.
type SampleValue struct {
	value float64
	time  time.Time
}

// SampleWindow holds the samples recorded within a trailing duration and
// keeps their running sum so the average can be read without iterating.
type SampleWindow struct {
	sum      float64
	values   *list.List
	duration time.Duration
}

// NewSampleWindow returns an empty SampleWindow spanning the given duration.
func NewSampleWindow(duration time.Duration) *SampleWindow {
	sw := new(SampleWindow)
	sw.values = list.New()
	sw.duration = duration
	return sw
}

// Record appends value as observed at now, then drops every sample at the
// front of the window whose age relative to now has reached the window duration.
func (sw *SampleWindow) Record(value float64, now time.Time) {
	sw.sum += value
	sw.values.PushBack(SampleValue{value: value, time: now})

	// Samples are stored in insertion order, so eviction stops at the first
	// sample that is still young enough.
	for oldest := sw.values.Front(); oldest != nil; oldest = sw.values.Front() {
		sample := oldest.Value.(SampleValue)
		if now.Sub(sample.time) < sw.duration {
			return
		}
		sw.sum -= sample.value
		sw.values.Remove(oldest)
	}
}

// Avg returns the mean of the samples currently held, or 0 when there are none.
func (sw *SampleWindow) Avg() float64 {
	if n := sw.values.Len(); n > 0 {
		return sw.sum / float64(n)
	}
	return 0.0
}

// reset discards every sample and replaces the window duration with capacity,
// converted directly to a time.Duration (so the unit is nanoseconds).
func (sw *SampleWindow) reset(capacity int64) {
	sw.values.Init()
	sw.sum = 0
	sw.duration = time.Duration(capacity)
}

// Trend describes how a sampled value is changing by comparing its average
// over a short window with its average over a window three times as long.
// Every method takes the embedded lock before touching the fields.
type Trend struct {
	sync.RWMutex

	smallWindow *SampleWindow
	bigWindow   *SampleWindow

	// changed reports whether a sample was recorded since cache was computed.
	changed bool
	// cache is the most recently computed AvgRate result.
	cache float64
}

// NewTrend creates a Trend whose small window spans dur and whose big window
// spans 3*dur.
func NewTrend(dur time.Duration) *Trend {
	return &Trend{
		smallWindow: NewSampleWindow(dur),
		bigWindow:   NewSampleWindow(dur * 3),
		changed:     false,
		cache:       0.0,
	}
}

// Record adds value, observed at now, to both windows and marks the cached
// rate as stale.
func (t *Trend) Record(value float64, now time.Time) {
	t.Lock()
	t.changed = true
	t.smallWindow.Record(value, now)
	t.bigWindow.Record(value, now)
	t.Unlock()
}

// TODO: unSensitiveCause reduce influence of spike
const unSensitiveCause = 0.0

// AvgRate returns the rate computed by laLbRate from the small-window average
// and the big-window average. The result is cached and only recomputed after
// a new sample has been recorded.
func (t *Trend) AvgRate() float64 {
	t.Lock()
	defer t.Unlock()

	if !t.changed {
		return t.cache
	}

	t.cache = laLbRate(t.smallWindow.Avg(), t.bigWindow.Avg(), unSensitiveCause)
	// Mark the cache as fresh; without this every call would recompute.
	t.changed = false
	return t.cache
}

// Reset drops all recorded samples and the cached rate while keeping the
// window durations the Trend was created with.
func (t *Trend) Reset() {
	t.Lock()
	defer t.Unlock()

	// reset converts its argument directly to a time.Duration, so hand back
	// each window's own duration rather than a bare number, which would
	// shrink the window to a few nanoseconds.
	t.smallWindow.reset(int64(t.smallWindow.duration))
	t.bigWindow.reset(int64(t.bigWindow.duration))
	t.changed = false
	t.cache = 0.0
}

// laLbRateSimple is a simple version of LaLbRate.
// func laLbRateSimple(laAvg float64, lbAvg float64) float64 {
// return (laAvg - lbAvg) / laAvg
// }

// laLbRate turns the difference between the recent average laAvg and the
// baseline average lbAvg into a signed rate.
//
// It returns 0 when lbAvg is below the smallest non-zero float64 or when the
// two averages are indistinguishable. Otherwise the absolute difference is
// reduced by marginError (clamped at zero) and the result is
// sqrt(laAvg * reduced^2 / sqrt(lbAvg)), negated when laAvg is below lbAvg.
func laLbRate(laAvg float64, lbAvg float64, marginError float64) float64 {
	if lbAvg < math.SmallestNonzeroFloat64 {
		return 0.0
	}
	delta := laAvg - lbAvg
	if math.Abs(delta) < math.SmallestNonzeroFloat64 {
		return 0.0
	}

	falling := laAvg < lbAvg

	// magnitude is the size of the change left after the margin is absorbed;
	// it stays at zero when the change does not exceed the margin.
	magnitude := 0.0
	switch {
	case falling && -delta > marginError:
		magnitude = -delta - marginError
	case !falling && delta > marginError:
		magnitude = delta - marginError
	}

	scaled := laAvg * (magnitude * magnitude) / math.Sqrt(lbAvg)
	if falling {
		return -math.Sqrt(scaled)
	}
	return math.Sqrt(scaled)
}
88 changes: 88 additions & 0 deletions server/trend/trend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trend

import (
"fmt"
"testing"
"time"
)

// TestWindows checks that a SampleWindow sums and averages the samples that
// fall inside its duration.
func TestWindows(t *testing.T) {
	duration := time.Second * 5
	window := NewSampleWindow(duration)

	now := time.Now()
	window.Record(10.0, now)

	// One second later both samples are still inside the five-second window.
	now = now.Add(time.Second * 1)
	window.Record(15.0, now)

	if window.sum != 25.0 {
		t.Fatalf("window sum = %v, want 25", window.sum)
	}
	average := window.Avg()
	if average != 12.5 {
		t.Fatalf("window average = %v, want 12.5", average)
	}
	fmt.Println(average)
}

// TestTrendDouble records one second of samples at 3 followed by one second of
// samples at 6 and checks that the trend reports an increase.
func TestTrendDouble(t *testing.T) {
	trend := NewTrend(time.Second)

	now := time.Now()
	value := 3 * 1.0
	for i := 0; i < 10; i++ {
		now = now.Add(time.Millisecond * 100)
		trend.Record(value, now)
	}

	value *= 2
	for i := 0; i < 10; i++ {
		now = now.Add(time.Millisecond * 100)
		trend.Record(value, now)
	}

	result := trend.AvgRate()

	// The one-second window has evicted the first ten samples, so it holds
	// only the doubled ones; the three-second window still holds all twenty.
	if n, avg := trend.smallWindow.values.Len(), trend.smallWindow.Avg(); n != 10 || avg != 6.0 {
		t.Fatalf("small window: len=%d avg=%v, want len=10 avg=6", n, avg)
	}
	if n, avg := trend.bigWindow.values.Len(), trend.bigWindow.Avg(); n != 20 || avg != 4.5 {
		t.Fatalf("big window: len=%d avg=%v, want len=20 avg=4.5", n, avg)
	}
	if result <= 0 {
		t.Fatalf("AvgRate = %v, want a positive rate for a rising value", result)
	}
	fmt.Println(result)
}

// TestTrendSpike records 1.5 seconds of samples at 3 followed by 1.5 seconds
// of samples at 3000 and checks that the spike shows up as a positive rate.
func TestTrendSpike(t *testing.T) {
	trend := NewTrend(time.Second)

	now := time.Now()
	value := 3 * 1.0
	for i := 0; i < 15; i++ {
		now = now.Add(time.Millisecond * 100)
		trend.Record(value, now)
	}

	value = 3 * 1000.0
	for i := 0; i < 15; i++ {
		now = now.Add(time.Millisecond * 100)
		trend.Record(value, now)
	}

	result := trend.AvgRate()

	// The small window holds only spike samples, while the big window still
	// holds all thirty samples, so its average is lower.
	small, big := trend.smallWindow.Avg(), trend.bigWindow.Avg()
	if small != 3000.0 || big != 1501.5 {
		t.Fatalf("window averages: small=%v big=%v, want small=3000 big=1501.5", small, big)
	}
	if result <= 0 {
		t.Fatalf("AvgRate = %v, want a positive rate for a spike", result)
	}
	fmt.Println(result)
}

// TestGetAsyncFromEtcd queries a locally running endpoint and logs the value
// it reports. The test is skipped when the query returns an error, so that it
// does not silently pass on a meaningless zero value.
func TestGetAsyncFromEtcd(t *testing.T) {
	res, err := GetAsyncFromEtcd("http://127.0.0.1:2384")
	if err != nil {
		t.Skipf("endpoint http://127.0.0.1:2384 is not available: %v", err)
	}
	t.Logf("value from etcd (x1000): %v", res*1000)
}
Loading

0 comments on commit bb948c4

Please sign in to comment.