Skip to content

Commit

Permalink
min-safe-ts: fix MinSafeTS might be set to MaxUint64 permanently (#999)
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp authored Nov 21, 2023
1 parent 3499a6d commit c1041a4
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 88 deletions.
144 changes: 71 additions & 73 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/prewrite_test.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv_test

import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -67,12 +48,15 @@ type apiTestSuite struct {
}

func (s *apiTestSuite) SetupTest() {
require := s.Require()
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().Nil(err)
require.NoError(err)
rpcClient := tikv.NewRPCClient()
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
require.NoError(err)
s.store = store
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, tikvrpc.TiKV, 1, nil)
Expand All @@ -85,16 +69,28 @@ func (s *apiTestSuite) storeAddr(id uint64) string {
type storeSafeTsMockClient struct {
tikv.Client
requestCount int32
kvSafeTS uint64
}

func newStoreSafeTsMockClient(client tikv.Client) storeSafeTsMockClient {
return storeSafeTsMockClient{
Client: client,
kvSafeTS: 150, // Set a default value.
}
}

func (c *storeSafeTsMockClient) SetKVSafeTS(ts uint64) {
c.kvSafeTS = ts
}

func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if req.Type != tikvrpc.CmdStoreSafeTS {
return c.Client.SendRequest(ctx, addr, req, timeout)
}
atomic.AddInt32(&c.requestCount, 1)
resp := &tikvrpc.Response{}
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 150}
return resp, nil
return &tikvrpc.Response{
Resp: &kvrpcpb.StoreSafeTSResponse{SafeTs: c.kvSafeTS},
}, nil
}

func (c *storeSafeTsMockClient) Close() error {
Expand All @@ -105,66 +101,42 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) {
s.Eventually(func() bool {
return s.store.GetMinSafeTS(txnScope) == ts
}, time.Second, 200*time.Millisecond)
}

func (s *apiTestSuite) TestGetClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Try to get the minimum resolved timestamp of the store from TiKV.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 150)
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()

// Set DC label for store 1.
dcLabel := "testDC"
Expand All @@ -182,21 +154,47 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
s.store.GetRegionCache().SetRegionCacheStore(1, tikvrpc.TiKV, 1, labels)

// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
s.waitForMinSafeTS(dcLabel, 150)

require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
util.EnableFailpoints()
require := s.Require()
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)

// Make sure the store's min resolved ts is not initialized.
mockClient.SetKVSafeTS(0)
// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64)
// Make sure the store's min resolved ts is not initialized.
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
// Make sure the store's min resolved ts is not regarded as MaxUint64.
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Fallback to KV Request when PD server not support get min resolved ts.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
mockClient.SetKVSafeTS(150)
s.waitForMinSafeTS(oracle.GlobalTxnScope, 150)
// Make sure the minSafeTS can advance.
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TearDownTest() {
if s.store != nil {
s.Require().Nil(s.store.Close())
s.Require().NoError(s.store.Close())
}
s.Require().NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}
6 changes: 3 additions & 3 deletions internal/client/client_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client/mock_server"
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestPanicInRecvLoop(t *testing.T) {
require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`))
require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`))

server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestPanicInRecvLoop(t *testing.T) {
}

func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := server.Addr()
Expand Down
10 changes: 5 additions & 5 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client/mock_server"
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestCancelTimeoutRetErr(t *testing.T) {
}

func TestSendWhenReconnect(t *testing.T) {
server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)

rpcClient := NewRPCClient()
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestCollapseResolveLock(t *testing.T) {
}

func TestForwardMetadataByUnaryCall(t *testing.T) {
server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
}

func TestForwardMetadataByBatchCommands(t *testing.T) {
server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := server.Addr()
Expand Down Expand Up @@ -642,7 +642,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
conf.TiKVClient.MaxBatchSize = 128
})()

server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
require.True(t, server.IsRunning())
addr := server.Addr()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/mock_tikv_service_test.go
//

package mock_server
package mockserver

import (
"context"
Expand All @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc"
)

// MockServer is a mock tikv server for testing purpose.
type MockServer struct {
tikvpb.TikvServer
grpcServer *grpc.Server
Expand All @@ -49,27 +50,31 @@ type MockServer struct {
}
}

// KvGet implements the TikvServer interface.
func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
if err := s.checkMetadata(ctx); err != nil {
return nil, err
}
return &kvrpcpb.GetResponse{}, nil
}

// KvPrewrite implements the TikvServer interface.
func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
if err := s.checkMetadata(ctx); err != nil {
return nil, err
}
return &kvrpcpb.PrewriteResponse{}, nil
}

// CoprocessorStream implements the TikvServer interface.
func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error {
if err := s.checkMetadata(ss.Context()); err != nil {
return err
}
return ss.Send(&coprocessor.Response{})
}

// BatchCommands implements the TikvServer interface.
func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
if err := s.checkMetadata(ss.Context()); err != nil {
return err
Expand Down Expand Up @@ -101,6 +106,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
}
}

// SetMetaChecker implements the TikvServer interface.
func (s *MockServer) SetMetaChecker(check func(context.Context) error) {
s.metaChecker.Lock()
s.metaChecker.check = check
Expand All @@ -116,19 +122,23 @@ func (s *MockServer) checkMetadata(ctx context.Context) error {
return nil
}

// IsRunning returns whether the mock server is running.
func (s *MockServer) IsRunning() bool {
return atomic.LoadInt64(&s.running) == 1
}

// Addr returns the address of the mock server.
func (s *MockServer) Addr() string {
return s.addr
}

// Stop stops the mock server.
func (s *MockServer) Stop() {
s.grpcServer.Stop()
atomic.StoreInt64(&s.running, 0)
}

// Start starts the mock server.
func (s *MockServer) Start(addr string) int {
if addr == "" {
addr = fmt.Sprintf("%s:%d", "127.0.0.1", 0)
Expand Down Expand Up @@ -159,7 +169,7 @@ func (s *MockServer) Start(addr string) int {
return port
}

// StartMockTikvService try to start a gRPC server and retrun the server instance and binded port.
// StartMockTikvService try to start a gRPC server and return the server instance and binded port.
func StartMockTikvService() (*MockServer, int) {
server := &MockServer{}
port := server.Start("")
Expand Down
Loading

0 comments on commit c1041a4

Please sign in to comment.