diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 61665dbaa..2e46582ff 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -2,9 +2,9 @@ name: Integration Test on: push: - branches: [ master, tidb-6.5 ] + branches: [ master, tidb-6.5, tidb-6.5-with-kv-timeout-feature ] pull_request: - branches: [ master, tidb-6.5 ] + branches: [ master, tidb-6.5, tidb-6.5-with-kv-timeout-feature ] jobs: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8f9f9e2c8..aba7388fd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,9 +2,9 @@ name: Unit Test on: push: - branches: [ master, tidb-6.5 ] + branches: [ master, tidb-6.5, tidb-6.5-with-kv-timeout-feature ] pull_request: - branches: [ master, tidb-6.5 ] + branches: [ master, tidb-6.5, tidb-6.5-with-kv-timeout-feature ] jobs: test: diff --git a/integration_tests/main_test.go b/integration_tests/main_test.go index fcd7a050a..5998804ba 100644 --- a/integration_tests/main_test.go +++ b/integration_tests/main_test.go @@ -27,6 +27,7 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"), // TODO: fix ttlManager goroutine leak + goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"), } goleak.VerifyTestMain(m, opts...) diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index a9880227b..cc13b38c2 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -287,12 +287,13 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe() { func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { reqStats := tikv.NewRegionRequestRuntimeStats() - tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) - tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond) snapshot := s.store.GetSnapshot(0) - snapshot.SetRuntimeStats(&txnkv.SnapshotRuntimeStats{}) - snapshot.MergeRegionRequestStats(reqStats.Stats) - snapshot.MergeRegionRequestStats(reqStats.Stats) + runtimeStats := &txnkv.SnapshotRuntimeStats{} + snapshot.SetRuntimeStats(runtimeStats) + snapshot.MergeRegionRequestStats(reqStats) + snapshot.MergeRegionRequestStats(reqStats) bo := tikv.NewBackofferWithVars(context.Background(), 2000, nil) err := bo.BackoffWithMaxSleepTxnLockFast(5, errors.New("test")) s.Nil(err) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 3c1129bc5..fc052dc46 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -296,14 +296,18 @@ func (a *batchConn) fetchMorePendingRequests( const idleTimeout = 3 * time.Minute +// BatchSendLoopPanicCounter is only used for testing. 
+var BatchSendLoopPanicCounter int64 = 0 + func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { defer func() { if r := recover(); r != nil { metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchSendLoop).Inc() logutil.BgLogger().Error("batchSendLoop", - zap.Reflect("r", r), + zap.Any("r", r), zap.Stack("stack")) - logutil.BgLogger().Info("restart batchSendLoop") + atomic.AddInt64(&BatchSendLoopPanicCounter, 1) + logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("count", atomic.LoadInt64(&BatchSendLoopPanicCounter))) go a.batchSendLoop(cfg) } }() @@ -347,6 +351,12 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { } func (a *batchConn) getClientAndSend() { + if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil { + if timeout, ok := val.(int); ok && timeout > 0 { + time.Sleep(time.Duration(timeout * int(time.Millisecond))) + } + } + // Choose a connection by round-robbin. var ( cli *batchCommandsClient @@ -430,7 +440,7 @@ func (s *batchCommandsStream) recv() (resp *tikvpb.BatchCommandsResponse, err er if r := recover(); r != nil { metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc() logutil.BgLogger().Error("batchCommandsClient.recv panic", - zap.Reflect("r", r), + zap.Any("r", r), zap.Stack("stack")) err = errors.New("batch conn recv paniced") } @@ -598,7 +608,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport if r := recover(); r != nil { metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc() logutil.BgLogger().Error("batchRecvLoop", - zap.Reflect("r", r), + zap.Any("r", r), zap.Stack("stack")) logutil.BgLogger().Info("restart batchRecvLoop") go c.batchRecvLoop(cfg, tikvTransportLayerLoad, streamClient) @@ -781,7 +791,7 @@ func sendBatchRequest( select { case batchConn.batchCommandsCh <- entry: case <-ctx.Done(): - logutil.BgLogger().Debug("send request is cancelled", + logutil.Logger(ctx).Debug("send request is cancelled", zap.String("to", addr), zap.String("cause", ctx.Err().Error())) return nil, errors.WithStack(ctx.Err()) case <-timer.C: @@ -797,7 +807,7 @@ func sendBatchRequest( return tikvrpc.FromBatchCommandsResponse(res) case <-ctx.Done(): atomic.StoreInt32(&entry.canceled, 1) - logutil.BgLogger().Debug("wait response is cancelled", + logutil.Logger(ctx).Debug("wait response is cancelled", zap.String("to", addr), zap.String("cause", ctx.Err().Error())) return nil, errors.WithStack(ctx.Err()) case <-timer.C: diff --git a/internal/client/client_fail_test.go b/internal/client/client_fail_test.go index 43e769769..e7d413970 100644 --- a/internal/client/client_fail_test.go +++ b/internal/client/client_fail_test.go @@ -36,7 +36,6 @@ package client import ( "context" - "fmt" "sync/atomic" "testing" "time" @@ -47,6 +46,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/config" + "github.com/tikv/client-go/v2/internal/client/mock_server" "github.com/tikv/client-go/v2/tikvrpc" ) @@ -54,11 +54,11 @@ func TestPanicInRecvLoop(t *testing.T) { require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`)) require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`)) - server, port := startMockTikvService() + server, port := mock_server.StartMockTikvService() require.True(t, port > 0) defer server.Stop() - addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) + addr := server.Addr() rpcClient := NewRPCClient() rpcClient.option.dialTimeout = time.Second / 3 @@ -82,10 
+82,10 @@ func TestPanicInRecvLoop(t *testing.T) { } func TestRecvErrorInMultipleRecvLoops(t *testing.T) { - server, port := startMockTikvService() + server, port := mock_server.StartMockTikvService() require.True(t, port > 0) defer server.Stop() - addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) + addr := server.Addr() // Enable batch and limit the connection count to 1 so that // there is only one BatchCommands stream for each host or forwarded host. diff --git a/internal/client/client_test.go b/internal/client/client_test.go index fd8916f22..c01f753d4 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -55,6 +55,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/config" + "github.com/tikv/client-go/v2/internal/client/mock_server" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/tikvrpc" "go.uber.org/zap" @@ -116,11 +117,11 @@ func TestCancelTimeoutRetErr(t *testing.T) { } func TestSendWhenReconnect(t *testing.T) { - server, port := startMockTikvService() + server, port := mock_server.StartMockTikvService() require.True(t, port > 0) rpcClient := NewRPCClient() - addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) + addr := server.Addr() conn, err := rpcClient.getConnArray(addr, true) assert.Nil(t, err) @@ -240,7 +241,7 @@ func TestCollapseResolveLock(t *testing.T) { } func TestForwardMetadataByUnaryCall(t *testing.T) { - server, port := startMockTikvService() + server, port := mock_server.StartMockTikvService() require.True(t, port > 0) defer server.Stop() addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) @@ -255,7 +256,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) { var checkCnt uint64 // Check no corresponding metadata if ForwardedHost is empty. - server.setMetaChecker(func(ctx context.Context) error { + server.SetMetaChecker(func(ctx context.Context) error { atomic.AddUint64(&checkCnt, 1) // gRPC may set some metadata by default, e.g. "context-type". md, ok := metadata.FromIncomingContext(ctx) @@ -283,7 +284,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) { checkCnt = 0 forwardedHost := "127.0.0.1:6666" // Check the metadata exists. - server.setMetaChecker(func(ctx context.Context) error { + server.SetMetaChecker(func(ctx context.Context) error { atomic.AddUint64(&checkCnt, 1) // gRPC may set some metadata by default, e.g. "context-type". md, ok := metadata.FromIncomingContext(ctx) @@ -308,10 +309,10 @@ func TestForwardMetadataByUnaryCall(t *testing.T) { } func TestForwardMetadataByBatchCommands(t *testing.T) { - server, port := startMockTikvService() + server, port := mock_server.StartMockTikvService() require.True(t, port > 0) defer server.Stop() - addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) + addr := server.Addr() // Enable batch and limit the connection count to 1 so that // there is only one BatchCommands stream for each host or forwarded host. 
@@ -324,7 +325,7 @@ func TestForwardMetadataByBatchCommands(t *testing.T) { var checkCnt uint64 setCheckHandler := func(forwardedHost string) { - server.setMetaChecker(func(ctx context.Context) error { + server.SetMetaChecker(func(ctx context.Context) error { atomic.AddUint64(&checkCnt, 1) md, ok := metadata.FromIncomingContext(ctx) if forwardedHost == "" { @@ -641,10 +642,10 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { conf.TiKVClient.MaxBatchSize = 128 })() - server, port := startMockTikvService() + server, port := mock_server.StartMockTikvService() require.True(t, port > 0) require.True(t, server.IsRunning()) - addr := server.addr + addr := server.Addr() client := NewRPCClient() defer func() { err := client.Close() @@ -681,7 +682,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { logutil.BgLogger().Info("restart mock tikv server") server.Start(addr) require.True(t, server.IsRunning()) - require.Equal(t, addr, server.addr) + require.Equal(t, addr, server.Addr()) // Wait batch client to auto reconnect. start := time.Now() @@ -700,7 +701,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { cli.unlockForSend() break } - if time.Since(start) > time.Second*5 { + if time.Since(start) > time.Second*10 { // It shouldn't take too long for batch_client to reconnect. require.Fail(t, "wait batch client reconnect timeout") } diff --git a/internal/client/main_test.go b/internal/client/main_test.go index c4d2be246..6a22714f0 100644 --- a/internal/client/main_test.go +++ b/internal/client/main_test.go @@ -26,6 +26,7 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/internal/retry.newBackoffFn.func1"), + goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/internal/retry.(*Config).createBackoffFn.newBackoffFn.func2"), } goleak.VerifyTestMain(m, opts...) 
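The tests above now take the mock TiKV endpoint from the relocated mock_server package instead of formatting "127.0.0.1:<port>" by hand. A minimal sketch of that usage, assuming a test file that lives inside the client-go module (the package is internal, so it cannot be imported from outside):

package client_test // hypothetical placement inside the client-go module

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/internal/client/mock_server"
)

func TestMockServerSketch(t *testing.T) {
	// Start the mock TiKV service on a random port.
	server, port := mock_server.StartMockTikvService()
	require.True(t, port > 0)
	require.True(t, server.IsRunning())
	defer server.Stop()

	// The listening address comes from the exported Addr() accessor now.
	addr := server.Addr()
	require.NotEmpty(t, addr)

	// Optionally inspect the gRPC metadata of every incoming request.
	server.SetMetaChecker(func(ctx context.Context) error {
		return nil
	})
}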
} diff --git a/internal/client/mock_tikv_service_test.go b/internal/client/mock_server/mock_tikv_service.go similarity index 77% rename from internal/client/mock_tikv_service_test.go rename to internal/client/mock_server/mock_tikv_service.go index 40234c1d5..392d3a5fb 100644 --- a/internal/client/mock_tikv_service_test.go +++ b/internal/client/mock_server/mock_tikv_service.go @@ -18,7 +18,7 @@ // https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/mock_tikv_service_test.go // -package client +package mock_server import ( "context" @@ -36,7 +36,7 @@ import ( "google.golang.org/grpc" ) -type server struct { +type MockServer struct { tikvpb.TikvServer grpcServer *grpc.Server addr string @@ -49,21 +49,28 @@ type server struct { } } -func (s *server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { +func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { + if err := s.checkMetadata(ctx); err != nil { + return nil, err + } + return &kvrpcpb.GetResponse{}, nil +} + +func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { if err := s.checkMetadata(ctx); err != nil { return nil, err } return &kvrpcpb.PrewriteResponse{}, nil } -func (s *server) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error { +func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err } return ss.Send(&coprocessor.Response{}) } -func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { +func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err } @@ -94,13 +101,13 @@ func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { } } -func (s *server) setMetaChecker(check func(context.Context) error) { +func (s *MockServer) SetMetaChecker(check func(context.Context) error) { s.metaChecker.Lock() s.metaChecker.check = check s.metaChecker.Unlock() } -func (s *server) checkMetadata(ctx context.Context) error { +func (s *MockServer) checkMetadata(ctx context.Context) error { s.metaChecker.Lock() defer s.metaChecker.Unlock() if s.metaChecker.check != nil { @@ -109,16 +116,20 @@ func (s *server) checkMetadata(ctx context.Context) error { return nil } -func (s *server) IsRunning() bool { +func (s *MockServer) IsRunning() bool { return atomic.LoadInt64(&s.running) == 1 } -func (s *server) Stop() { +func (s *MockServer) Addr() string { + return s.addr +} + +func (s *MockServer) Stop() { s.grpcServer.Stop() atomic.StoreInt64(&s.running, 0) } -func (s *server) Start(addr string) int { +func (s *MockServer) Start(addr string) int { if addr == "" { addr = fmt.Sprintf("%s:%d", "127.0.0.1", 0) } @@ -148,9 +159,9 @@ func (s *server) Start(addr string) int { return port } -// Try to start a gRPC server and retrun the server instance and binded port. -func startMockTikvService() (*server, int) { - server := &server{} +// StartMockTikvService tries to start a gRPC server and returns the server instance and the bound port.
+func StartMockTikvService() (*MockServer, int) { + server := &MockServer{} port := server.Start("") return server, port } diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 622e70aec..aabeb8149 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -508,7 +508,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(* r := recover() if r != nil { logutil.BgLogger().Error("panic in the checkAndResolve goroutine", - zap.Reflect("r", r), + zap.Any("r", r), zap.Stack("stack trace")) } }() @@ -576,15 +576,22 @@ func (c *RPCContext) String() string { type contextPatcher struct { staleRead *bool replicaRead *bool + + timeoutFactor float64 } -func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) { +func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context, timeout time.Duration) time.Duration { if patcher.staleRead != nil { pbCtx.StaleRead = *patcher.staleRead } if patcher.replicaRead != nil { pbCtx.ReplicaRead = *patcher.replicaRead } + if patcher.timeoutFactor > 0 { + pbCtx.MaxExecutionDurationMs = uint64(float64(pbCtx.MaxExecutionDurationMs) * patcher.timeoutFactor) + timeout = time.Duration(float64(timeout) * patcher.timeoutFactor) + } + return timeout } type storeSelectorOp struct { @@ -667,7 +674,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep storeFailEpoch := atomic.LoadUint32(&store.epoch) if storeFailEpoch != regionStore.storeEpochs[storeIdx] { cachedRegion.invalidate(Other) - logutil.BgLogger().Info("invalidate current region, because others failed on same store", + logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store", zap.Uint64("region", id.GetID()), zap.String("store", store.addr)) return nil, nil @@ -782,7 +789,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, storeFailEpoch := atomic.LoadUint32(&store.epoch) if storeFailEpoch != regionStore.storeEpochs[storeIdx] { cachedRegion.invalidate(Other) - logutil.BgLogger().Info("invalidate current region, because others failed on same store", + logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store", zap.Uint64("region", id.GetID()), zap.String("store", store.addr)) // TiFlash will always try to find out a valid peer, avoiding to retry too many times. @@ -1819,7 +1826,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext (meta.GetRegionEpoch().GetConfVer() < ctx.Region.confVer || meta.GetRegionEpoch().GetVersion() < ctx.Region.ver) { err := errors.Errorf("region epoch is ahead of tikv. 
rpc ctx: %+v, currentRegions: %+v", ctx, currentRegions) - logutil.BgLogger().Info("region epoch is ahead of tikv", zap.Error(err)) + logutil.Logger(bo.GetCtx()).Info("region epoch is ahead of tikv", zap.Error(err)) return true, bo.Backoff(retry.BoRegionMiss, err) } } @@ -2448,10 +2455,23 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { addr = store.GetAddress() if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)} + newStore.livenessState = atomic.LoadUint32(&s.livenessState) + if newStore.getLivenessState() != reachable { + newStore.unreachableSince = s.unreachableSince + go newStore.checkUntilHealth(c, newStore.getLivenessState(), storeReResolveInterval) + } c.storeMu.Lock() c.storeMu.stores[newStore.storeID] = newStore c.storeMu.Unlock() s.setResolveState(deleted) + logutil.BgLogger().Info("store address or labels changed, add new store and mark old store deleted", + zap.Uint64("store", s.storeID), + zap.String("old-addr", s.addr), + zap.Any("old-labels", s.labels), + zap.String("old-liveness", s.getLivenessState().String()), + zap.String("new-addr", newStore.addr), + zap.Any("new-labels", newStore.labels), + zap.String("new-liveness", newStore.getLivenessState().String())) return false, nil } s.changeResolveStateTo(needCheck, resolved) @@ -2589,6 +2609,8 @@ func (s livenessState) String() string { } } +var storeReResolveInterval = 30 * time.Second + func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) { // This mechanism doesn't support non-TiKV stores currently. if s.storeType != tikvrpc.TiKV { @@ -2600,14 +2622,27 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt // It may be already started by another thread. 
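The hand-off from reResolve and startHealthCheckLoopIfNeeded to checkUntilHealth is guarded by a compare-and-swap on the liveness word, so only the goroutine that flips a store out of reachable starts a health-check loop. A stripped-down, standalone sketch of that guard (the types and names below are local stand-ins, not the library's):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	reachable uint32 = iota
	unreachable
)

type store struct {
	liveness uint32 // accessed atomically
}

// markUnreachable returns true only for the single caller that wins the CAS;
// that caller is the one responsible for starting the background check loop.
func (s *store) markUnreachable() bool {
	return atomic.CompareAndSwapUint32(&s.liveness, reachable, unreachable)
}

func main() {
	s := &store{liveness: reachable}
	var started int32
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.markUnreachable() {
				atomic.AddInt32(&started, 1) // stands in for "go s.checkUntilHealth(...)"
			}
		}()
	}
	wg.Wait()
	fmt.Println("health-check loops started:", started) // always 1
}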
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) { s.unreachableSince = time.Now() - go s.checkUntilHealth(c) + reResolveInterval := storeReResolveInterval + if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil { + if dur, err := time.ParseDuration(val.(string)); err == nil { + reResolveInterval = dur + } + } + go s.checkUntilHealth(c, liveness, reResolveInterval) } } -func (s *Store) checkUntilHealth(c *RegionCache) { - defer atomic.StoreUint32(&s.livenessState, uint32(reachable)) - +func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) { ticker := time.NewTicker(time.Second) + defer func() { + ticker.Stop() + if liveness != reachable { + logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop", + zap.Uint64("storeID", s.storeID), + zap.String("state", s.getResolveState().String()), + zap.String("liveness", s.getLivenessState().String())) + } + }() lastCheckPDTime := time.Now() for { @@ -2615,26 +2650,29 @@ func (s *Store) checkUntilHealth(c *RegionCache) { case <-c.ctx.Done(): return case <-ticker.C: - if time.Since(lastCheckPDTime) > time.Second*30 { + if s.getResolveState() == deleted { + logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) + return + } + if time.Since(lastCheckPDTime) > reResolveInterval { lastCheckPDTime = time.Now() valid, err := s.reResolve(c) if err != nil { logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err)) } else if !valid { - logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr)) + logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String())) return } } bo := retry.NewNoopBackoff(c.ctx) - l := s.requestLiveness(bo, c) - if l == reachable { + liveness = s.requestLiveness(bo, c) + atomic.StoreUint32(&s.livenessState, uint32(liveness)) + if liveness == reachable { logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) - return } - atomic.StoreUint32(&s.livenessState, uint32(l)) } } } @@ -2642,7 +2680,20 @@ func (s *Store) checkUntilHealth(c *RegionCache) { func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) { // It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead. 
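Both new failpoints are driven by string values: injectReResolveInterval takes a Go duration, and injectLiveness takes either a single status or space-separated addr:status pairs (each pair is split on a single colon, so it only matches store addresses without a port separator). A hypothetical test could enable them roughly as below; the tikvclient/ prefix mirrors the failpoints enabled elsewhere in this diff, and client-go additionally gates EvalFailpoint behind its own enable switch, which the test harness may need to turn on first:

package locate_test // hypothetical test file; the store names are placeholders

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestInjectedLivenessSketch(t *testing.T) {
	// Shorten the 30s storeReResolveInterval so the health-check loop re-resolves quickly.
	require.Nil(t, failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`))
	defer failpoint.Disable("tikvclient/injectReResolveInterval")

	// Per-store liveness: stores that are not listed fall through to the plain
	// single-status form handled by the switch statement below.
	require.Nil(t, failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
	defer failpoint.Disable("tikvclient/injectLiveness")

	// ... exercise the region cache here; requestLiveness will report the injected
	// statuses instead of dialing real stores.
}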
if val, err := util.EvalFailpoint("injectLiveness"); err == nil { - switch val.(string) { + liveness := val.(string) + if strings.Contains(liveness, " ") { + for _, item := range strings.Split(liveness, " ") { + kv := strings.Split(item, ":") + if len(kv) != 2 { + continue + } + if kv[0] == s.addr { + liveness = kv[1] + break + } + } + } + switch liveness { case "unreachable": return unreachable case "reachable": diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index caefb2ae5..4e2c54c86 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -1665,3 +1665,88 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() { }, 3*time.Second, 200*time.Millisecond) s.checkCache(remaining) } + +func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { + // init region cache + s.cache.LocateKey(s.bo, []byte("a")) + + store1 := s.cache.getStoreByStoreID(s.store1) + s.Require().NotNil(store1) + s.Require().Equal(resolved, store1.getResolveState()) + + // setup mock liveness func + store1Liveness := uint32(unreachable) + tf := func(s *Store, bo *retry.Backoffer) livenessState { + if s.storeID == store1.storeID { + return livenessState(atomic.LoadUint32(&store1Liveness)) + } + return reachable + } + s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + + // start health check loop + atomic.StoreUint32(&store1.livenessState, store1Liveness) + go store1.checkUntilHealth(s.cache, livenessState(store1Liveness), time.Second) + + // update store meta + s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...) + + // assert that the old store should be deleted and it's not reachable + s.Eventually(func() bool { + return store1.getResolveState() == deleted && store1.getLivenessState() != reachable + }, 3*time.Second, time.Second) + + // assert that the new store should be added and it's also not reachable + newStore1 := s.cache.getStoreByStoreID(s.store1) + s.Require().NotEqual(reachable, newStore1.getLivenessState()) + + // recover store1 + atomic.StoreUint32(&store1Liveness, uint32(reachable)) + + // assert that the new store should be reachable + s.Eventually(func() bool { + return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable + }, 3*time.Second, time.Second) +} + +func (s *testRegionCacheSuite) TestIssue1401() { + // init region cache + s.cache.LocateKey(s.bo, []byte("a")) + + store1 := s.cache.getStoreByStoreID(s.store1) + s.Require().NotNil(store1) + s.Require().Equal(resolved, store1.getResolveState()) + // change store1 label. + labels := store1.labels + labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"}) + s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...) + + // mark the store is unreachable and need check. + atomic.StoreUint32(&store1.livenessState, uint32(unreachable)) + store1.setResolveState(needCheck) + + // setup mock liveness func + tf := func(s *Store, bo *retry.Backoffer) livenessState { + return reachable + } + s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + + // start health check loop + go store1.checkUntilHealth(s.cache, unreachable, time.Second*30) + + // mock asyncCheckAndResolveLoop worker to check and resolve store. + s.cache.checkAndResolve(nil, func(s *Store) bool { + return s.getResolveState() == needCheck + }) + + // assert that the old store should be deleted. 
+ s.Eventually(func() bool { + return store1.getResolveState() == deleted + }, 3*time.Second, time.Second) + // assert the new store should be added and it should be resolved and reachable. + newStore1 := s.cache.getStoreByStoreID(s.store1) + s.Eventually(func() bool { + return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable + }, 3*time.Second, time.Second) + s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161")) +} diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index e47f0d809..433aa7354 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -35,16 +35,18 @@ package locate import ( - "bytes" "context" "fmt" "math/rand" + "reflect" "strconv" "strings" "sync" "sync/atomic" "time" + "unsafe" + uatomic "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -80,6 +82,30 @@ func LoadShuttingDown() uint32 { return atomic.LoadUint32(&shuttingDown) } +// ReplicaSelectorExperimentalOptions defines experimental options of replica selector. +type ReplicaSelectorExperimentalOptions struct { + StaleRead struct { + PreventRetryFollower bool + RetryLeaderTimeoutFactor float64 + } +} + +var selectorExpOpts uatomic.Pointer[ReplicaSelectorExperimentalOptions] + +// SetReplicaSelectorExperimentalOptions sets experimental options of replica selector. +func SetReplicaSelectorExperimentalOptions(opts ReplicaSelectorExperimentalOptions) { + selectorExpOpts.Store(&opts) +} + +// GetReplicaSelectorExperimentalOptions gets experimental options of replica selector. +func GetReplicaSelectorExperimentalOptions() (opts ReplicaSelectorExperimentalOptions) { + ptr := selectorExpOpts.Load() + if ptr == nil { + return ReplicaSelectorExperimentalOptions{} + } + return *ptr +} + // RegionRequestSender sends KV/Cop requests to tikv server. It handles network // errors and some region errors internally. // @@ -107,18 +133,35 @@ type RegionRequestSender struct { replicaSelector *replicaSelector failStoreIDs map[uint64]struct{} failProxyStoreIDs map[uint64]struct{} - RegionRequestRuntimeStats + Stats *RegionRequestRuntimeStats + AccessStats *ReplicaAccessStats +} + +func (s *RegionRequestSender) String() string { + if s.replicaSelector == nil { + return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector) + } + return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector.String()) } // RegionRequestRuntimeStats records the runtime stats of send region requests. type RegionRequestRuntimeStats struct { - Stats map[tikvrpc.CmdType]*RPCRuntimeStats + RPCStats map[tikvrpc.CmdType]*RPCRuntimeStats + RequestErrorStats +} + +// RequestErrorStats records the request error(region error and rpc error) count. +type RequestErrorStats struct { + // ErrStats record the region error and rpc error, and their count. + // Attention: avoid too many error types, ErrStats only record the first 16 different errors. + ErrStats map[string]int + OtherErrCnt int } // NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. 
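With the constructor now returning a pointer, callers record RPC timings through methods on the stats object instead of the removed package-level RecordRegionRequestRuntimeStats helper, as the snapshot test earlier in this diff shows. A condensed sketch of the public call pattern (it assumes the tikv package alias keeps exposing the underlying type's methods, which that test relies on):

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
)

func main() {
	reqStats := tikv.NewRegionRequestRuntimeStats()
	reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second)
	reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond)

	// String() renders "<cmd>:{num_rpc:..., total_time:...}" plus an rpc_errors
	// section once RecordRPCErrorStats has been called.
	fmt.Println(reqStats.String())
}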
-func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { - return RegionRequestRuntimeStats{ - Stats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), +func NewRegionRequestRuntimeStats() *RegionRequestRuntimeStats { + return &RegionRequestRuntimeStats{ + RPCStats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), } } @@ -129,14 +172,55 @@ type RPCRuntimeStats struct { Consume int64 } +// RecordRPCRuntimeStats uses to record the rpc count and duration stats. +func (r *RegionRequestRuntimeStats) RecordRPCRuntimeStats(cmd tikvrpc.CmdType, d time.Duration) { + stat, ok := r.RPCStats[cmd] + if !ok { + r.RPCStats[cmd] = &RPCRuntimeStats{ + Count: 1, + Consume: int64(d), + } + return + } + stat.Count++ + stat.Consume += int64(d) +} + +// RecordRPCErrorStats uses to record the request error(region error label and rpc error) info and count. +func (r *RequestErrorStats) RecordRPCErrorStats(errLabel string) { + if r.ErrStats == nil { + // lazy init to avoid unnecessary allocation. + r.ErrStats = make(map[string]int) + } + if len(r.ErrStats) < 16 { + // Avoid too many error. + r.ErrStats[errLabel]++ + } else { + r.OtherErrCnt++ + } +} + +// getErrMsg returns error message. if the error has cause error, then return cause error message. +func getErrMsg(err error) string { + if err == nil { + return "" + } + if causeErr := errors.Cause(err); causeErr != nil { + return causeErr.Error() + } + return err.Error() +} + // String implements fmt.Stringer interface. func (r *RegionRequestRuntimeStats) String() string { + if r == nil { + return "" + } var builder strings.Builder - for k, v := range r.Stats { + for k, v := range r.RPCStats { if builder.Len() > 0 { builder.WriteByte(',') } - // append string: fmt.Sprintf("%s:{num_rpc:%v, total_time:%s}", k.String(), v.Count, util.FormatDuration(time.Duration(v.Consume))") builder.WriteString(k.String()) builder.WriteString(":{num_rpc:") builder.WriteString(strconv.FormatInt(v.Count, 10)) @@ -144,27 +228,61 @@ func (r *RegionRequestRuntimeStats) String() string { builder.WriteString(util.FormatDuration(time.Duration(v.Consume))) builder.WriteString("}") } + if errStatsStr := r.RequestErrorStats.String(); errStatsStr != "" { + builder.WriteString(", rpc_errors:") + builder.WriteString(errStatsStr) + } + return builder.String() +} + +// String implements fmt.Stringer interface. +func (r *RequestErrorStats) String() string { + if len(r.ErrStats) == 0 { + return "" + } + var builder strings.Builder + builder.WriteString("{") + for err, cnt := range r.ErrStats { + if builder.Len() > 2 { + builder.WriteString(", ") + } + builder.WriteString(err) + builder.WriteString(":") + builder.WriteString(strconv.Itoa(cnt)) + } + if r.OtherErrCnt > 0 { + builder.WriteString(", other_error:") + builder.WriteString(strconv.Itoa(r.OtherErrCnt)) + } + builder.WriteString("}") return builder.String() } // Clone returns a copy of itself. -func (r *RegionRequestRuntimeStats) Clone() RegionRequestRuntimeStats { +func (r *RegionRequestRuntimeStats) Clone() *RegionRequestRuntimeStats { newRs := NewRegionRequestRuntimeStats() - for cmd, v := range r.Stats { - newRs.Stats[cmd] = &RPCRuntimeStats{ - Count: v.Count, - Consume: v.Consume, + for k, v := range r.RPCStats { + newRs.RPCStats[k] = v + } + if len(r.ErrStats) > 0 { + newRs.ErrStats = make(map[string]int) + for k, v := range r.ErrStats { + newRs.ErrStats[k] = v } + newRs.OtherErrCnt = r.OtherErrCnt } return newRs } // Merge merges other RegionRequestRuntimeStats. 
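RecordRPCErrorStats above keeps exact counts for at most 16 distinct error labels and funnels everything else into OtherErrCnt, so a pathological workload cannot grow the map without bound. A self-contained sketch of that bounded-counter idea, using local stand-in types rather than the patch's:

package main

import "fmt"

type boundedErrorStats struct {
	errCounts map[string]int
	otherCnt  int
}

func (s *boundedErrorStats) record(label string) {
	if s.errCounts == nil {
		s.errCounts = make(map[string]int) // lazy init, as in the patch
	}
	if len(s.errCounts) < 16 {
		s.errCounts[label]++ // exact count while the map is still small
		return
	}
	s.otherCnt++ // past the cap, further errors (even previously seen labels) are lumped together
}

func main() {
	var stats boundedErrorStats
	for i := 0; i < 40; i++ {
		stats.record(fmt.Sprintf("error-%d", i%20))
	}
	fmt.Println(len(stats.errCounts), "distinct labels tracked,", stats.otherCnt, "overflowed")
}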
-func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { - for cmd, v := range rs.Stats { - stat, ok := r.Stats[cmd] +func (r *RegionRequestRuntimeStats) Merge(rs *RegionRequestRuntimeStats) { + if rs == nil { + return + } + for cmd, v := range rs.RPCStats { + stat, ok := r.RPCStats[cmd] if !ok { - r.Stats[cmd] = &RPCRuntimeStats{ + r.RPCStats[cmd] = &RPCRuntimeStats{ Count: v.Count, Consume: v.Consume, } @@ -173,20 +291,114 @@ func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { stat.Count += v.Count stat.Consume += v.Consume } + if len(rs.ErrStats) > 0 { + if r.ErrStats == nil { + r.ErrStats = make(map[string]int) + } + for err, cnt := range rs.ErrStats { + r.ErrStats[err] += cnt + } + r.OtherErrCnt += rs.OtherErrCnt + } +} + +// ReplicaAccessStats records the replica access info. +type ReplicaAccessStats struct { + // AccessInfos records the access info + AccessInfos []ReplicaAccessInfo + // avoid to consume too much memory, after more than 5 records, count them by peerID in `OverflowAccessStat` map. + OverflowAccessStat map[uint64]*RequestErrorStats } -// RecordRegionRequestRuntimeStats records request runtime stats. -func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { - stat, ok := stats[cmd] +// ReplicaAccessInfo indicates the access path detail info of a request. +type ReplicaAccessInfo struct { + Peer uint64 + Store uint64 + ReqReadTp ReqReadType + Err string +} + +type ReqReadType byte + +const ( + ReqLeader ReqReadType = iota + ReqReplicaRead + ReqStaleRead +) + +func (s *ReplicaAccessStats) recordReplicaAccessInfo(staleRead, replicaRead bool, peerID, storeID uint64, err string) { + if len(s.AccessInfos) < 5 { + tp := ReqLeader + if replicaRead { + tp = ReqReplicaRead + } else if staleRead { + tp = ReqStaleRead + } + s.AccessInfos = append(s.AccessInfos, ReplicaAccessInfo{ + Peer: peerID, + Store: storeID, + ReqReadTp: tp, + Err: err, + }) + return + } + if s.OverflowAccessStat == nil { + s.OverflowAccessStat = make(map[uint64]*RequestErrorStats) + } + stat, ok := s.OverflowAccessStat[peerID] if !ok { - stats[cmd] = &RPCRuntimeStats{ - Count: 1, - Consume: int64(d), + stat = &RequestErrorStats{} + s.OverflowAccessStat[peerID] = stat + } + stat.RecordRPCErrorStats(err) +} + +// String implements fmt.Stringer interface. 
+func (s *ReplicaAccessStats) String() string { + if s == nil { + return "" + } + var builder strings.Builder + for i, info := range s.AccessInfos { + if i > 0 { + builder.WriteString(", ") + } + switch info.ReqReadTp { + case ReqLeader: + builder.WriteString("{") + case ReqReplicaRead: + builder.WriteString("{replica_read, ") + case ReqStaleRead: + builder.WriteString("{stale_read, ") + } + builder.WriteString("peer:") + builder.WriteString(strconv.FormatUint(info.Peer, 10)) + builder.WriteString(", store:") + builder.WriteString(strconv.FormatUint(info.Store, 10)) + builder.WriteString(", err:") + builder.WriteString(info.Err) + builder.WriteString("}") + } + if len(s.OverflowAccessStat) > 0 { + builder.WriteString(", overflow_count:{") + cnt := 0 + for peerID, stat := range s.OverflowAccessStat { + if stat == nil { + continue + } + if cnt > 0 { + builder.WriteString(", ") + } + builder.WriteString("{peer:") + builder.WriteString(strconv.FormatUint(peerID, 10)) + builder.WriteString(", error_stats:") + builder.WriteString(stat.String()) + builder.WriteString("}") + cnt++ } - return + builder.WriteString("}") } - stat.Count++ - stat.Consume += int64(d) + return builder.String() } // NewRegionRequestSender creates a new sender. @@ -235,11 +447,23 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *tikvrpc.Request, return resp, err } +func (s *RegionRequestSender) recordRPCAccessInfo(req *tikvrpc.Request, rpcCtx *RPCContext, err string) { + if req == nil || rpcCtx == nil || rpcCtx.Peer == nil || rpcCtx.Store == nil { + return + } + if s.AccessStats == nil { + s.AccessStats = &ReplicaAccessStats{} + } + s.AccessStats.recordReplicaAccessInfo(req.StaleRead, req.ReplicaRead, rpcCtx.Peer.GetId(), rpcCtx.Store.storeID, err) +} + type replica struct { store *Store peer *metapb.Peer epoch uint32 attempts int + // deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error. 
+ deadlineErrUsingConfTimeout bool } func (r *replica) isEpochStale() bool { @@ -261,6 +485,70 @@ type replicaSelector struct { targetIdx AccessIndex // replicas[proxyIdx] is the store used to redirect requests this time proxyIdx AccessIndex + + ReplicaSelectorExperimentalOptions +} + +func selectorStateToString(state selectorState) string { + replicaSelectorState := "nil" + if state != nil { + switch state.(type) { + case *accessKnownLeader: + replicaSelectorState = "accessKnownLeader" + case *accessFollower: + replicaSelectorState = "accessFollower" + case *accessByKnownProxy: + replicaSelectorState = "accessByKnownProxy" + case *tryFollower: + replicaSelectorState = "tryFollower" + case *tryNewProxy: + replicaSelectorState = "tryNewProxy" + case *invalidLeader: + replicaSelectorState = "invalidLeader" + case *invalidStore: + replicaSelectorState = "invalidStore" + case *stateBase: + replicaSelectorState = "stateBase" + case nil: + replicaSelectorState = "nil" + } + } + return replicaSelectorState +} + +func (s *replicaSelector) String() string { + selectorStateStr := "nil" + if s != nil { + selectorStateStr = selectorStateToString(s.state) + } + var replicaStatus []string + cacheRegionIsValid := "unknown" + leaderPeerID := uint64(0) + if s != nil { + if s.region != nil { + leaderPeerID = s.region.GetLeaderPeerID() + if s.region.isValid() { + cacheRegionIsValid = "true" + } else { + cacheRegionIsValid = "false" + } + } + for _, replica := range s.replicas { + replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, "+ + "attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", + replica.peer.GetId(), + replica.store.storeID, + replica.isEpochStale(), + replica.attempts, + replica.epoch, + atomic.LoadUint32(&replica.store.epoch), + replica.store.getResolveState(), + replica.store.getLivenessState(), + )) + } + } + return fmt.Sprintf("{state: %v, cacheRegionIsValid: %v, leaderPeerID: %v, replicaStatus: %v}", + selectorStateStr, cacheRegionIsValid, leaderPeerID, replicaStatus) } // selectorState is the interface of states of the replicaSelector. @@ -337,7 +625,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec // a request. So, before the new leader is elected, we should not send requests // to the unreachable old leader to avoid unnecessary timeout. 
if liveness != reachable || leader.isExhausted(maxReplicaAttempt) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true} return nil, stateChanged{} } selector.targetIdx = state.leaderIdx @@ -352,7 +640,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep return } if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true} } if liveness != reachable { selector.invalidateReplicaStore(selector.targetReplica(), cause) @@ -360,7 +648,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep } func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true} } // tryFollower is the state where we cannot access the known leader @@ -372,22 +660,24 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) { // the leader will be updated to replicas[0] and give it another chance. type tryFollower struct { stateBase - // if the leader is unavailable, but it still holds the leadership, fallbackFromLeader is true and replica read is enabled. - fallbackFromLeader bool - leaderIdx AccessIndex - lastIdx AccessIndex - labels []*metapb.StoreLabel + leaderIdx AccessIndex + lastIdx AccessIndex + labels []*metapb.StoreLabel + // fromAccessKnownLeader indicates whether the state is changed from `accessKnownLeader`. + fromAccessKnownLeader bool } func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + hasDeadlineExceededErr := false filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) { for i := 0; i < len(selector.replicas); i++ { idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) + selectReplica := selector.replicas[idx] + hasDeadlineExceededErr = hasDeadlineExceededErr || selectReplica.deadlineErrUsingConfTimeout if idx == state.leaderIdx { continue } - selectReplica := selector.replicas[idx] - if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable { + if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout { return idx, selectReplica } } @@ -418,6 +708,10 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) ( // If all followers are tried and fail, backoff and retry. if selector.targetIdx < 0 { + if hasDeadlineExceededErr { + // when meet deadline exceeded error, do fast retry without invalidate region cache. 
+ return nil, nil + } metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() return nil, nil @@ -426,17 +720,17 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) ( if err != nil || rpcCtx == nil { return rpcCtx, err } - if state.fallbackFromLeader { - staleRead := false - rpcCtx.contextPatcher.staleRead = &staleRead + if !state.fromAccessKnownLeader { replicaRead := true rpcCtx.contextPatcher.replicaRead = &replicaRead } + staleRead := false + rpcCtx.contextPatcher.staleRead = &staleRead return rpcCtx, nil } func (state *tryFollower) onSendSuccess(selector *replicaSelector) { - if !state.fallbackFromLeader { + if state.fromAccessKnownLeader { peer := selector.targetReplica().peer if !selector.region.switchWorkLeaderToPeer(peer) { logutil.BgLogger().Warn("the store must exist", @@ -617,25 +911,32 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector leader := selector.replicas[state.leaderIdx] leaderEpochStale := leader.isEpochStale() leaderUnreachable := leader.store.getLivenessState() != reachable - leaderInvalid := leaderEpochStale || leaderUnreachable || state.IsLeaderExhausted(leader) - if len(state.option.labels) > 0 { - logutil.BgLogger().Warn("unable to find stores with given labels", + leaderExhausted := state.IsLeaderExhausted(leader) + leaderTimeout := leader.deadlineErrUsingConfTimeout + if len(state.option.labels) > 0 && !state.option.leaderOnly { + logutil.BgLogger().Debug("unable to find a store with given labels", zap.Uint64("region", selector.region.GetID()), - zap.Bool("leader-epoch-stale", leaderEpochStale), - zap.Bool("leader-unreachable", leaderUnreachable), - zap.Bool("leader-invalid", leaderInvalid), + zap.Bool("stale-read", state.isStaleRead), zap.Any("labels", state.option.labels)) } - if leaderInvalid { + if leaderEpochStale || leaderUnreachable || leaderExhausted || leaderTimeout { + logutil.BgLogger().Warn("unable to find a valid leader", + zap.Uint64("region", selector.region.GetID()), + zap.Bool("stale-read", state.isStaleRead), + zap.Bool("epoch-stale", leaderEpochStale), + zap.Bool("unreachable", leaderUnreachable), + zap.Bool("exhausted", leaderExhausted), + zap.Bool("timeout", leaderTimeout)) // In stale-read, the request will fallback to leader after the local follower failure. // If the leader is also unavailable, we can fallback to the follower and use replica-read flag again, // The remote follower not tried yet, and the local follower can retry without stale-read flag. - if state.isStaleRead { + // If leader tried and received deadline exceeded error, try follower. 
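The experimental stale-read options introduced near the top of this file feed into this branch: PreventRetryFollower suppresses the fallback to tryFollower, while RetryLeaderTimeoutFactor is handed to the contextPatcher so that a stale read retried against the leader gets a stretched MaxExecutionDurationMs and client-side timeout. A standalone sketch of that scaling arithmetic (local mirror types, not the library's):

package main

import (
	"fmt"
	"time"
)

// staleReadOptions mirrors the StaleRead part of ReplicaSelectorExperimentalOptions.
type staleReadOptions struct {
	PreventRetryFollower     bool
	RetryLeaderTimeoutFactor float64
}

// applyTimeoutFactor reproduces the contextPatcher arithmetic: when the factor is
// positive, both the server-side execution budget (in ms) and the client-side RPC
// timeout are multiplied by it.
func applyTimeoutFactor(maxExecMs uint64, timeout time.Duration, factor float64) (uint64, time.Duration) {
	if factor > 0 {
		maxExecMs = uint64(float64(maxExecMs) * factor)
		timeout = time.Duration(float64(timeout) * factor)
	}
	return maxExecMs, timeout
}

func main() {
	opts := staleReadOptions{PreventRetryFollower: false, RetryLeaderTimeoutFactor: 2}
	// A stale read that fell back to the leader with a 1s configurable timeout
	// would retry the leader with a 2s budget under these options.
	maxExecMs, timeout := applyTimeoutFactor(1000, time.Second, opts.RetryLeaderTimeoutFactor)
	fmt.Println(maxExecMs, timeout) // 2000 2s
}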
+ if (state.isStaleRead && !selector.StaleRead.PreventRetryFollower) || + (!state.isStaleRead && leader.deadlineErrUsingConfTimeout) { selector.state = &tryFollower{ - fallbackFromLeader: true, - leaderIdx: state.leaderIdx, - lastIdx: state.leaderIdx, - labels: state.option.labels, + leaderIdx: state.leaderIdx, + lastIdx: state.leaderIdx, + labels: state.option.labels, } if leaderEpochStale { selector.regionCache.scheduleReloadRegion(selector.region) @@ -656,6 +957,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if resetStaleRead { staleRead := false rpcCtx.contextPatcher.staleRead = &staleRead + rpcCtx.contextPatcher.timeoutFactor = selector.StaleRead.RetryLeaderTimeoutFactor } return rpcCtx, nil } @@ -680,7 +982,8 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic } func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool { - if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable { + // the epoch is staled or retry exhausted, or the store is unreachable. + if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout { return false } if state.option.leaderOnly && idx == state.leaderIdx { @@ -760,6 +1063,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik state, -1, -1, + GetReplicaSelectorExperimentalOptions(), }, nil } @@ -798,6 +1102,17 @@ func (s *replicaSelector) proxyReplica() *replica { return nil } +// sliceIdentical checks whether two slices are referencing the same block of memory. Two `nil`s are also considered +// the same. +func sliceIdentical[T any](a, b []T) bool { + if len(a) != len(b) { + return false + } + aHeader := (*reflect.SliceHeader)(unsafe.Pointer(&a)) + bHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + return aHeader.Data == bHeader.Data +} + func (s *replicaSelector) refreshRegionStore() { oldRegionStore := s.regionStore newRegionStore := s.region.getStore() @@ -810,7 +1125,7 @@ func (s *replicaSelector) refreshRegionStore() { // So we just compare the address here. // When stores change, we mark this replicaSelector as invalid to let the caller // recreate a new replicaSelector. - if &oldRegionStore.stores != &newRegionStore.stores { + if !sliceIdentical(oldRegionStore.stores, newRegionStore.stores) { s.state = &invalidStore{} return } @@ -887,6 +1202,28 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) { s.state.onSendFailure(bo, s, err) } +func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool { + if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) { + // Configurable timeout should less than `ReadTimeoutShort`. + return false + } + switch req.Type { + case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan, + tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream: + if target := s.targetReplica(); target != nil { + target.deadlineErrUsingConfTimeout = true + } + if accessLeader, ok := s.state.(*accessKnownLeader); ok { + // If leader return deadline exceeded error, we should try to access follower next time. + s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx} + } + return true + default: + // Only work for read requests, return false for non-read requests. 
+ return false + } +} + func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { store := accessReplica.store liveness := store.requestLiveness(bo, s.regionCache) @@ -1030,6 +1367,8 @@ func IsFakeRegionError(err *errorpb.Error) bool { return err != nil && err.GetEpochNotMatch() != nil && len(err.GetEpochNotMatch().CurrentRegions) == 0 } +const slowLogSendReqTime = 100 * time.Millisecond + // SendReqCtx sends a request to tikv server and return response and RPCCtx of this RPC. func (s *RegionRequestSender) SendReqCtx( bo *retry.Backoffer, @@ -1096,6 +1435,8 @@ func (s *RegionRequestSender) SendReqCtx( s.reset() tryTimes := 0 + startTime := time.Now() + startBackOff := bo.GetTotalSleep() defer func() { if tryTimes > 0 { metrics.TiKVRequestRetryTimesHistogram.Observe(float64(tryTimes)) @@ -1114,10 +1455,8 @@ func (s *RegionRequestSender) SendReqCtx( }() } - totalErrors := make(map[string]int) for { if tryTimes > 0 { - req.IsRetryRequest = true if tryTimes%100 == 0 { logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", tryTimes)) } @@ -1143,7 +1482,9 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. - s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, tryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { + s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, tryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}) return resp, nil, err } @@ -1166,14 +1507,32 @@ func (s *RegionRequestSender) SendReqCtx( } } + timeout = rpcCtx.contextPatcher.applyTo(&req.Context, timeout) + if req.InputRequestSource != "" && s.replicaSelector != nil { + s.replicaSelector.patchRequestSource(req, rpcCtx) + } + if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil { + return nil, nil, err + } + var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) + req.IsRetryRequest = true if err != nil { - msg := fmt.Sprintf("send request failed, err: %v", err.Error()) - s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { + msg := fmt.Sprintf("send request failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, tryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } return nil, nil, err } + if _, err1 := util.EvalFailpoint("afterSendReqToRegion"); err1 == nil { + if hook := bo.GetCtx().Value("sendReqToRegionFinishHook"); hook != nil { + h := hook.(func(*tikvrpc.Request, *tikvrpc.Response, error)) + h(req, resp, err) + } + } + // recheck whether the session/query is killed during the Next() boVars := bo.GetVars() if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 { @@ -1195,19 +1554,21 @@ func (s *RegionRequestSender) SendReqCtx( return nil, nil, err } if regionErr != nil { - regionErrLabel := regionErrorToLabel(regionErr) - totalErrors[regionErrLabel]++ retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) if err != nil { - msg := fmt.Sprintf("send request on region error failed, err: %v", 
err.Error()) - s.logSendReqError(bo, msg, regionID, tryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { + msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error()) + s.logSendReqError(bo, msg, regionID, tryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } return nil, nil, err } if retry { tryTimes++ continue } - s.logSendReqError(bo, "send request meet region error without retry", regionID, tryTimes, req, totalErrors) + if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 { + s.logSendReqError(bo, "send request meet region error without retry", regionID, tryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout) + } } else { if s.replicaSelector != nil { s.replicaSelector.onSendSuccess() @@ -1220,73 +1581,49 @@ func (s *RegionRequestSender) SendReqCtx( } } -func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) { - var replicaStatus []string - replicaSelectorState := "nil" - cacheRegionIsValid := "unknown" - if s.replicaSelector != nil { - switch s.replicaSelector.state.(type) { - case *accessKnownLeader: - replicaSelectorState = "accessKnownLeader" - case *accessFollower: - replicaSelectorState = "accessFollower" - case *accessByKnownProxy: - replicaSelectorState = "accessByKnownProxy" - case *tryFollower: - replicaSelectorState = "tryFollower" - case *tryNewProxy: - replicaSelectorState = "tryNewProxy" - case *invalidLeader: - replicaSelectorState = "invalidLeader" - case *invalidStore: - replicaSelectorState = "invalidStore" - case *stateBase: - replicaSelectorState = "stateBase" - case nil: - replicaSelectorState = "nil" - } - if s.replicaSelector.region != nil { - if s.replicaSelector.region.isValid() { - cacheRegionIsValid = "true" - } else { - cacheRegionIsValid = "false" - } - } - for _, replica := range s.replicaSelector.replicas { - replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v", - replica.peer.GetId(), - replica.store.storeID, - replica.isEpochStale(), - replica.attempts, - replica.epoch, - atomic.LoadUint32(&replica.store.epoch), - replica.store.getResolveState(), - replica.store.getLivenessState(), - )) - } +func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, tryTimes int, req *tikvrpc.Request, cost time.Duration, currentBackoffMs int, timeout time.Duration) { + var builder strings.Builder + // build the total round stats string. + builder.WriteString("{total-backoff: ") + builder.WriteString(util.FormatDuration(time.Duration(bo.GetTotalSleep() * int(time.Millisecond)))) + builder.WriteString(", total-backoff-times: ") + builder.WriteString(strconv.Itoa(bo.GetTotalBackoffTimes())) + if s.Stats != nil { + builder.WriteString(", total-rpc: {") + builder.WriteString(s.Stats.String()) + builder.WriteString("}") } - var totalErrorStr bytes.Buffer - for err, cnt := range totalErrors { - if totalErrorStr.Len() > 0 { - totalErrorStr.WriteString(", ") - } - totalErrorStr.WriteString(err) - totalErrorStr.WriteString(":") - totalErrorStr.WriteString(strconv.Itoa(cnt)) + builder.WriteString("}") + totalRoundStats := builder.String() + + // build the current round stats string. 
+ builder.Reset() + builder.WriteString("{time: ") + builder.WriteString(util.FormatDuration(cost)) + builder.WriteString(", backoff: ") + builder.WriteString(util.FormatDuration(time.Duration(currentBackoffMs * int(time.Millisecond)))) + builder.WriteString(", timeout: ") + builder.WriteString(util.FormatDuration(timeout)) + builder.WriteString(", req-max-exec-timeout: ") + builder.WriteString(util.FormatDuration(time.Duration(int64(req.Context.MaxExecutionDurationMs) * int64(time.Millisecond)))) + builder.WriteString(", try-times: ") + builder.WriteString(strconv.Itoa(tryTimes)) + if s.AccessStats != nil { + builder.WriteString(", replica-access: {") + builder.WriteString(s.AccessStats.String()) + builder.WriteString("}") } + builder.WriteString("}") + currentRoundStats := builder.String() logutil.Logger(bo.GetCtx()).Info(msg, zap.Uint64("req-ts", req.GetStartTS()), zap.String("req-type", req.Type.String()), zap.String("region", regionID.String()), - zap.String("region-is-valid", cacheRegionIsValid), - zap.Int("retry-times", retryTimes), zap.String("replica-read-type", req.ReplicaReadType.String()), - zap.String("replica-selector-state", replicaSelectorState), zap.Bool("stale-read", req.StaleRead), - zap.String("replica-status", strings.Join(replicaStatus, "; ")), - zap.Int("total-backoff-ms", bo.GetTotalSleep()), - zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()), - zap.String("total-region-errors", totalErrorStr.String())) + zap.Stringer("request-sender", s), + zap.String("total-round-stats", totalRoundStats), + zap.String("current-round-stats", currentRoundStats)) } // RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx. @@ -1356,10 +1693,6 @@ func fetchRespInfo(resp *tikvrpc.Response) string { func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCContext, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, retry bool, err error) { req.ApiVersion = s.apiVersion - if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil { - return nil, false, err - } - rpcCtx.contextPatcher.applyTo(&req.Context) // judge the store limit switch. if limit := kv.StoreLimit.Load(); limit > 0 { if err := s.getStoreToken(rpcCtx.Store, limit); err != nil { @@ -1416,7 +1749,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo start := time.Now() resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout) if s.Stats != nil { - RecordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start)) + s.Stats.RecordRPCRuntimeStats(req.Type, time.Since(start)) if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil { if val.(bool) { if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 { @@ -1474,11 +1807,16 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo if err != nil { s.rpcError = err - + if s.Stats != nil { + errStr := getErrMsg(err) + s.Stats.RecordRPCErrorStats(errStr) + s.recordRPCAccessInfo(req, rpcCtx, errStr) + } // Because in rpc logic, context.Cancel() will be transferred to rpcContext.Cancel error. For rpcContext cancel, // we need to retry the request. But for context cancel active, for example, limitExec gets the required rows, // we shouldn't retry the request, it will go to backoff and hang in retry logic. 
if ctx.Err() != nil && errors.Cause(ctx.Err()) == context.Canceled { + metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeIDLabel(rpcCtx)).Inc() return nil, false, errors.WithStack(ctx.Err()) } @@ -1487,7 +1825,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo return nil, false, err } } - if e := s.onSendFail(bo, rpcCtx, err); e != nil { + if e := s.onSendFail(bo, rpcCtx, req, err); e != nil { return nil, false, err } return nil, true, nil @@ -1495,6 +1833,13 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo return } +func storeIDLabel(rpcCtx *RPCContext) string { + if rpcCtx != nil && rpcCtx.Store != nil { + return strconv.FormatUint(rpcCtx.Store.storeID, 10) + } + return "nil" +} + func (s *RegionRequestSender) getStoreToken(st *Store, limit int64) error { // Checking limit is not thread safe, preferring this for avoiding load in loop. count := st.tokenCount.Load() @@ -1517,29 +1862,44 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) { logutil.BgLogger().Warn("release store token failed, count equals to 0") } -func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) error { +func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context())) defer span1.Finish() bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } + storeLabel := storeIDLabel(ctx) // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled { + metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeLabel).Inc() return errors.WithStack(err) } else if LoadShuttingDown() > 0 { + metrics.TiKVRPCErrorCounter.WithLabelValues("shutting-down", storeLabel).Inc() return errors.WithStack(tikverr.ErrTiDBShuttingDown) + } else if isCauseByDeadlineExceeded(err) { + if s.replicaSelector != nil && s.replicaSelector.onReadReqConfigurableTimeout(req) { + errLabel := "read-timeout-" + strconv.FormatUint(req.MaxExecutionDurationMs, 10) + "ms" + metrics.TiKVRPCErrorCounter.WithLabelValues(errLabel, storeLabel).Inc() + return nil + } } if status.Code(errors.Cause(err)) == codes.Canceled { select { case <-bo.GetCtx().Done(): + metrics.TiKVRPCErrorCounter.WithLabelValues("grpc-canceled", storeLabel).Inc() return errors.WithStack(err) default: // If we don't cancel, but the error code is Canceled, it must be from grpc remote. // This may happen when tikv is killed and exiting. // Backoff and retry in this case. 
- logutil.BgLogger().Warn("receive a grpc cancel signal from remote", zap.Error(err)) + logutil.Logger(bo.GetCtx()).Warn("receive a grpc cancel signal from remote", zap.Error(err)) } } + if errStr := getErrMsg(err); len(errStr) > 0 { + metrics.TiKVRPCErrorCounter.WithLabelValues(errStr, storeLabel).Inc() + } else { + metrics.TiKVRPCErrorCounter.WithLabelValues("unknown", storeLabel).Inc() + } if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute { s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err) @@ -1563,6 +1923,13 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e return err } +func isCauseByDeadlineExceeded(err error) bool { + causeErr := errors.Cause(err) + return causeErr == context.DeadlineExceeded || // batch-client will return this error. + status.Code(causeErr) == codes.DeadlineExceeded || // when batch-client is disabled, grpc will return this error. + strings.Contains(err.Error(), "context deadline exceeded") // when batch-client is disabled, grpc may return this error with unknown code. +} + // NeedReloadRegion checks is all peers has sent failed, if so need reload. func (s *RegionRequestSender) NeedReloadRegion(ctx *RPCContext) (need bool) { if s.failStoreIDs == nil { @@ -1591,6 +1958,20 @@ func (s *RegionRequestSender) NeedReloadRegion(ctx *RPCContext) (need bool) { return } +// regionErrorToLogging constructs the logging content with extra information like returned leader peer id. +func regionErrorToLogging(e *errorpb.Error, errLabel string) string { + str := errLabel + if e.GetNotLeader() != nil { + notLeader := e.GetNotLeader() + if notLeader.GetLeader() != nil { + str = fmt.Sprintf("%v_with_leader_%v", str, notLeader.GetLeader().GetId()) + } else { + str = fmt.Sprintf("%v_with_no_leader", str) + } + } + return str +} + func regionErrorToLabel(e *errorpb.Error) string { if e.GetNotLeader() != nil { return "not_leader" @@ -1601,6 +1982,9 @@ func regionErrorToLabel(e *errorpb.Error) string { } else if e.GetEpochNotMatch() != nil { return "epoch_not_match" } else if e.GetServerIsBusy() != nil { + if strings.Contains(e.GetServerIsBusy().GetReason(), "deadline is exceeded") { + return "deadline_exceeded" + } return "server_is_busy" } else if e.GetStaleCommand() != nil { return "stale_command" @@ -1631,10 +2015,16 @@ func regionErrorToLabel(e *errorpb.Error) string { // the `mismatch peer id` error does not has a specific error type, so we have to match the error message. // TODO: add a specific error type for `mismatch peer id`. return "mismatch_peer_id" + } else if isDeadlineExceeded(e) { + return "deadline_exceeded" } return "unknown" } +func isDeadlineExceeded(e *errorpb.Error) bool { + return strings.Contains(e.GetMessage(), "Deadline is exceeded") +} + func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error) (shouldRetry bool, err error) { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context())) @@ -1642,13 +2032,17 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } - // NOTE: Please add the region error handler in the same order of errorpb.Error. 
- errLabel := regionErrorToLabel(regionErr) - metrics.TiKVRegionErrorCounter.WithLabelValues(errLabel).Inc() + regionErrLabel := regionErrorToLabel(regionErr) + metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrLabel, storeIDLabel(ctx)).Inc() + if s.Stats != nil { + s.Stats.RecordRPCErrorStats(regionErrLabel) + s.recordRPCAccessInfo(req, ctx, regionErrorToLogging(regionErr, regionErrLabel)) + } + // NOTE: Please add the region error handler in the same order of errorpb.Error. if notLeader := regionErr.GetNotLeader(); notLeader != nil { // Retry if error is `NotLeader`. - logutil.BgLogger().Debug("tikv reports `NotLeader` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `NotLeader` retry later", zap.String("notLeader", notLeader.String()), zap.String("ctx", ctx.String())) @@ -1681,7 +2075,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if regionErr.GetRecoveryInProgress() != nil { s.regionCache.InvalidateCachedRegion(ctx.Region) - logutil.BgLogger().Debug("tikv reports `RecoveryInProgress`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `RecoveryInProgress`", zap.Stringer("ctx", ctx)) err = bo.Backoff(retry.BoRegionRecoveryInProgress, errors.Errorf("region recovery in progress, ctx: %v", ctx)) if err != nil { return false, err @@ -1693,14 +2087,14 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // if a request meets the FlashbackInProgress error, it should stop retrying immediately // to avoid unnecessary backoff and potential unexpected data status to the user. if regionErr.GetFlashbackInProgress() != nil { - logutil.BgLogger().Debug("tikv reports `FlashbackInProgress`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `FlashbackInProgress`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) return false, errors.Errorf("region %d is in flashback progress", regionErr.GetFlashbackInProgress().GetRegionId()) } // This error means a second-phase flashback request is sent to a region that is not // prepared for the flashback before, it should stop retrying immediately to avoid // unnecessary backoff. 
if regionErr.GetFlashbackNotPrepared() != nil { - logutil.BgLogger().Debug("tikv reports `FlashbackNotPrepared`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `FlashbackNotPrepared`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) return false, errors.Errorf("region %d is not prepared for the flashback", regionErr.GetFlashbackNotPrepared().GetRegionId()) } @@ -1712,13 +2106,13 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext } if regionErr.GetKeyNotInRegion() != nil { - logutil.BgLogger().Debug("tikv reports `KeyNotInRegion`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `KeyNotInRegion`", zap.Stringer("req", req), zap.Stringer("ctx", ctx)) s.regionCache.InvalidateCachedRegion(ctx.Region) return false, nil } if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil { - logutil.BgLogger().Debug("tikv reports `EpochNotMatch` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `EpochNotMatch` retry later", zap.Stringer("EpochNotMatch", epochNotMatch), zap.Stringer("ctx", ctx)) retry, err := s.regionCache.OnRegionEpochNotMatch(bo, ctx, epochNotMatch.CurrentRegions) @@ -1728,8 +2122,14 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext return retry, err } - if regionErr.GetServerIsBusy() != nil { - logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later", + if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil { + if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") { + if s.replicaSelector.onReadReqConfigurableTimeout(req) { + return true, nil + } + } + logutil.Logger(bo.GetCtx()).Debug( + "tikv reports `ServerIsBusy` retry later", zap.String("reason", regionErr.GetServerIsBusy().GetReason()), zap.Stringer("ctx", ctx)) if s.replicaSelector.canFallback2Follower() { @@ -1751,7 +2151,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // We can't know whether the request is committed or not, so it's an undetermined error too, // but we don't handle it now. if regionErr.GetStaleCommand() != nil { - logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx)) if s.replicaSelector != nil { // Needn't backoff because the new leader should be elected soon // and the replicaSelector will try the next peer. 
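The hunks above route two flavors of store-side deadline errors into the same fast-retry path: a ServerIsBusy whose reason contains "deadline is exceeded", and a plain region error whose message contains "Deadline is exceeded". In both cases the retry is only taken when replicaSelector.onReadReqConfigurableTimeout accepts the request; per the tests later in this patch, read requests with a caller-set MaxExecutionDurationMs are fast-retried while write requests keep the normal backoff. A minimal standalone sketch of just the classification half (isDeadlineExceededRegionErr is not part of the patch, it only combines the two string checks for illustration):

package sketch

import (
	"strings"

	"github.com/pingcap/kvproto/pkg/errorpb"
)

// isDeadlineExceededRegionErr combines the two checks added in this patch:
// a ServerIsBusy reason containing "deadline is exceeded" and a bare region
// error message containing "Deadline is exceeded" both mean the store-side
// deadline fired for this request.
func isDeadlineExceededRegionErr(e *errorpb.Error) bool {
	if busy := e.GetServerIsBusy(); busy != nil &&
		strings.Contains(busy.GetReason(), "deadline is exceeded") {
		return true
	}
	return strings.Contains(e.GetMessage(), "Deadline is exceeded")
}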
@@ -1766,7 +2166,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil { // store not match - logutil.BgLogger().Debug("tikv reports `StoreNotMatch` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `StoreNotMatch` retry later", zap.Stringer("storeNotMatch", storeNotMatch), zap.Stringer("ctx", ctx)) ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh) @@ -1778,12 +2178,12 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext } if regionErr.GetRaftEntryTooLarge() != nil { - logutil.BgLogger().Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx)) return false, errors.New(regionErr.String()) } if regionErr.GetMaxTimestampNotSynced() != nil { - logutil.BgLogger().Debug("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx)) err = bo.Backoff(retry.BoMaxTsNotSynced, errors.Errorf("max timestamp not synced, ctx: %v", ctx)) if err != nil { return false, err @@ -1793,7 +2193,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // A read request may be sent to a peer which has not been initialized yet, we should retry in this case. if regionErr.GetRegionNotInitialized() != nil { - logutil.BgLogger().Debug("tikv reports `RegionNotInitialized` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `RegionNotInitialized` retry later", zap.Uint64("store-id", ctx.Store.storeID), zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()), zap.Stringer("ctx", ctx)) @@ -1806,7 +2206,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // The read-index can't be handled timely because the region is splitting or merging. if regionErr.GetReadIndexNotReady() != nil { - logutil.BgLogger().Debug("tikv reports `ReadIndexNotReady` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `ReadIndexNotReady` retry later", zap.Uint64("store-id", ctx.Store.storeID), zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()), zap.Stringer("ctx", ctx)) @@ -1819,7 +2219,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext } if regionErr.GetProposalInMergingMode() != nil { - logutil.BgLogger().Debug("tikv reports `ProposalInMergingMode`", zap.Stringer("ctx", ctx)) + logutil.Logger(bo.GetCtx()).Debug("tikv reports `ProposalInMergingMode`", zap.Stringer("ctx", ctx)) // The region is merging and it can't provide service until merge finished, so backoff. err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("region is merging, ctx: %v", ctx)) if err != nil { @@ -1832,7 +2232,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // This error is specific to stale read and the target replica is randomly selected. If the request is sent // to the leader, the data must be ready, so we don't backoff here. 
if regionErr.GetDataIsNotReady() != nil { - logutil.BgLogger().Warn("tikv reports `DataIsNotReady` retry later", + logutil.Logger(bo.GetCtx()).Debug("tikv reports `DataIsNotReady` retry later", zap.Uint64("store-id", ctx.Store.storeID), zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()), zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()), @@ -1849,12 +2249,17 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext return true, nil } - logutil.BgLogger().Debug("tikv reports region failed", + if isDeadlineExceeded(regionErr) && s.replicaSelector != nil && s.replicaSelector.onReadReqConfigurableTimeout(req) { + return true, nil + } + + logutil.Logger(bo.GetCtx()).Debug( + "tikv reports region failed", zap.Stringer("regionErr", regionErr), zap.Stringer("ctx", ctx)) if s.replicaSelector != nil { - if errLabel == "mismatch_peer_id" { + if regionErrLabel == "mismatch_peer_id" { s.replicaSelector.invalidateRegion() return false, nil } @@ -1921,3 +2326,51 @@ func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Res metrics.StaleReadRemoteInBytes.Add(float64(size)) } } + +func (s *replicaSelector) replicaType(rpcCtx *RPCContext) string { + leaderIdx := -1 + switch v := s.state.(type) { + case *accessKnownLeader: + return "leader" + case *tryFollower: + return "follower" + case *accessFollower: + leaderIdx = int(v.leaderIdx) + } + if leaderIdx > -1 && rpcCtx != nil && rpcCtx.Peer != nil { + for idx, replica := range s.replicas { + if replica.peer.Id == rpcCtx.Peer.Id { + if idx == leaderIdx { + return "leader" + } + return "follower" + } + } + } + return "unknown" +} + +func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCContext) { + var sb strings.Builder + sb.WriteString(req.InputRequestSource) + sb.WriteByte('-') + defer func() { + req.RequestSource = sb.String() + }() + + replicaType := s.replicaType(rpcCtx) + + if req.IsRetryRequest { + sb.WriteString("retry_") + sb.WriteString(req.ReadType) + sb.WriteByte('_') + sb.WriteString(replicaType) + return + } + if req.StaleRead { + req.ReadType = "stale_" + replicaType + } else { + req.ReadType = replicaType + } + sb.WriteString(req.ReadType) +} diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index c56c521ef..c7c755353 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -36,6 +36,7 @@ package locate import ( "context" + "fmt" "strconv" "sync/atomic" "testing" @@ -45,14 +46,17 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/pkg/errors" "github.com/stretchr/testify/suite" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" + "go.uber.org/zap" ) func TestRegionRequestToThreeStores(t *testing.T) { @@ -132,6 +136,82 @@ func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeader() { resp, err := s.regionRequestSender.SendReq(bo, req, loc.Region, time.Second) s.Nil(err) s.NotNil(resp) + s.Nil(resp.GetRegionError()) +} + +func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeaderErrorWithNewLeaderInfo() { + cnt := 0 + var location *KeyLocation + cli := 
&fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + cnt++ + switch cnt { + case 1: + region := s.cache.GetCachedRegionWithRLock(location.Region) + s.NotNil(region) + leaderPeerIdx := int(region.getStore().workTiKVIdx) + peers := region.meta.Peers + // return no leader with new leader info + response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{ + RegionId: req.RegionId, + Leader: peers[(leaderPeerIdx+1)%len(peers)], + }}, + }} + case 2: + response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + Value: []byte("a"), + }} + default: + return nil, fmt.Errorf("unexpected request") + } + return response, err + }} + + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{}) + req.ReplicaReadType = kv.ReplicaReadLeader + var err error + location, err = s.cache.LocateKey(s.bo, []byte("a")) + s.Nil(err) + s.NotNil(location) + bo := retry.NewBackoffer(context.Background(), 1000) + resp, _, err := NewRegionRequestSender(s.cache, cli).SendReqCtx(bo, req, location.Region, time.Second, tikvrpc.TiKV) + s.Nil(err) + s.NotNil(resp) + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + // It's unreasoneable to retry in upper layer, such as cop request, the upper layer will need to rebuild cop request and retry, there are some unnecessary overhead. + s.Equal(cnt, 2) + r := s.cache.GetCachedRegionWithRLock(location.Region) + s.True(r.isValid()) +} + +func (s *testRegionRequestToThreeStoresSuite) TestSliceIdentical() { + a := make([]int, 0) + b := a + s.True(sliceIdentical(a, b)) + // This is not guaranteed. + // b = make([]int, 0) + // s.False(sliceIdentical(a, b)) + + a = append(a, 1, 2, 3) + b = a + s.True(sliceIdentical(a, b)) + b = a[:2] + s.False(sliceIdentical(a, b)) + b = a[1:] + s.False(sliceIdentical(a, b)) + a = a[1:] + s.True(sliceIdentical(a, b)) + + a = nil + b = nil + + s.True(sliceIdentical(a, b)) + a = make([]int, 0) + s.False(sliceIdentical(a, b)) + a = append(a, 1) + s.False(sliceIdentical(a, b)) } func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore() (*Store, string) { @@ -656,7 +736,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { // Normal bo := retry.NewBackoffer(context.Background(), -1) sender := s.regionRequestSender - resp, err := sender.SendReq(bo, req, region.Region, time.Second) + resp, err := sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) s.True(bo.GetTotalBackoffTimes() == 0) @@ -665,7 +745,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { bo = retry.NewBackoffer(context.Background(), -1) s.cluster.ChangeLeader(s.regionID, s.peerIDs[1]) s.cluster.StopStore(s.storeIDs[0]) - resp, err = sender.SendReq(bo, req, region.Region, time.Second) + resp, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1)) @@ -674,8 +754,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { atomic.StoreUint32(®ionStore.stores[0].livenessState, uint32(reachable)) // Leader is updated because of send success, so no backoff. 
+ reloadRegion() bo = retry.NewBackoffer(context.Background(), -1) - resp, err = sender.SendReq(bo, req, region.Region, time.Second) + resp, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1)) @@ -977,9 +1058,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() regionStore := region.getStore() leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr s.NotEqual(leaderAddr, "") - for i := 0; i < 10; i++ { + for i := 0; i < 30; i++ { bo := retry.NewBackofferWithVars(context.Background(), 100, nil) - resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV) s.Nil(err) s.NotNil(resp) @@ -1022,7 +1103,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() for i := 0; i < 100; i++ { bo := retry.NewBackofferWithVars(context.Background(), 1, nil) - resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV) s.Nil(err) s.NotNil(resp) // since all follower'store is unreachable, the request will be sent to leader, the backoff times should be 0. @@ -1229,3 +1310,317 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { // `tryFollower` always try the local peer firstly s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) } + +func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { + leaderAddr := "" + reqTargetAddrs := make(map[string]struct{}) + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() + bo := retry.NewBackoffer(context.Background(), 10000) + mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + reqTargetAddrs[addr] = struct{}{} + if req.Context.MaxExecutionDurationMs < 10 { + return nil, context.DeadlineExceeded + } + if addr != leaderAddr && !req.Context.ReplicaRead && !req.Context.StaleRead { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil + }} + getLocFn := func() *KeyLocation { + loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a")) + s.Nil(err) + region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region) + leaderStore, _, _, _ := region.WorkStorePeer(region.getStore()) + leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore) + s.Nil(err) + return loc + } + resetStats := func() { + reqTargetAddrs = make(map[string]struct{}) + s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient) + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() + } + + //Test different read type. 
+ staleReadTypes := []bool{false, true} + replicaReadTypes := []kv.ReplicaReadType{kv.ReplicaReadLeader, kv.ReplicaReadFollower, kv.ReplicaReadMixed} + for _, staleRead := range staleReadTypes { + for _, tp := range replicaReadTypes { + log.Info("TestSendReqFirstTimeout", zap.Bool("stale-read", staleRead), zap.String("replica-read-type", tp.String())) + resetStats() + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{}) + if staleRead { + req.EnableStaleRead() + } else { + req.ReplicaRead = tp.IsFollowerRead() + req.ReplicaReadType = tp + } + loc := getLocFn() + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV) + s.Nil(err) + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.True(IsFakeRegionError(regionErr)) + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(3), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 3 rpc + s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store. + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + // warn: must reset MaxExecutionDurationMs before retry. + resetStats() + if staleRead { + req.EnableStaleRead() + } else { + req.ReplicaRead = tp.IsFollowerRead() + req.ReplicaReadType = tp + } + req.Context.MaxExecutionDurationMs = 0 + resp, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + s.Nil(err) + regionErr, err = resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(1), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 1 rpc + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + } + } + + // Test for write request. + tf := func(s *Store, bo *retry.Backoffer) livenessState { + return reachable + } + s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + resetStats() + req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{}) + req.ReplicaReadType = kv.ReplicaReadLeader + loc := getLocFn() + bo = retry.NewBackoffer(context.Background(), 1000) + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV) + s.Nil(resp) + s.Equal(context.DeadlineExceeded, err) + backoffTimes := bo.GetBackoffTimes() + s.True(backoffTimes["tikvRPC"] > 0) // write request timeout won't do fast retry, so backoff times should be more than 0. 
+} + +func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeout() { + var ( + leaderAddr string + leaderLabel []*metapb.StoreLabel + ) + bo := retry.NewBackoffer(context.Background(), 10000) + mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if addr == leaderAddr { + return nil, context.DeadlineExceeded + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil + }} + s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient) + s.regionRequestSender.Stats = NewRegionRequestRuntimeStats() + getLocFn := func() *KeyLocation { + loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a")) + s.Nil(err) + region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region) + leaderStore, _, _, _ := region.WorkStorePeer(region.getStore()) + leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore) + s.Nil(err) + leaderLabel = []*metapb.StoreLabel{{Key: "id", Value: strconv.FormatUint(leaderStore.StoreID(), 10)}} + return loc + } + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{}) + req.EnableStaleRead() + loc := getLocFn() + var ops []StoreSelectorOption + ops = append(ops, WithMatchLabels(leaderLabel)) + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, ops...) + s.Nil(err) + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) + s.Equal(1, len(s.regionRequestSender.Stats.RPCStats)) + s.Equal(int64(2), s.regionRequestSender.Stats.RPCStats[tikvrpc.CmdGet].Count) // 2 rpc + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. 
+} + +func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() { + leaderStore, _ := s.loadAndGetLeaderStore() + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ + Key: []byte("key"), + }) + req.InputRequestSource = "test" + + setReadType := func(req *tikvrpc.Request, readType string) { + req.StaleRead = false + req.ReplicaRead = false + switch readType { + case "leader": + return + case "follower": + req.ReplicaRead = true + req.ReplicaReadType = kv.ReplicaReadFollower + case "stale_follower", "stale_leader": + req.EnableStaleRead() + default: + panic("unreachable") + } + } + + setTargetReplica := func(selector *replicaSelector, readType string) { + var leader bool + switch readType { + case "leader", "stale_leader": + leader = true + case "follower", "stale_follower": + leader = false + default: + panic("unreachable") + } + for idx, replica := range selector.replicas { + if replica.store.storeID == leaderStore.storeID && leader { + selector.targetIdx = AccessIndex(idx) + return + } + if replica.store.storeID != leaderStore.storeID && !leader { + selector.targetIdx = AccessIndex(idx) + return + } + } + panic("unreachable") + } + + firstReadReplicas := []string{"leader", "follower", "stale_follower", "stale_leader"} + retryReadReplicas := []string{"leader", "follower"} + for _, firstReplica := range firstReadReplicas { + for _, retryReplica := range retryReadReplicas { + bo := retry.NewBackoffer(context.Background(), -1) + req.IsRetryRequest = false + setReadType(req, firstReplica) + replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req) + s.Nil(err) + setTargetReplica(replicaSelector, firstReplica) + rpcCtx, err := replicaSelector.buildRPCContext(bo) + s.Nil(err) + replicaSelector.patchRequestSource(req, rpcCtx) + s.Equal("test-"+firstReplica, req.RequestSource) + + // retry + setReadType(req, retryReplica) + replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req) + s.Nil(err) + setTargetReplica(replicaSelector, retryReplica) + rpcCtx, err = replicaSelector.buildRPCContext(bo) + s.Nil(err) + req.IsRetryRequest = true + replicaSelector.patchRequestSource(req, rpcCtx) + s.Equal("test-retry_"+firstReplica+"_"+retryReplica, req.RequestSource) + } + } +} + +func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelectorExperimentalOptions() { + key := []byte("key") + bo := retry.NewBackoffer(context.Background(), -1) + + loc, err := s.cache.LocateKey(bo, key) + s.Require().NoError(err) + + region := s.cache.GetCachedRegionWithRLock(loc.Region) + leader, _, _, _ := region.WorkStorePeer(region.getStore()) + follower, _, _, _ := region.FollowerStorePeer(region.getStore(), 0, &storeSelectorOp{}) + + newStaleReadReq := func() *tikvrpc.Request { + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadMixed, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + return req + } + + errRespTimeout := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{Reason: "mock: deadline is exceeded"}}}} + errRespDataNotReady := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}} + + mockLeaderTimeout := func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if addr == 
leader.addr { + return errRespTimeout, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + } + + mockLeaderSlow := func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if addr == leader.addr && timeout <= 300*time.Millisecond { + return errRespTimeout, nil + } + if addr == follower.addr { + if req.StaleRead { + return errRespDataNotReady, nil + } else { + return errRespTimeout, nil + } + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil + } + + s.Run("PreventRetryFollowerOn", func() { + // 1. access local leader -> timeout + // 2. cannot retry leader (since timeout), cannot fallback to tryFollower -> pseudo error + defer SetReplicaSelectorExperimentalOptions(GetReplicaSelectorExperimentalOptions()) + var opts ReplicaSelectorExperimentalOptions + opts.StaleRead.PreventRetryFollower = true + SetReplicaSelectorExperimentalOptions(opts) + + s.regionRequestSender.client = &fnClient{fn: mockLeaderTimeout} + s.cache.LocateKey(bo, key) + + resp, _, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(leader.labels)) + s.Require().NoError(err) + regionErr, err := resp.GetRegionError() + s.Require().NoError(err) + s.Require().NotNil(regionErr) + s.Require().True(IsFakeRegionError(regionErr)) + }) + + s.Run("PreventRetryFollowerOff", func() { + // 1. access local leader -> timeout + // 2. cannot retry leader (since timeout), fallback to tryFollower, access follower -> ok + s.regionRequestSender.client = &fnClient{fn: mockLeaderTimeout} + s.cache.LocateKey(bo, key) + + resp, rpcCtx, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(leader.labels)) + s.Require().NoError(err) + s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + }) + + s.Run("RetryLeaderTimeoutFactorOn", func() { + // 1. access local follower -> data is not ready + // 2. retry leader with timeout*2 -> ok + defer SetReplicaSelectorExperimentalOptions(GetReplicaSelectorExperimentalOptions()) + var opts ReplicaSelectorExperimentalOptions + opts.StaleRead.RetryLeaderTimeoutFactor = 2 + SetReplicaSelectorExperimentalOptions(opts) + + s.regionRequestSender.client = &fnClient{fn: mockLeaderSlow} + s.cache.LocateKey(bo, key) + + resp, _, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, 200*time.Millisecond, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Require().NoError(err) + s.Require().Equal(leader.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + }) + + s.Run("RetryLeaderTimeoutFactorOff", func() { + // 1. access local follower -> data is not ready + // 2. retry leader with timeout -> timeout + // 3. fallback to tryFollower, access local follower with replica read -> timeout + // 4. 
access the other follower -> ok + s.regionRequestSender.client = &fnClient{fn: mockLeaderSlow} + s.cache.LocateKey(bo, key) + + resp, rpcCtx, err := s.regionRequestSender.SendReqCtx(bo, newStaleReadReq(), loc.Region, 200*time.Millisecond, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Require().NoError(err) + s.Require().Equal(rpcCtx.Addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + s.Require().NotEqual(leader.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + s.Require().NotEqual(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) + }) +} diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 3e187cf80..484daec66 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -37,8 +37,11 @@ package locate import ( "context" "fmt" + "math/rand" "net" + "strconv" "sync" + "sync/atomic" "testing" "time" "unsafe" @@ -50,8 +53,11 @@ import ( "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/internal/client" + "github.com/tikv/client-go/v2/internal/client/mock_server" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/tikvrpc" @@ -659,3 +665,144 @@ func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() { s.Nil(regionErr) s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) } + +func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchClient() { + config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.MaxBatchSize = 0 + })() + + server, port := mock_server.StartMockTikvService() + s.True(port > 0) + server.SetMetaChecker(func(ctx context.Context) error { + return context.DeadlineExceeded + }) + rpcClient := client.NewRPCClient() + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) + }} + defer func() { + rpcClient.Close() + server.Stop() + }() + + bo := retry.NewBackofferWithVars(context.Background(), 2000, nil) + region, err := s.cache.LocateRegionByID(bo, s.region) + s.Nil(err) + s.NotNil(region) + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1}) + resp, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Millisecond*10) + s.Nil(err) + s.NotNil(resp) + regionErr, _ := resp.GetRegionError() + s.True(IsFakeRegionError(regionErr)) + s.Equal(0, bo.GetTotalBackoffTimes()) // use kv read timeout will do fast retry, so backoff times should be 0. +} + +func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { + // This test should use `go test -race` to run. 
+ config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.MaxBatchSize = 128 + })() + + server, port := mock_server.StartMockTikvService() + s.True(port > 0) + rpcClient := client.NewRPCClient() + fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) + }} + tf := func(s *Store, bo *retry.Backoffer) livenessState { + return reachable + } + + defer func() { + rpcClient.Close() + server.Stop() + }() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + ctx, cancel := context.WithCancel(context.Background()) + bo := retry.NewBackofferWithVars(ctx, int(client.ReadTimeoutShort.Milliseconds()), nil) + region, err := s.cache.LocateRegionByID(bo, s.region) + s.Nil(err) + s.NotNil(region) + go func() { + // mock for kill query execution or timeout. + time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1)) + cancel() + }() + req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1}) + regionRequestSender := NewRegionRequestSender(s.cache, fnClient) + regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) + } + }() + } + wg.Wait() + // batchSendLoop should not panic. + s.Equal(atomic.LoadInt64(&client.BatchSendLoopPanicCounter), int64(0)) +} + +func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() { + reqStats := NewRegionRequestRuntimeStats() + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Second*2) + reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Millisecond*200) + reqStats.RecordRPCErrorStats("context canceled") + reqStats.RecordRPCErrorStats("context canceled") + reqStats.RecordRPCErrorStats("region_not_found") + reqStats.Merge(NewRegionRequestRuntimeStats()) + reqStats2 := NewRegionRequestRuntimeStats() + reqStats2.Merge(reqStats.Clone()) + expecteds := []string{ + // Since map iteration order is random, we need to check all possible orders. 
+ "Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{region_not_found:1, context canceled:2}", + "Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{context canceled:2, region_not_found:1}", + "Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{context canceled:2, region_not_found:1}", + "Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{region_not_found:1, context canceled:2}", + } + s.Contains(expecteds, reqStats.String()) + s.Contains(expecteds, reqStats2.String()) + for i := 0; i < 50; i++ { + reqStats.RecordRPCErrorStats("err_" + strconv.Itoa(i)) + } + s.Regexp("{.*err_.*:1.*, other_error:36}", reqStats.RequestErrorStats.String()) + s.Regexp(".*num_rpc.*total_time.*, rpc_errors:{.*err.*, other_error:36}", reqStats.String()) + + access := &ReplicaAccessStats{} + access.recordReplicaAccessInfo(true, false, 1, 2, "data_not_ready") + access.recordReplicaAccessInfo(false, false, 3, 4, "not_leader") + access.recordReplicaAccessInfo(false, true, 5, 6, "server_is_Busy") + s.Equal("{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}", access.String()) + for i := 0; i < 20; i++ { + access.recordReplicaAccessInfo(false, false, 5+uint64(i)%2, 6, "server_is_Busy") + } + expecteds = []string{ + // Since map iteration order is random, we need to check all possible orders. + "{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:5, error_stats:{server_is_Busy:9}}, {peer:6, error_stats:{server_is_Busy:9}}}", + "{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:6, error_stats:{server_is_Busy:9}}, {peer:5, error_stats:{server_is_Busy:9}}}", + } + s.Contains(expecteds, access.String()) +} + +type noCauseError struct { + error +} + +func (_ noCauseError) Cause() error { + return nil +} + +func TestGetErrMsg(t *testing.T) { + err := noCauseError{error: errors.New("no cause err")} + require.Equal(t, nil, errors.Cause(err)) + require.Panicsf(t, func() { + _ = errors.Cause(err).Error() + }, "should panic") + require.Equal(t, "no cause err", getErrMsg(err)) +} diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go index bdefc7993..a2723e05b 100644 --- a/internal/retry/backoff.go +++ b/internal/retry/backoff.go @@ -133,7 +133,7 @@ func (b *Backoffer) BackoffWithMaxSleepTxnLockFast(maxSleepMs int, err error) er // and never sleep more than maxSleepMs for each sleep. 
func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error { if strings.Contains(err.Error(), tikverr.MismatchClusterID) { - logutil.BgLogger().Fatal("critical error", zap.Error(err)) + logutil.Logger(b.ctx).Fatal("critical error", zap.Error(err)) } select { case <-b.ctx.Done(): @@ -143,7 +143,13 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e if b.noop { return err } - if b.maxSleep > 0 && (b.totalSleep-b.excludedSleep) >= b.maxSleep { + maxBackoffTimeExceeded := (b.totalSleep - b.excludedSleep) >= b.maxSleep + maxExcludedTimeExceeded := false + if maxLimit, ok := isSleepExcluded[cfg.name]; ok { + maxExcludedTimeExceeded = b.excludedSleep >= maxLimit && b.excludedSleep >= b.maxSleep + } + maxTimeExceeded := maxBackoffTimeExceeded || maxExcludedTimeExceeded + if b.maxSleep > 0 && maxTimeExceeded { longestSleepCfg, longestSleepTime := b.longestSleepCfg() errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", cfg.String(), b.maxSleep) for i, err := range b.errors { @@ -163,13 +169,14 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e backoffDetail.WriteString(":") backoffDetail.WriteString(strconv.Itoa(times)) } - errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v", totalTimes, backoffDetail.String()) + errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v, maxBackoffTimeExceeded: %v, maxExcludedTimeExceeded: %v", + totalTimes, backoffDetail.String(), maxBackoffTimeExceeded, maxExcludedTimeExceeded) returnedErr := err if longestSleepCfg != nil { errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime) returnedErr = longestSleepCfg.err } - logutil.BgLogger().Warn(errMsg) + logutil.Logger(b.ctx).Warn(errMsg) // Use the backoff type that contributes most to the timeout to generate a MySQL error. return errors.WithStack(returnedErr) } diff --git a/internal/retry/backoff_test.go b/internal/retry/backoff_test.go index a4c8dfe42..895c77e86 100644 --- a/internal/retry/backoff_test.go +++ b/internal/retry/backoff_test.go @@ -72,7 +72,8 @@ func TestBackoffErrorType(t *testing.T) { err = b.Backoff(BoTxnNotFound, errors.New("txn not found")) if err != nil { // Next backoff should return error of backoff that sleeps for longest time. - assert.ErrorIs(t, err, BoTxnNotFound.err) + cfg, _ := b.longestSleepCfg() + assert.ErrorIs(t, err, cfg.err) return } } @@ -95,3 +96,15 @@ func TestBackoffDeepCopy(t *testing.T) { assert.ErrorIs(t, err, BoMaxDataNotReady.err) } } + +func TestBackoffWithMaxExcludedExceed(t *testing.T) { + setBackoffExcluded(BoTiKVServerBusy.name, 1) + b := NewBackofferWithVars(context.TODO(), 1, nil) + err := b.Backoff(BoTiKVServerBusy, errors.New("server is busy")) + assert.Nil(t, err) + + // As the total excluded sleep is greater than the max limited value, error should be returned. 
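+	// The first server-busy backoff above is recorded as excluded sleep, so by
+	// this second call excludedSleep already exceeds both the 1ms limit injected
+	// via setBackoffExcluded and maxSleep, while the non-excluded total is still
+	// 0, i.e. the new maxExcludedTimeExceeded condition in BackoffWithCfgAndMaxSleep.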
+ err = b.Backoff(BoTiKVServerBusy, errors.New("server is busy")) + assert.NotNil(t, err) + assert.Greater(t, b.excludedSleep, b.maxSleep) +} diff --git a/internal/retry/config.go b/internal/retry/config.go index 19632d9f4..16032edda 100644 --- a/internal/retry/config.go +++ b/internal/retry/config.go @@ -129,11 +129,18 @@ var ( BoTxnLockFast = NewConfig(txnLockFastName, &metrics.BackoffHistogramLockFast, NewBackoffFnCfg(2, 3000, EqualJitter), tikverr.ErrResolveLockTimeout) ) -var isSleepExcluded = map[string]struct{}{ - BoTiKVServerBusy.name: {}, +var isSleepExcluded = map[string]int{ + BoTiKVServerBusy.name: 600000, // The max excluded limit is 10min. // add BoTiFlashServerBusy if appropriate } +// setBackoffExcluded is used for test only. +func setBackoffExcluded(name string, maxVal int) { + if _, ok := isSleepExcluded[name]; ok { + isSleepExcluded[name] = maxVal + } +} + const ( // NoJitter makes the backoff sequence strict exponential. NoJitter = 1 + iota diff --git a/metrics/metrics.go b/metrics/metrics.go index 5db99cd33..3d7eb2526 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -50,6 +50,7 @@ var ( TiKVCoprocessorHistogram *prometheus.HistogramVec TiKVLockResolverCounter *prometheus.CounterVec TiKVRegionErrorCounter *prometheus.CounterVec + TiKVRPCErrorCounter *prometheus.CounterVec TiKVTxnWriteKVCountHistogram prometheus.Histogram TiKVTxnWriteSizeHistogram prometheus.Histogram TiKVRawkvCmdHistogram *prometheus.HistogramVec @@ -198,7 +199,15 @@ func initMetrics(namespace, subsystem string) { Subsystem: subsystem, Name: "region_err_total", Help: "Counter of region errors.", - }, []string{LblType}) + }, []string{LblType, LblStore}) + + TiKVRPCErrorCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rpc_err_total", + Help: "Counter of rpc errors.", + }, []string{LblType, LblStore}) TiKVTxnWriteKVCountHistogram = prometheus.NewHistogram( prometheus.HistogramOpts{ @@ -641,6 +650,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVCoprocessorHistogram) prometheus.MustRegister(TiKVLockResolverCounter) prometheus.MustRegister(TiKVRegionErrorCounter) + prometheus.MustRegister(TiKVRPCErrorCounter) prometheus.MustRegister(TiKVTxnWriteKVCountHistogram) prometheus.MustRegister(TiKVTxnWriteSizeHistogram) prometheus.MustRegister(TiKVRawkvCmdHistogram) diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go index f5eefaa2d..e9e61db0f 100644 --- a/rawkv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -41,14 +41,17 @@ import ( "hash/crc64" "testing" + "github.com/pingcap/failpoint" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/tikv" ) func TestRawKV(t *testing.T) { + tikv.EnableFailpoints() suite.Run(t, new(testRawkvSuite)) } @@ -77,9 +80,11 @@ func (s *testRawkvSuite) SetupTest() { s.peer1 = peerIDs[0] s.peer2 = peerIDs[1] s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) + s.Nil(failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`)) } func (s *testRawkvSuite) TearDownTest() { + s.Nil(failpoint.Disable("tikvclient/injectReResolveInterval")) s.mvccStore.Close() } @@ -110,6 +115,9 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", 
`return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + getVal, err := client.Get(context.Background(), testKey) s.Nil(err) @@ -172,6 +180,9 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + getVal, err := client.Get(context.Background(), testKey) s.Nil(err) s.Equal(getVal, testValue) @@ -200,6 +211,9 @@ func (s *testRawkvSuite) TestReplaceStore() { s.cluster.RemovePeer(s.region1, s.peer1) s.cluster.ChangeLeader(s.region1, peer3) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + err = client.Put(context.Background(), testKey, testValue) s.Nil(err) } @@ -234,6 +248,9 @@ func (s *testRawkvSuite) TestColumnFamilyForClient() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + // test get client.SetColumnFamily(cf1) getVal, err := client.Get(context.Background(), testKeyCf1) @@ -303,6 +320,9 @@ func (s *testRawkvSuite) TestColumnFamilyForOptions() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + // test get getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1)) s.Nil(err) @@ -370,6 +390,9 @@ func (s *testRawkvSuite) TestBatch() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + // test BatchGet returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf)) s.Nil(err) @@ -426,6 +449,9 @@ func (s *testRawkvSuite) TestScan() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + // test scan startKey, endKey := []byte("key1"), []byte("keyz") limit := 3 @@ -498,6 +524,9 @@ func (s *testRawkvSuite) TestDeleteRange() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + // test DeleteRange startKey, endKey := []byte("key3"), []byte(nil) err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf)) @@ -535,6 +564,9 @@ func (s *testRawkvSuite) TestCompareAndSwap() { s.cluster.ChangeLeader(s.region1, s.peer2) s.cluster.RemovePeer(s.region1, s.peer1) + s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`)) + defer failpoint.Disable("tikvclient/injectLiveness") + // test CompareAndSwap for false atomic _, _, err = client.CompareAndSwap( context.Background(), diff --git a/tikv/region.go b/tikv/region.go index 
4fd0a2564..567595ade 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -79,6 +79,9 @@ type RPCCancellerCtxKey = locate.RPCCancellerCtxKey // simply return the error to caller. type RegionRequestSender = locate.RegionRequestSender +// ReplicaSelectorExperimentalOptions defines experimental options of replica selector. +type ReplicaSelectorExperimentalOptions = locate.ReplicaSelectorExperimentalOptions + // StoreSelectorOption configures storeSelectorOp. type StoreSelectorOption = locate.StoreSelectorOption @@ -108,11 +111,6 @@ var ( ModeTxn Mode = client.ModeTxn ) -// RecordRegionRequestRuntimeStats records request runtime stats. -func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { - locate.RecordRegionRequestRuntimeStats(stats, cmd, d) -} - // Store contains a kv process's address. type Store = locate.Store @@ -152,13 +150,23 @@ func StoreShuttingDown(v uint32) { locate.StoreShuttingDown(v) } +// SetReplicaSelectorExperimentalOptions sets experimental options of replica selector. +func SetReplicaSelectorExperimentalOptions(opts ReplicaSelectorExperimentalOptions) { + locate.SetReplicaSelectorExperimentalOptions(opts) +} + +// GetReplicaSelectorExperimentalOptions gets experimental options of replica selector. +func GetReplicaSelectorExperimentalOptions() (opts ReplicaSelectorExperimentalOptions) { + return locate.GetReplicaSelectorExperimentalOptions() +} + // WithMatchLabels indicates selecting stores with matched labels func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption { return locate.WithMatchLabels(labels) } // NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. -func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { +func NewRegionRequestRuntimeStats() *RegionRequestRuntimeStats { return locate.NewRegionRequestRuntimeStats() } diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 33a4e05dd..6e1d2373d 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -229,6 +229,10 @@ type Request struct { // If it's not empty, the store which receive the request will forward it to // the forwarded host. It's useful when network partition occurs. ForwardedHost string + // The initial read type, note this will be assigned in the first try, no need to set it outside the client. + ReadType string + // InputRequestSource is the input source of the request, if it's not empty, the final RequestSource sent to store will be attached with the retry info. + InputRequestSource string } // NewRequest returns new kv rpc request. @@ -695,13 +699,20 @@ type MPPStreamResponse struct { // SetContext set the Context field for the given req to the specified ctx. func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { - ctx := &req.Context if region != nil { - ctx.RegionId = region.Id - ctx.RegionEpoch = region.RegionEpoch + req.Context.RegionId = region.Id + req.Context.RegionEpoch = region.RegionEpoch } - ctx.Peer = peer + req.Context.Peer = peer + // Shallow copy the context to avoid concurrent modification. + return AttachContext(req, req.Context) +} + +// AttachContext sets the request context to the request, +// Parameter `rpcCtx` use `kvrpcpb.Context` instead of `*kvrpcpb.Context` to avoid concurrent modification by shallow copy. 
+func AttachContext(req *Request, rpcCtx kvrpcpb.Context) error { + ctx := &rpcCtx switch req.Type { case CmdGet: req.Get().Context = ctx diff --git a/txnkv/txnsnapshot/client_helper.go b/txnkv/txnsnapshot/client_helper.go index 34a6636d5..834a04f18 100644 --- a/txnkv/txnsnapshot/client_helper.go +++ b/txnkv/txnsnapshot/client_helper.go @@ -62,7 +62,7 @@ type ClientHelper struct { committedLocks *util.TSSet client client.Client resolveLite bool - locate.RegionRequestRuntimeStats + Stats *locate.RegionRequestRuntimeStats } // NewClientHelper creates a helper instance. @@ -81,7 +81,7 @@ func NewClientHelper(store kvstore, resolvedLocks *util.TSSet, committedLocks *u func (ch *ClientHelper) ResolveLocksWithOpts(bo *retry.Backoffer, opts txnlock.ResolveLocksOptions) (txnlock.ResolveLockResult, error) { if ch.Stats != nil { defer func(start time.Time) { - locate.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) + ch.Stats.RecordRPCRuntimeStats(tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } opts.ForRead = true @@ -103,7 +103,7 @@ func (ch *ClientHelper) ResolveLocksWithOpts(bo *retry.Backoffer, opts txnlock.R func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) { if ch.Stats != nil { defer func(start time.Time) { - locate.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) + ch.Stats.RecordRPCRuntimeStats(tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } msBeforeTxnExpired, resolvedLocks, committedLocks, err := ch.lockResolver.ResolveLocksForRead(bo, callerStartTS, locks, ch.resolveLite) diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index 2e24d52fc..25579de4c 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -202,6 +202,8 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { var loc *locate.KeyLocation var resolvingRecordToken *int var err error + // the states in request need to keep when retry request. + var readType string for { if !s.reverse { loc, err = s.snapshot.store.GetRegionCache().LocateKey(bo, s.nextStartKey) @@ -251,8 +253,12 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { TaskId: s.snapshot.mu.taskID, ResourceGroupTag: s.snapshot.mu.resourceGroupTag, IsolationLevel: s.snapshot.isolationLevel.ToPB(), - RequestSource: s.snapshot.GetRequestSource(), }) + if readType != "" { + req.ReadType = readType + req.IsRetryRequest = true + } + req.InputRequestSource = s.snapshot.GetRequestSource() if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil { s.snapshot.mu.resourceGroupTagger(req) } @@ -265,6 +271,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { if err != nil { return err } + readType = req.ReadType if regionErr != nil { logutil.BgLogger().Debug("scanner getData failed", zap.Stringer("regionErr", regionErr)) diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 76b5aed5a..fbb91bf21 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -118,6 +118,7 @@ type KVSnapshot struct { resolvedLocks util.TSSet committedLocks util.TSSet scanBatchSize int + readTimeout time.Duration // Cache the result of BatchGet. 
     // The invariance is that calling BatchGet multiple times using the same start ts,
@@ -360,7 +361,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
     cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, false)
     s.mu.RLock()
     if s.mu.stats != nil {
-        cli.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats)
+        cli.Stats = locate.NewRegionRequestRuntimeStats()
         defer func() {
             s.mergeRegionRequestStats(cli.Stats)
         }()
@@ -370,6 +371,9 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
     pending := batch.keys
     var resolvingRecordToken *int
+    useConfigurableKVTimeout := true
+    // The states in the request need to be kept when the request is retried.
+    var readType string
     for {
         s.mu.RLock()
         req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
@@ -381,8 +385,12 @@
             TaskId: s.mu.taskID,
             ResourceGroupTag: s.mu.resourceGroupTag,
             IsolationLevel: s.isolationLevel.ToPB(),
-            RequestSource: s.GetRequestSource(),
         })
+        req.InputRequestSource = s.GetRequestSource()
+        if readType != "" {
+            req.ReadType = readType
+            req.IsRetryRequest = true
+        }
         if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
             s.mu.resourceGroupTagger(req)
         }
@@ -395,6 +403,12 @@
         if isStaleness {
             req.EnableStaleRead()
         }
+        timeout := client.ReadTimeoutMedium
+        if useConfigurableKVTimeout && s.readTimeout > 0 {
+            useConfigurableKVTimeout = false
+            timeout = s.readTimeout
+        }
+        req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
         ops := make([]locate.StoreSelectorOption, 0, 2)
         if len(matchStoreLabels) > 0 {
             ops = append(ops, locate.WithMatchLabels(matchStoreLabels))
@@ -406,7 +420,7 @@
             }
             req.ReplicaReadType = readType
         }
-        resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, client.ReadTimeoutMedium, tikvrpc.TiKV, "", ops...)
+        resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, timeout, tikvrpc.TiKV, "", ops...)
         if err != nil {
             return err
         }
@@ -414,6 +428,7 @@
         if err != nil {
             return err
         }
+        readType = req.ReadType
         if regionErr != nil {
             // For other region error and the fake region error, backoff because
             // there's something wrong.
@@ -572,7 +587,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
     cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, true)
     if s.mu.stats != nil {
-        cli.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats)
+        cli.Stats = locate.NewRegionRequestRuntimeStats()
         defer func() {
             s.mergeRegionRequestStats(cli.Stats)
         }()
@@ -587,8 +602,8 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
         TaskId: s.mu.taskID,
         ResourceGroupTag: s.mu.resourceGroupTag,
         IsolationLevel: s.isolationLevel.ToPB(),
-        RequestSource: s.GetRequestSource(),
     })
+    req.InputRequestSource = s.GetRequestSource()
     if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil {
         s.mu.resourceGroupTagger(req)
     }
@@ -616,13 +631,20 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
     var firstLock *txnlock.Lock
     var resolvingRecordToken *int
+    useConfigurableKVTimeout := true
     for {
         util.EvalFailpoint("beforeSendPointGet")
         loc, err := s.store.GetRegionCache().LocateKey(bo, k)
         if err != nil {
             return nil, err
         }
-        resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV, "", ops...)
+        timeout := client.ReadTimeoutShort
+        if useConfigurableKVTimeout && s.readTimeout > 0 {
+            useConfigurableKVTimeout = false
+            timeout = s.readTimeout
+        }
+        req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
+        resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, timeout, tikvrpc.TiKV, "", ops...)
         if err != nil {
             return nil, err
         }
@@ -902,25 +924,27 @@ func (s *KVSnapshot) recordBackoffInfo(bo *retry.Backoffer) {
     }
 }
 
-func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) {
+func (s *KVSnapshot) mergeRegionRequestStats(rpcStats *locate.RegionRequestRuntimeStats) {
     s.mu.Lock()
     defer s.mu.Unlock()
     if s.mu.stats == nil {
         return
     }
-    if s.mu.stats.rpcStats.Stats == nil {
-        s.mu.stats.rpcStats.Stats = stats
+    if s.mu.stats.rpcStats == nil {
+        s.mu.stats.rpcStats = rpcStats
         return
     }
-    for k, v := range stats {
-        stat, ok := s.mu.stats.rpcStats.Stats[k]
-        if !ok {
-            s.mu.stats.rpcStats.Stats[k] = v
-            continue
-        }
-        stat.Count += v.Count
-        stat.Consume += v.Consume
-    }
+    s.mu.stats.rpcStats.Merge(rpcStats)
+}
+
+// SetKVReadTimeout sets the timeout for individual KV read operations under this snapshot.
+func (s *KVSnapshot) SetKVReadTimeout(readTimeout time.Duration) {
+    s.readTimeout = readTimeout
+}
+
+// GetKVReadTimeout returns the timeout for individual KV read operations under this snapshot, or 0 if it is not set.
+func (s *KVSnapshot) GetKVReadTimeout() time.Duration {
+    return s.readTimeout
 }
 
 func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail {
@@ -934,7 +958,7 @@ func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail {
 
 // SnapshotRuntimeStats records the runtime stats of snapshot.
 type SnapshotRuntimeStats struct {
-    rpcStats locate.RegionRequestRuntimeStats
+    rpcStats *locate.RegionRequestRuntimeStats
     backoffSleepMS map[string]int
     backoffTimes map[string]int
     scanDetail *util.ScanDetail
@@ -944,11 +968,9 @@ type SnapshotRuntimeStats struct {
 
 // Clone implements the RuntimeStats interface.
 func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats {
-    newRs := SnapshotRuntimeStats{rpcStats: locate.NewRegionRequestRuntimeStats()}
-    if rs.rpcStats.Stats != nil {
-        for k, v := range rs.rpcStats.Stats {
-            newRs.rpcStats.Stats[k] = v
-        }
+    newRs := SnapshotRuntimeStats{}
+    if rs.rpcStats != nil {
+        newRs.rpcStats = rs.rpcStats.Clone()
     }
     if len(rs.backoffSleepMS) > 0 {
         newRs.backoffSleepMS = make(map[string]int)
@@ -978,9 +1000,9 @@ func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats {
 
 // Merge implements the RuntimeStats interface.
 func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) {
-    if other.rpcStats.Stats != nil {
-        if rs.rpcStats.Stats == nil {
-            rs.rpcStats.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats, len(other.rpcStats.Stats))
+    if other.rpcStats != nil {
+        if rs.rpcStats == nil {
+            rs.rpcStats = locate.NewRegionRequestRuntimeStats()
         }
         rs.rpcStats.Merge(other.rpcStats)
     }
@@ -1003,7 +1025,9 @@ func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) {
 // String implements fmt.Stringer interface.
 func (rs *SnapshotRuntimeStats) String() string {
     var buf bytes.Buffer
-    buf.WriteString(rs.rpcStats.String())
+    if rs.rpcStats != nil {
+        buf.WriteString(rs.rpcStats.String())
+    }
     for k, v := range rs.backoffTimes {
         if buf.Len() > 0 {
             buf.WriteByte(',')
diff --git a/txnkv/txnsnapshot/test_probe.go b/txnkv/txnsnapshot/test_probe.go
index e1d4d9310..521d6498d 100644
--- a/txnkv/txnsnapshot/test_probe.go
+++ b/txnkv/txnsnapshot/test_probe.go
@@ -18,7 +18,6 @@ import (
     "github.com/pingcap/kvproto/pkg/kvrpcpb"
     "github.com/tikv/client-go/v2/internal/locate"
     "github.com/tikv/client-go/v2/internal/retry"
-    "github.com/tikv/client-go/v2/tikvrpc"
 )
 
 // SnapshotProbe exposes some snapshot utilities for testing purpose.
@@ -27,8 +26,8 @@ type SnapshotProbe struct {
 }
 
 // MergeRegionRequestStats merges RPC runtime stats into snapshot's stats.
-func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.RPCRuntimeStats) {
-    s.mergeRegionRequestStats(stats)
+func (s SnapshotProbe) MergeRegionRequestStats(rpcStats *locate.RegionRequestRuntimeStats) {
+    s.mergeRegionRequestStats(rpcStats)
 }
 
 // RecordBackoffInfo records backoff stats into snapshot's stats.
diff --git a/util/misc.go b/util/misc.go
index bd3e2b779..e324bf797 100644
--- a/util/misc.go
+++ b/util/misc.go
@@ -89,7 +89,7 @@ func WithRecovery(exec func(), recoverFn func(r interface{})) {
         }
         if r != nil {
             logutil.BgLogger().Error("panic in the recoverable goroutine",
-                zap.Reflect("r", r),
+                zap.Any("r", r),
                 zap.Stack("stack trace"))
         }
     }()
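
Note: the sketch below (not part of the diff) illustrates how a caller might use the snapshot-level KV read timeout added in txnkv/txnsnapshot/snapshot.go. The helper name pointGetWithTimeout is hypothetical; only SetKVReadTimeout, GetKVReadTimeout, and Get come from the changed code.

package example

import (
	"context"
	"time"

	"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)

// pointGetWithTimeout is a hypothetical helper that caps a snapshot point get
// with a per-snapshot timeout. Per the diff, only the first attempt uses the
// configured value (it also drives MaxExecutionDurationMs); retries fall back
// to the stock ReadTimeoutShort/ReadTimeoutMedium, and a zero timeout keeps
// the previous behavior.
func pointGetWithTimeout(ctx context.Context, snap *txnsnapshot.KVSnapshot, key []byte, d time.Duration) ([]byte, error) {
	if snap.GetKVReadTimeout() == 0 {
		snap.SetKVReadTimeout(d)
	}
	return snap.Get(ctx, key)
}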