Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support actively requesting update health feedback information by calling RPC to TiKV #1287

Merged
merged 14 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754
github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 h1:nU9wDeMsID8EWawRQVdmRYcNhUrlI4TKogZhXleG4QQ=
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900 h1:snIM8DC846ufdlRclITACXfr1kvVIPU4cuQ6w3JVVY4=
github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754
github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900
github.com/pingcap/tidb v1.1.0-beta.0.20240430081142-7481aa6d0b8b
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 h1:nU9wDeMsID8EWawRQVdmRYcNhUrlI4TKogZhXleG4QQ=
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900 h1:snIM8DC846ufdlRclITACXfr1kvVIPU4cuQ6w3JVVY4=
github.com/pingcap/kvproto v0.0.0-20240513094934-d9297553c900/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=
Expand Down
62 changes: 62 additions & 0 deletions integration_tests/health_feedback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv_test

import (
"context"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestGetHealthFeedback(t *testing.T) {
if !*withTiKV {
return
}

tikvCluster := NewTestStore(t)
defer tikvCluster.Close()

// Find any TiKV node
store := tikvCluster.GetRegionCache().GetAllStores()[0]
for _, s := range tikvCluster.GetRegionCache().GetAllStores() {
if s.StoreType() == tikvrpc.TiKV {
store = s
}
}
require.NotNil(t, store)

client := tikvCluster.GetTiKVClient()
ctx := context.Background()

for i := 0; i < 3; i++ {
// In normal cases TiKV's slow score should be stable with value 1. Set it to any unstable value and check again
// to ensure the value is indeed received from TiKV.
store.GetHealthStatus().ResetTiKVServerSideSlowScoreForTest(50)

resp, err := client.SendRequest(ctx, store.GetAddr(), tikvrpc.NewRequest(tikvrpc.CmdGetHealthFeedback, &kvrpcpb.GetHealthFeedbackRequest{}), time.Second)
require.NoError(t, err)
getHealthFeedbackResp := resp.Resp.(*kvrpcpb.GetHealthFeedbackResponse)
require.NotNil(t, getHealthFeedbackResp)
require.NotEqual(t, uint64(0), getHealthFeedbackResp.GetHealthFeedback().GetFeedbackSeqNo())
require.Equal(t, int32(1), getHealthFeedbackResp.GetHealthFeedback().GetSlowScore())
require.Equal(t, store.StoreID(), getHealthFeedbackResp.GetHealthFeedback().GetStoreId())
// Updated in batch RPC stream.
require.Equal(t, int64(1), store.GetHealthStatus().GetHealthStatusDetail().TiKVSideSlowScore)
}
}
3 changes: 2 additions & 1 deletion internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pkg/errors"
Expand Down Expand Up @@ -110,7 +111,7 @@ type Client interface {
// ClientEventListener is a listener to handle events produced by `Client`.
type ClientEventListener interface {
// OnHealthFeedback is called when `Client` receives a response that carries the HealthFeedback information.
OnHealthFeedback(feedback *tikvpb.HealthFeedback)
OnHealthFeedback(feedback *kvrpcpb.HealthFeedback)
}

// ClientExt is a client has extended interfaces.
Expand Down
3 changes: 2 additions & 1 deletion internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -783,7 +784,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
}
}

