From e6a5e298a3c81018b5902393ad0ebf103bc6dd82 Mon Sep 17 00:00:00 2001 From: husharp Date: Wed, 10 Jan 2024 15:53:51 +0800 Subject: [PATCH] add trend Signed-off-by: husharp --- errors.toml | 5 + pkg/errs/errno.go | 1 + pkg/member/member.go | 29 +++++- server/metrics.go | 18 ++++ server/server.go | 19 ++++ server/trend/trend.go | 172 +++++++++++++++++++++++++++++++++ server/trend/trend_test.go | 191 +++++++++++++++++++++++++++++++++++++ server/trend/util.go | 151 +++++++++++++++++++++++++++++ 8 files changed, 582 insertions(+), 4 deletions(-) create mode 100644 server/trend/trend.go create mode 100644 server/trend/trend_test.go create mode 100644 server/trend/util.go diff --git a/errors.toml b/errors.toml index 64101000478b..ce09f5216b15 100644 --- a/errors.toml +++ b/errors.toml @@ -776,6 +776,11 @@ error = ''' leader is nil ''' +["PD:server:ErrLeaderUnhealthy"] +error = ''' +leader %s unhealthy, leader-key is [%s] +''' + ["PD:server:ErrRateLimitExceeded"] error = ''' rate limit exceeded diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 8c3e914531bd..c28c792c51bf 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -217,6 +217,7 @@ var ( ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted")) ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded")) ErrLeaderFrequentlyChange = errors.Normalize("leader %s frequently changed, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderFrequentlyChange")) + ErrLeaderUnhealthy = errors.Normalize("leader %s unhealthy, leader-key is [%s]", errors.RFCCodeText("PD:server:ErrLeaderUnhealthy")) ) // logutil errors diff --git a/pkg/member/member.go b/pkg/member/member.go index 4e5322707006..729b2ea68526 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/server/trend" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "go.uber.org/zap" @@ -63,15 +64,26 @@ type EmbeddedEtcdMember struct { memberValue string // lastLeaderUpdatedTime is the last time when the leader is updated. lastLeaderUpdatedTime atomic.Value + + healthyTrend *trend.Trend } // NewMember create a new Member. func NewMember(etcd *embed.Etcd, client *clientv3.Client, id uint64) *EmbeddedEtcdMember { return &EmbeddedEtcdMember{ - etcd: etcd, - client: client, - id: id, + etcd: etcd, + client: client, + id: id, + healthyTrend: trend.NewTrend(10 * time.Second), + } +} + +// GetHealthyTrend is used to get the healthy trend of the leader. +func (m *EmbeddedEtcdMember) GetHealthyTrend() *trend.Trend { + if m == nil { + return nil } + return m.healthyTrend } // ID returns the unique etcd ID for this server in etcd cluster. @@ -186,11 +198,20 @@ func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout in failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue())) }) + avg := m.GetHealthyTrend().AvgRate() + if avg > 1 { + if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil { + return err + } + log.Warn("pd leader is unhealthy", zap.Float64("healthy-trend", avg)) + return errs.ErrLeaderUnhealthy.FastGenByArgs(m.Name(), m.GetLeaderPath()) + } + if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes { - m.leadership.ResetCampaignTimes() if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil { return err } + m.leadership.ResetCampaignTimes() return errs.ErrLeaderFrequentlyChange.FastGenByArgs(m.Name(), m.GetLeaderPath()) } diff --git a/server/metrics.go b/server/metrics.go index 0935008a420a..1d4f3c99e47d 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -160,12 +160,30 @@ var ( Name: "forward_fail_total", Help: "Counter of forward fail.", }, []string{"request", "type"}) + + leaderTrendRecordTime = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "leader_trend_record_time", + Help: "The trend time of PD server healthy.", + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^13 == 8.192 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), + }) + leaderTrendAvgRate = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "leader_trend_avg_rate", + Help: "The trend rate of PD server healthy.", + }) ) func init() { prometheus.MustRegister(timeJumpBackCounter) prometheus.MustRegister(regionHeartbeatCounter) prometheus.MustRegister(regionHeartbeatLatency) + prometheus.MustRegister(leaderTrendRecordTime) + prometheus.MustRegister(leaderTrendAvgRate) prometheus.MustRegister(metadataGauge) prometheus.MustRegister(etcdStateGauge) prometheus.MustRegister(tsoProxyHandleDuration) diff --git a/server/server.go b/server/server.go index fc2cc7466d0c..b5ee0f425255 100644 --- a/server/server.go +++ b/server/server.go @@ -80,6 +80,7 @@ import ( "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/config" + "github.com/tikv/pd/server/trend" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "go.etcd.io/etcd/mvcc/mvccpb" @@ -90,6 +91,8 @@ import ( const ( serverMetricsInterval = time.Minute + + serverHealthyCheckInterval = time.Second // pdRootPath for all pd servers. pdRootPath = "/pd" pdAPIPrefix = "/pd/" @@ -654,10 +657,26 @@ func (s *Server) serverMetricsLoop() { defer cancel() ticker := time.NewTicker(serverMetricsInterval) defer ticker.Stop() + healthyCheckTicker := time.NewTicker(serverHealthyCheckInterval) + defer healthyCheckTicker.Stop() + val := trend.NewAsyncFromEtcd() + for { select { case <-ticker.C: s.collectEtcdStateMetrics() + case <-healthyCheckTicker.C: + // record fsync value + if s.client != nil && len(s.client.Endpoints()) > 0 { + if val, err := val.GetVal(s.client.Endpoints()[0]); err == nil { + start := time.Now() + s.member.GetHealthyTrend().Record(val, start) + leaderTrendRecordTime.Observe(val) + // TODO: remove it, just for test now. + avg := s.member.GetHealthyTrend().AvgRate() + leaderTrendAvgRate.Set(avg) + } + } case <-ctx.Done(): log.Info("server is closed, exit metrics loop") return diff --git a/server/trend/trend.go b/server/trend/trend.go new file mode 100644 index 000000000000..f8589d9b2f0d --- /dev/null +++ b/server/trend/trend.go @@ -0,0 +1,172 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trend + +import ( + "container/list" + "go.uber.org/zap" + "math" + "sync" + "time" + + "github.com/pingcap/log" +) + +// SampleValue is a sample value. +type SampleValue struct { + value float64 + time time.Time +} + +// SampleWindow is a window of sample values. +type SampleWindow struct { + sum float64 + values *list.List + duration time.Duration +} + +// NewSampleWindow creates a new instance of SampleWindow. +func NewSampleWindow(duration time.Duration) *SampleWindow { + return &SampleWindow{ + sum: 0, + values: list.New(), + duration: duration, + } +} + +// Record adds a new sample value and maintains the window constraints. +func (sw *SampleWindow) Record(value float64, now time.Time) { + sw.values.PushBack(SampleValue{value: value, time: now}) + sw.sum += value + + for sw.values.Len() > 0 { + front := sw.values.Front().Value.(SampleValue) + if now.Sub(front.time) >= sw.duration { + sw.values.Remove(sw.values.Front()) + sw.sum -= front.value + } else { + break + } + } +} + +// Avg calculates the average of the values in the window. +func (sw *SampleWindow) Avg() float64 { + if sw.values.Len() == 0 { + return 0.0 + } + log.Info("sw.sum", zap.Float64("sw.sum", sw.sum), zap.Int("sw.values.Len()", sw.values.Len())) + return sw.sum / float64(sw.values.Len()) +} + +func (sw *SampleWindow) reset(capacity int64) { + sw.sum = 0 + sw.values.Init() + sw.duration = time.Duration(capacity) +} + +// Trend is a trend of a value. +type Trend struct { + sync.RWMutex + + smallWindow *SampleWindow + bigWindow *SampleWindow + + changed bool + cache float64 +} + +// NewTrend creates a new instance of Trend. +func NewTrend(dur time.Duration) *Trend { + return &Trend{ + smallWindow: NewSampleWindow(dur), + bigWindow: NewSampleWindow(dur * 2), + changed: false, + cache: 0.0, + } +} + +// Record adds a new sample value and maintains the window constraints. +func (t *Trend) Record(value float64, now time.Time) { + t.Lock() + defer t.Unlock() + + t.changed = true + t.smallWindow.Record(value, now) + t.bigWindow.Record(value, now) +} + +// TODO: unSensitiveCause reduce influence of spike +const unSensitiveCause = 0.0 + +// AvgRate calculates the average rate of the values in the window. +func (t *Trend) AvgRate() float64 { + t.Lock() + defer t.Unlock() + + if !t.changed { + return t.cache + } + + t.cache = laLbRate(t.smallWindow.Avg(), t.bigWindow.Avg(), unSensitiveCause) + log.Info("trend", zap.Float64("t.cache", t.cache), zap.Float64("smallWindow.Avg()", t.smallWindow.Avg()), zap.Float64("bigWindow.Avg()", t.bigWindow.Avg())) + return t.cache +} + +// Reset resets the trend. +func (t *Trend) Reset() { + t.Lock() + defer t.Unlock() + + t.smallWindow.reset(10) + t.bigWindow.reset(30) + t.changed = false + t.cache = 0.0 +} + +// laLbRateSimple is a simple version of LaLbRate. +// func laLbRateSimple(laAvg float64, lbAvg float64) float64 { +// return (laAvg - lbAvg) / laAvg +// } + +func laLbRate(laAvg float64, lbAvg float64, marginError float64) float64 { + if lbAvg < math.SmallestNonzeroFloat64 { + return 0.0 + } + increased := laAvg - lbAvg + if math.Abs(increased) < math.SmallestNonzeroFloat64 { + return 0.0 + } + if laAvg < lbAvg { + if -increased > marginError { + increased = -increased - marginError + } else { + increased = 0.0 + } + } else if increased > marginError { + increased -= marginError + } else { + increased = 0.0 + } + incSq := increased * increased + if laAvg < lbAvg { + incSq = -incSq + } + res := laAvg * incSq / math.Sqrt(lbAvg) + if laAvg < lbAvg { + return -math.Sqrt(-res) + } + return math.Sqrt(res) +} diff --git a/server/trend/trend_test.go b/server/trend/trend_test.go new file mode 100644 index 000000000000..0e80c65fa937 --- /dev/null +++ b/server/trend/trend_test.go @@ -0,0 +1,191 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trend + +import ( + "fmt" + "github.com/pingcap/log" + "go.uber.org/zap" + "math/rand" + "testing" + "time" +) + +func TestWindows(t *testing.T) { + duration := time.Second * 5 + window := NewSampleWindow(duration) + + value := 10.0 + now := time.Now() + println("now:", now.String()) + window.Record(value, now) + + value = 15.0 + now = now.Add(time.Second * 1) + window.Record(value, now) + + println("window sum:", window.sum) + average := window.Avg() + fmt.Println(average) +} + +func TestTrendDouble(t *testing.T) { + trend := NewTrend(time.Second) + + now := time.Now() + value := 3 * 1.0 + for i := 0; i < 10; i++ { + now = now.Add(time.Millisecond * 100) + trend.Record(value, now) + } + + value *= 2 + for i := 0; i < 10; i++ { + now = now.Add(time.Millisecond * 100) + trend.Record(value, now) + } + + result := trend.AvgRate() + + println(trend.smallWindow.Avg(), trend.smallWindow.sum, trend.smallWindow.values.Len()) + println(trend.bigWindow.Avg(), trend.bigWindow.sum, trend.bigWindow.values.Len()) + fmt.Println(result) +} + +func TestTrendGetAsyncMock(t *testing.T) { + trend := NewTrend(1 * time.Second) + + go func() { + timer := time.NewTicker(time.Millisecond * 10) + defer timer.Stop() + for { + select { + case <-timer.C: + result := trend.AvgRate() + log.Info("result", zap.Float64("result", result)) + } + } + }() + + log.Info("--------------------- normal ---------------------") + // fill with normal val + now := time.Now() + for i := 0; i < 30; i++ { + now = now.Add(time.Millisecond * 100) + time.Sleep(time.Millisecond * 100) + res := rand.Intn(3) + trend.Record(float64(res+1), now) + } + + // fill with spike + log.Info("--------------------- spike111111 ---------------------") + for i := 0; i < 10; i++ { + now = now.Add(time.Millisecond * 100) + time.Sleep(time.Millisecond * 100) + res := rand.Intn(3) + trend.Record(float64(res+5), now) + } + + log.Info("--------------------- normal ---------------------") + for i := 0; i < 20; i++ { + now = now.Add(time.Millisecond * 100) + time.Sleep(time.Millisecond * 100) + res := rand.Intn(3) + trend.Record(float64(res+1), now) + } +} + +func TestTrendGetAsync(t *testing.T) { + trend := NewTrend(10 * time.Second) + a := NewAsyncFromEtcd() + + go func() { + timer := time.NewTicker(time.Second * 1) + defer timer.Stop() + for { + select { + case <-timer.C: + result := trend.AvgRate() + log.Info("result", zap.Float64("result", result)) + } + } + }() + + log.Info("--------------------- normal ---------------------") + // fill with normal val + for i := 0; i < 30; i++ { + time.Sleep(time.Second * 1) + now := time.Now() + res, _ := a.GetVal("http://127.0.0.1:2384") + log.Info("res", zap.Float64("res", res)) + trend.Record(res, now) + } + + // fill with spike + log.Info("--------------------- spike111111 ---------------------") + for i := 0; i < 10; i++ { + time.Sleep(time.Second * 1) + now := time.Now() + res, _ := a.GetVal("http://127.0.0.1:2384") + log.Info("res", zap.Float64("res", res)) + trend.Record(res*10+10, now) + } + + log.Info("--------------------- normal ---------------------") + + for i := 0; i < 20; i++ { + time.Sleep(time.Second * 1) + now := time.Now() + res, _ := a.GetVal("http://127.0.0.1:2384") + log.Info("res", zap.Float64("res", res)) + trend.Record(res, now) + } + + println(trend.smallWindow.Avg(), trend.smallWindow.sum, trend.smallWindow.values.Len()) + println(trend.bigWindow.Avg(), trend.bigWindow.sum, trend.bigWindow.values.Len()) +} + +func TestTrendSpike(t *testing.T) { + trend := NewTrend(time.Second) + + now := time.Now() + value := 3 * 1.0 + for i := 0; i < 15; i++ { + now = now.Add(time.Millisecond * 100) + trend.Record(value, now) + } + + value = 3 * 1000.0 + for i := 0; i < 15; i++ { + now = now.Add(time.Millisecond * 100) + trend.Record(value, now) + } + + result := trend.AvgRate() + println(trend.smallWindow.Avg() > trend.bigWindow.Avg()) + fmt.Println(result) +} + +func TestGetAsyncFromEtcd(t *testing.T) { + + a := NewAsyncFromEtcd() + res, _ := a.GetVal("http://127.0.0.1:2384") + log.Info("res", zap.Float64("res", res)) + + time.Sleep(time.Second * 1) + res, _ = a.GetVal("http://127.0.0.1:2384") + + log.Info("res", zap.Float64("res", res)) +} diff --git a/server/trend/util.go b/server/trend/util.go new file mode 100644 index 000000000000..e87b06ca091d --- /dev/null +++ b/server/trend/util.go @@ -0,0 +1,151 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trend + +import ( + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/pingcap/log" + "go.etcd.io/etcd/pkg/transport" + "go.uber.org/zap" +) + +// AsyncFromEtcd is a trend of async duration from etcd. +type AsyncFromEtcd struct { + lastMetric map[string]float64 + curMetric map[string]float64 + subMetric []float64 +} + +// NewAsyncFromEtcd creates a new instance of AsyncFromEtcd. +func NewAsyncFromEtcd() *AsyncFromEtcd { + return &AsyncFromEtcd{ + lastMetric: make(map[string]float64), + curMetric: make(map[string]float64), + subMetric: make([]float64, 20), + } +} + +// GetVal gets the async duration from etcd. +func (a *AsyncFromEtcd) GetVal(ep string) (float64, error) { + curSec := 0.0 + res := getMetrics(ep, "etcd_disk_wal_fsync_duration_seconds") + + a.subMetric = a.subMetric[:0] + for _, metric := range res.metrics { + ss := strings.Split(metric, " ") + if len(ss) != 2 { + return 0, fmt.Errorf("failed to parse metric %s", metric) + } + if val, err := strconv.ParseFloat(ss[1], 64); err == nil { + a.lastMetric[ss[0]] = a.curMetric[ss[0]] + a.curMetric[ss[0]] = val + a.subMetric = append(a.subMetric, a.curMetric[ss[0]]-a.lastMetric[ss[0]]) + } else { + return 0, err + } + } + + // last val is `etcd_disk_wal_fsync_duration_seconds_count` + if len(a.subMetric) == 0 { + return 0, fmt.Errorf("failed to get etcd_disk_wal_fsync_duration_seconds_count") + } + count := a.subMetric[len(a.subMetric)-1] + threshold := count * 0.999 + // last val is `cnt` and `sum` + for i := 0; i < len(a.subMetric)-2; i++ { + // 1-8192 == 2^0-2^13 + // 2^1 -> + // because only 10 buckets in the small window, 8192 will make different buckets have the same value + // we can aggregate the data to make the buckets more accurate + if a.subMetric[i] >= threshold { + curSec = float64(i) + break + } + } + + return curSec, nil +} + +func getMetrics(ep string, curName string) metric { + lines, err := fetchMetrics(ep) + if err != nil { + log.Info("failed to fetch metrics", zap.Error(err)) + return metric{} + } + return parse(lines, curName) +} + +func fetchMetrics(ep string) (lines []string, err error) { + tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, time.Second, time.Second, time.Second) + if err != nil { + return nil, err + } + cli := &http.Client{Transport: tr} + resp, err := cli.Get(fmt.Sprintf("%s/metrics", ep)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + b, rerr := io.ReadAll(resp.Body) + if rerr != nil { + return nil, rerr + } + lines = strings.Split(string(b), "\n") + return lines, nil +} + +type metric struct { + // metrics name + name string + + metrics []string +} + +func (m metric) String() (s string) { + s += strings.Join(m.metrics, "\n") + return s +} + +func parse(lines []string, curName string) (m metric) { + for _, line := range lines { + if strings.HasPrefix(line, "# HELP ") { + // add previous metric and initialize + if m.name == curName { + return + } + m = metric{metrics: make([]string, 0)} + + ss := strings.Split(strings.Replace(line, "# HELP ", "", 1), " ") + m.name = ss[0] + continue + } + + if strings.HasPrefix(line, "# TYPE ") { + continue + } + + if m.name == curName { + m.metrics = append(m.metrics, line) + } + } + + return +}