diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index a05627f10..c6762ff40 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -12,31 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -// NOTE: The code in this file is based on code from the -// TiDB project, licensed under the Apache License v 2.0 -// -// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/prewrite_test.go -// - -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package tikv_test import ( "context" "fmt" + "math" "strings" "sync/atomic" "testing" @@ -67,12 +48,15 @@ type apiTestSuite struct { } func (s *apiTestSuite) SetupTest() { + require := s.Require() addrs := strings.Split(*pdAddrs, ",") pdClient, err := pd.NewClient(addrs, pd.SecurityOption{}) - s.Require().Nil(err) + require.NoError(err) rpcClient := tikv.NewRPCClient() + require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) // Set PD HTTP client. 
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs)) + require.NoError(err) s.store = store storeID := uint64(1) s.store.GetRegionCache().SetRegionCacheStore(storeID, tikvrpc.TiKV, 1, nil) @@ -85,6 +69,18 @@ func (s *apiTestSuite) storeAddr(id uint64) string { type storeSafeTsMockClient struct { tikv.Client requestCount int32 + kvSafeTS uint64 +} + +func newStoreSafeTsMockClient(client tikv.Client) storeSafeTsMockClient { + return storeSafeTsMockClient{ + Client: client, + kvSafeTS: 150, // Set a default value. + } +} + +func (c *storeSafeTsMockClient) SetKVSafeTS(ts uint64) { + c.kvSafeTS = ts } func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { @@ -92,9 +88,9 @@ func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, re return c.Client.SendRequest(ctx, addr, req, timeout) } atomic.AddInt32(&c.requestCount, 1) - resp := &tikvrpc.Response{} - resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 150} - return resp, nil + return &tikvrpc.Response{ + Resp: &kvrpcpb.StoreSafeTSResponse{SafeTs: c.kvSafeTS}, + }, nil } func (c *storeSafeTsMockClient) Close() error { @@ -105,66 +101,42 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error { return c.Client.CloseAddr(addr) } +func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) { + s.Eventually(func() bool { + return s.store.GetMinSafeTS(txnScope) == ts + }, time.Second, 200*time.Millisecond) +} + func (s *apiTestSuite) TestGetClusterMinResolvedTS() { util.EnableFailpoints() // Try to get the minimum resolved timestamp of the cluster from PD. 
require := s.Require() - require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) - mockClient := storeSafeTsMockClient{ - Client: s.store.GetTiKVClient(), - } + mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) s.store.SetTiKVClient(&mockClient) - var retryCount int - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { - time.Sleep(2 * time.Second) - if retryCount > 5 { - break - } - retryCount++ - } + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) + s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Try to get the minimum resolved timestamp of the store from TiKV. - require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`)) - defer func() { - s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) - }() - retryCount = 0 - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 { - time.Sleep(2 * time.Second) - if retryCount > 5 { - break - } - retryCount++ - } + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) + s.waitForMinSafeTS(oracle.GlobalTxnScope, 150) require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { util.EnableFailpoints() // Try to get the minimum resolved timestamp of the cluster from PD. 
require := s.Require() - require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`)) - mockClient := storeSafeTsMockClient{ - Client: s.store.GetTiKVClient(), - } + mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) s.store.SetTiKVClient(&mockClient) - var retryCount int - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { - time.Sleep(2 * time.Second) - if retryCount > 5 { - break - } - retryCount++ - } + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) + s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) - defer func() { - s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) - }() // Set DC label for store 1. dcLabel := "testDC" @@ -182,21 +154,47 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { s.store.GetRegionCache().SetRegionCacheStore(1, tikvrpc.TiKV, 1, labels) // Try to get the minimum resolved timestamp of the store from TiKV. - retryCount = 0 - for s.store.GetMinSafeTS(dcLabel) != 150 { - time.Sleep(2 * time.Second) - if retryCount > 5 { - break - } - retryCount++ - } + s.waitForMinSafeTS(dcLabel, 150) require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) +} + +func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { + util.EnableFailpoints() + require := s.Require() + mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) + s.store.SetTiKVClient(&mockClient) + + // Make sure the store's min resolved ts is not initialized. + mockClient.SetKVSafeTS(0) + // Try to get the minimum resolved timestamp of the cluster from TiKV. 
+ require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) + s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64) + // Make sure the store's min resolved ts is not initialized. + require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) + + // Try to get the minimum resolved timestamp of the cluster from PD. + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) + s.waitForMinSafeTS(oracle.GlobalTxnScope, 100) + // Make sure the store's min resolved ts is not regarded as MaxUint64. + require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) + + // Fall back to the KV request when the PD server does not support getting the min resolved ts. + require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) + mockClient.SetKVSafeTS(150) + s.waitForMinSafeTS(oracle.GlobalTxnScope, 150) + // Make sure the minSafeTS can advance.
+ require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) + require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) } func (s *apiTestSuite) TearDownTest() { if s.store != nil { - s.Require().Nil(s.store.Close()) + s.Require().NoError(s.store.Close()) } + s.Require().NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater")) } diff --git a/internal/client/client_fail_test.go b/internal/client/client_fail_test.go index e7d413970..1ed6b0ac6 100644 --- a/internal/client/client_fail_test.go +++ b/internal/client/client_fail_test.go @@ -46,7 +46,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/config" - "github.com/tikv/client-go/v2/internal/client/mock_server" + "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/tikvrpc" ) @@ -54,7 +54,7 @@ func TestPanicInRecvLoop(t *testing.T) { require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`)) require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`)) - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) defer server.Stop() @@ -82,7 +82,7 @@ func TestPanicInRecvLoop(t *testing.T) { } func TestRecvErrorInMultipleRecvLoops(t *testing.T) { - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) defer server.Stop() addr := server.Addr() diff --git a/internal/client/client_test.go b/internal/client/client_test.go index c01f753d4..b1bc3ec31 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -55,7 +55,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/config" - "github.com/tikv/client-go/v2/internal/client/mock_server" + "github.com/tikv/client-go/v2/internal/client/mockserver" 
"github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/tikvrpc" "go.uber.org/zap" @@ -117,7 +117,7 @@ func TestCancelTimeoutRetErr(t *testing.T) { } func TestSendWhenReconnect(t *testing.T) { - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) rpcClient := NewRPCClient() @@ -241,7 +241,7 @@ func TestCollapseResolveLock(t *testing.T) { } func TestForwardMetadataByUnaryCall(t *testing.T) { - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) defer server.Stop() addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) @@ -309,7 +309,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) { } func TestForwardMetadataByBatchCommands(t *testing.T) { - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) defer server.Stop() addr := server.Addr() @@ -642,7 +642,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { conf.TiKVClient.MaxBatchSize = 128 })() - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) require.True(t, server.IsRunning()) addr := server.Addr() diff --git a/internal/client/mock_server/mock_tikv_service.go b/internal/client/mockserver/mock_tikv_service.go similarity index 88% rename from internal/client/mock_server/mock_tikv_service.go rename to internal/client/mockserver/mock_tikv_service.go index 392d3a5fb..86dfe779e 100644 --- a/internal/client/mock_server/mock_tikv_service.go +++ b/internal/client/mockserver/mock_tikv_service.go @@ -18,7 +18,7 @@ // https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/mock_tikv_service_test.go // -package mock_server +package mockserver import ( "context" @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc" ) +// MockServer is a mock tikv server 
for testing purposes. type MockServer struct { tikvpb.TikvServer grpcServer *grpc.Server @@ -49,6 +50,7 @@ type MockServer struct { } } +// KvGet implements the TikvServer interface. func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { if err := s.checkMetadata(ctx); err != nil { return nil, err @@ -56,6 +58,7 @@ func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpc return &kvrpcpb.GetResponse{}, nil } +// KvPrewrite implements the TikvServer interface. func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { if err := s.checkMetadata(ctx); err != nil { return nil, err @@ -63,6 +66,7 @@ func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteReques return &kvrpcpb.PrewriteResponse{}, nil } +// CoprocessorStream implements the TikvServer interface. func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err @@ -70,6 +74,7 @@ func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_ return ss.Send(&coprocessor.Response{}) } +// BatchCommands implements the TikvServer interface. func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err @@ -101,6 +106,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { } } +// SetMetaChecker sets the metadata check function of the mock server. func (s *MockServer) SetMetaChecker(check func(context.Context) error) { s.metaChecker.Lock() s.metaChecker.check = check @@ -116,19 +122,23 @@ func (s *MockServer) checkMetadata(ctx context.Context) error { return nil } +// IsRunning returns whether the mock server is running.
func (s *MockServer) IsRunning() bool { return atomic.LoadInt64(&s.running) == 1 } +// Addr returns the address of the mock server. func (s *MockServer) Addr() string { return s.addr } +// Stop stops the mock server. func (s *MockServer) Stop() { s.grpcServer.Stop() atomic.StoreInt64(&s.running, 0) } +// Start starts the mock server. func (s *MockServer) Start(addr string) int { if addr == "" { addr = fmt.Sprintf("%s:%d", "127.0.0.1", 0) @@ -159,7 +169,7 @@ func (s *MockServer) Start(addr string) int { return port } -// StartMockTikvService try to start a gRPC server and retrun the server instance and binded port. +// StartMockTikvService tries to start a gRPC server and returns the server instance and the bound port. func StartMockTikvService() (*MockServer, int) { server := &MockServer{} port := server.Start("") diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 7adc5e014..812e945c9 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -55,7 +55,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/internal/client" - "github.com/tikv/client-go/v2/internal/client/mock_server" + "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/tikvrpc" @@ -670,7 +670,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { conf.TiKVClient.MaxBatchSize = 128 })() - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() s.True(port > 0) rpcClient := client.NewRPCClient() fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { diff --git a/tikv/gc.go b/tikv/gc.go index a44ccf48c..e3ebcd373 100644 --- a/tikv/gc.go +++
b/tikv/gc.go @@ -35,7 +35,7 @@ import ( zap "go.uber.org/zap" ) -// We don't want gc to sweep out the cached info belong to other processes, like coprocessor. +// GCScanLockLimit is half of the resolved-lock cache size: we don't want gc to sweep out the cached info belonging to other processes, like coprocessor. const GCScanLockLimit = txnlock.ResolvedCacheSize / 2 // GC does garbage collection (GC) of the TiKV cluster. @@ -77,11 +77,13 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc return nil } +// BaseRegionLockResolver is a base implementation of RegionLockResolver. type BaseRegionLockResolver struct { identifier string store Storage } +// NewRegionLockResolver creates a new BaseRegionLockResolver. func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver { return &BaseRegionLockResolver{ identifier: identifier, @@ -89,18 +91,22 @@ func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockReso } } +// Identifier returns the name of this resolver. func (l *BaseRegionLockResolver) Identifier() string { return l.identifier } +// ResolveLocksInOneRegion tries to resolve expired locks for one region. func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) { return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc) } +// ScanLocksInOneRegion returns the locks and the location for the given start key in a region. func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) { return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit) } +// GetStore returns the store used to GetRegionCache and SendReq for this lock resolver.
func (l *BaseRegionLockResolver) GetStore() Storage { return l.store } @@ -130,6 +136,7 @@ type RegionLockResolver interface { GetStore() Storage } +// ResolveLocksForRange resolves locks in a range. func ResolveLocksForRange( ctx context.Context, resolver RegionLockResolver, diff --git a/tikv/kv.go b/tikv/kv.go index 733887f6a..811a1896c 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -528,6 +528,9 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) { func (s *KVStore) safeTSUpdater() { defer s.wg.Done() t := time.NewTicker(time.Second * 2) + if _, e := util.EvalFailpoint("mockFastSafeTSUpdater"); e == nil { + t.Reset(time.Millisecond * 100) + } defer t.Stop() ctx, cancel := context.WithCancel(s.ctx) ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) @@ -625,7 +628,9 @@ func (s *KVStore) setClusterMinSafeTSByPD(ctx context.Context) bool { } else if clusterMinSafeTS != 0 { // Update metrics. preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope) - if preClusterMinSafeTS > clusterMinSafeTS { + // If preClusterMinSafeTS is maxUint64, it means that the min safe ts has not been initialized. + // related to https://github.com/tikv/client-go/issues/991 + if preClusterMinSafeTS != math.MaxUint64 && preClusterMinSafeTS > clusterMinSafeTS { skipClusterSafeTSUpdateCounter.Inc() preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS) clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds()) diff --git a/util/pd.go b/util/pd.go index 38aac0330..162910838 100644 --- a/util/pd.go +++ b/util/pd.go @@ -111,7 +111,7 @@ func (p *PDHTTPClient) GetClusterMinResolvedTS(ctx context.Context) (uint64, err logutil.BgLogger().Debug(message.Error()) return 0, errors.Trace(message) } - if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil { + if val, e := EvalFailpoint("InjectPDMinResolvedTS"); e == nil { // Need to make sure successfully get from real pd. if d.MinResolvedTS != 0 { // Should be val.(uint64) but failpoint doesn't support that.