func (c *batchCommandsClient) onHealthFeedback(feedback *tikvpb.HealthFeedback) {
func (c *batchCommandsClient) onHealthFeedback(feedback *kvrpcpb.HealthFeedback) {
if h := c.eventListener.Load(); h != nil {
(*h).OnHealthFeedback(feedback)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,16 +823,16 @@ func TestPrioritySentLimit(t *testing.T) {
}

type testClientEventListener struct {
healthFeedbackCh chan *tikvpb.HealthFeedback
healthFeedbackCh chan *kvrpcpb.HealthFeedback
}

func newTestClientEventListener() *testClientEventListener {
return &testClientEventListener{
healthFeedbackCh: make(chan *tikvpb.HealthFeedback, 100),
healthFeedbackCh: make(chan *kvrpcpb.HealthFeedback, 100),
}
}

func (l *testClientEventListener) OnHealthFeedback(feedback *tikvpb.HealthFeedback) {
func (l *testClientEventListener) OnHealthFeedback(feedback *kvrpcpb.HealthFeedback) {
l.healthFeedbackCh <- feedback
}

Expand Down
2 changes: 1 addition & 1 deletion internal/client/mockserver/mock_tikv_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
err = ss.Send(&tikvpb.BatchCommandsResponse{
Responses: responses,
RequestIds: req.GetRequestIds(),
HealthFeedback: &tikvpb.HealthFeedback{
HealthFeedback: &kvrpcpb.HealthFeedback{
StoreId: 1,
FeedbackSeqNo: feedbackSeq,
SlowScore: 1,
Expand Down
39 changes: 25 additions & 14 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/config"
Expand Down Expand Up @@ -631,6 +630,8 @@ type RegionCache struct {
codec apicodec.Codec
enableForwarding bool

requestHealthFeedbackCallback func(ctx context.Context, addr string) error

mu regionIndexMu

stores storeCache
Expand All @@ -642,7 +643,8 @@ type RegionCache struct {
}

type regionCacheOptions struct {
noHealthTick bool
noHealthTick bool
requestHealthFeedbackCallback func(ctx context.Context, addr string) error
}

type RegionCacheOpt func(*regionCacheOptions)
Expand All @@ -651,6 +653,12 @@ func RegionCacheNoHealthTick(o *regionCacheOptions) {
o.noHealthTick = true
}

func WithRequestHealthFeedbackCallback(callback func(ctx context.Context, addr string) error) RegionCacheOpt {
return func(options *regionCacheOptions) {
options.requestHealthFeedbackCallback = callback
}
}

// NewRegionCache creates a RegionCache.
func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
var options regionCacheOptions
Expand All @@ -659,7 +667,8 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
}

c := &RegionCache{
pdClient: pdClient,
pdClient: pdClient,
requestHealthFeedbackCallback: options.requestHealthFeedbackCallback,
}

c.codec = apicodec.NewCodecV1(apicodec.ModeRaw)
Expand Down Expand Up @@ -704,7 +713,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
return false
}, time.Duration(refreshStoreInterval/4)*time.Second, c.stores.getCheckStoreEvents())
if !options.noHealthTick {
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
c.bg.schedule(c.checkAndUpdateStoreHealthStatus, time.Duration(refreshStoreInterval/4)*time.Second)
}
c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
Expand Down Expand Up @@ -2618,7 +2627,7 @@ func (r *Region) ContainsByEnd(key []byte) bool {
}

// checkAndUpdateStoreHealthStatus checks and updates health stats on each store.
func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
func (c *RegionCache) checkAndUpdateStoreHealthStatus(ctx context.Context, now time.Time) bool {
defer func() {
r := recover()
if r != nil {
Expand All @@ -2630,16 +2639,18 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
}
}
}()
healthDetails := make(map[uint64]HealthStatusDetail)
now := time.Now()
var stores []*Store
c.stores.forEach(func(store *Store) {
store.healthStatus.tick(now)
healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail()
stores = append(stores, store)
})
for store, details := range healthDetails {
metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.ClientSideSlowScore))
metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.TiKVSideSlowScore))
for _, store := range stores {
store.healthStatus.tick(ctx, now, store, c.requestHealthFeedbackCallback)
healthDetails := store.healthStatus.GetHealthStatusDetail()
metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.ClientSideSlowScore))
metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.TiKVSideSlowScore))
}

return false
}

// reportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
Expand All @@ -2662,7 +2673,7 @@ func contains(startKey, endKey, key []byte) bool {
(bytes.Compare(key, endKey) < 0 || len(endKey) == 0)
}

func (c *RegionCache) onHealthFeedback(feedback *tikvpb.HealthFeedback) {
func (c *RegionCache) onHealthFeedback(feedback *kvrpcpb.HealthFeedback) {
store, ok := c.stores.get(feedback.GetStoreId())
if !ok {
logutil.BgLogger().Info("dropped health feedback info due to unknown store id", zap.Uint64("storeID", feedback.GetStoreId()))
Expand All @@ -2683,6 +2694,6 @@ type regionCacheClientEventListener struct {
}

// OnHealthFeedback implements the `client.ClientEventListener` interface.
func (l *regionCacheClientEventListener) OnHealthFeedback(feedback *tikvpb.HealthFeedback) {
func (l *regionCacheClientEventListener) OnHealthFeedback(feedback *kvrpcpb.HealthFeedback) {
l.c.onHealthFeedback(feedback)
}
31 changes: 20 additions & 11 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
uatomic "go.uber.org/atomic"
)
Expand Down Expand Up @@ -2088,10 +2089,14 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
}

func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
store := newStore(1, "", "", "", tikvrpc.TiKV, resolved, nil)
store.livenessState = uint32(reachable)
ctx := context.Background()

stats := newStoreHealthStatus(1)
s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
now := time.Now()
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
s.False(stats.tikvSideSlowScore.hasTiKVFeedback.Load())
s.False(stats.IsSlow())
Expand All @@ -2108,30 +2113,34 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
s.True(stats.IsSlow())

now = now.Add(time.Minute * 2)
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.Equal(int64(60), stats.GetHealthStatusDetail().TiKVSideSlowScore)
s.False(stats.IsSlow())

now = now.Add(time.Minute * 3)
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore)
s.False(stats.IsSlow())

now = now.Add(time.Minute)
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore)
s.False(stats.IsSlow())
}

func (s *testRegionCacheSuite) TestStoreHealthStatus() {
store := newStore(1, "", "", "", tikvrpc.TiKV, resolved, nil)
store.livenessState = uint32(reachable)
ctx := context.Background()

stats := newStoreHealthStatus(1)
now := time.Now()
s.False(stats.IsSlow())

for !stats.clientSideSlowScore.isSlow() {
stats.clientSideSlowScore.recordSlowScoreStat(time.Minute)
}
stats.tick(now)
stats.tick(ctx, now, store, nil)
s.True(stats.IsSlow())
s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore)

Expand All @@ -2142,7 +2151,7 @@ func (s *testRegionCacheSuite) TestStoreHealthStatus() {

for stats.clientSideSlowScore.isSlow() {
stats.clientSideSlowScore.recordSlowScoreStat(time.Millisecond)
stats.tick(now)
stats.tick(ctx, now, store, nil)
}
s.True(stats.IsSlow())
s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore)
Expand All @@ -2160,7 +2169,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() {
s.True(exists)
s.False(store1.healthStatus.IsSlow())

feedbackMsg := &tikvpb.HealthFeedback{
feedbackMsg := &kvrpcpb.HealthFeedback{
StoreId: s.store1,
FeedbackSeqNo: 1,
SlowScore: 100,
Expand All @@ -2169,7 +2178,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() {
s.True(store1.healthStatus.IsSlow())
s.Equal(int64(100), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore)

feedbackMsg = &tikvpb.HealthFeedback{
feedbackMsg = &kvrpcpb.HealthFeedback{
StoreId: s.store1,
FeedbackSeqNo: 2,
SlowScore: 90,
Expand All @@ -2178,7 +2187,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() {
s.cache.onHealthFeedback(feedbackMsg)
s.Equal(int64(100), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore)

feedbackMsg = &tikvpb.HealthFeedback{
feedbackMsg = &kvrpcpb.HealthFeedback{
StoreId: s.store1,
FeedbackSeqNo: 3,
SlowScore: 90,
Expand All @@ -2187,7 +2196,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() {
s.cache.onHealthFeedback(feedbackMsg)
s.Equal(int64(90), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore)

feedbackMsg = &tikvpb.HealthFeedback{
feedbackMsg = &kvrpcpb.HealthFeedback{
StoreId: s.store1,
FeedbackSeqNo: 4,
SlowScore: 50,
Expand Down
Loading
Loading