Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

min-safe-ts: fix MinSafeTS might be set to MaxUint64 permanently #999

Merged
merged 4 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 71 additions & 73 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/prewrite_test.go
//

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv_test

import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -67,12 +48,15 @@ type apiTestSuite struct {
}

func (s *apiTestSuite) SetupTest() {
require := s.Require()
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().Nil(err)
require.NoError(err)
rpcClient := tikv.NewRPCClient()
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
require.NoError(err)
s.store = store
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, tikvrpc.TiKV, 1, nil)
Expand All @@ -85,16 +69,28 @@ func (s *apiTestSuite) storeAddr(id uint64) string {
type storeSafeTsMockClient struct {
tikv.Client
requestCount int32
kvSafeTS uint64
}

func newStoreSafeTsMockClient(client tikv.Client) storeSafeTsMockClient {
return storeSafeTsMockClient{
Client: client,
kvSafeTS: 150, // Set a default value.
}
}

func (c *storeSafeTsMockClient) SetKVSafeTS(ts uint64) {
c.kvSafeTS = ts
}

func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if req.Type != tikvrpc.CmdStoreSafeTS {
return c.Client.SendRequest(ctx, addr, req, timeout)
}
atomic.AddInt32(&c.requestCount, 1)
resp := &tikvrpc.Response{}
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 150}
return resp, nil
return &tikvrpc.Response{
Resp: &kvrpcpb.StoreSafeTSResponse{SafeTs: c.kvSafeTS},
}, nil
}

func (c *storeSafeTsMockClient) Close() error {
Expand All @@ -105,66 +101,42 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) {
s.Eventually(func() bool {
return s.store.GetMinSafeTS(txnScope) == ts
}, time.Second, 200*time.Millisecond)
}

func (s *apiTestSuite) TestGetClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Try to get the minimum resolved timestamp of the store from TiKV.
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 150)
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()

// Set DC label for store 1.
dcLabel := "testDC"
Expand All @@ -182,21 +154,47 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
s.store.GetRegionCache().SetRegionCacheStore(1, tikvrpc.TiKV, 1, labels)

// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
s.waitForMinSafeTS(dcLabel, 150)

require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
util.EnableFailpoints()
require := s.Require()
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)

// Make sure the store's min resolved ts is not initialized.
mockClient.SetKVSafeTS(0)
// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64)
// Make sure the store's min resolved ts is not initialized.
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
// Make sure the store's min resolved ts is not regarded as MaxUint64.
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))

// Fallback to KV Request when PD server not support get min resolved ts.
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
mockClient.SetKVSafeTS(150)
s.waitForMinSafeTS(oracle.GlobalTxnScope, 150)
// Make sure the minSafeTS can advance.
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
}

func (s *apiTestSuite) TearDownTest() {
if s.store != nil {
s.Require().Nil(s.store.Close())
s.Require().NoError(s.store.Close())
}
s.Require().NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}
6 changes: 3 additions & 3 deletions internal/client/client_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client/mock_server"
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestPanicInRecvLoop(t *testing.T) {
require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`))
require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`))

server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestPanicInRecvLoop(t *testing.T) {
}

func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := server.Addr()
Expand Down
10 changes: 5 additions & 5 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client/mock_server"
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestCancelTimeoutRetErr(t *testing.T) {
}

func TestSendWhenReconnect(t *testing.T) {
server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)

rpcClient := NewRPCClient()
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestCollapseResolveLock(t *testing.T) {
}

func TestForwardMetadataByUnaryCall(t *testing.T) {
server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
}

func TestForwardMetadataByBatchCommands(t *testing.T) {
server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := server.Addr()
Expand Down Expand Up @@ -642,7 +642,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
conf.TiKVClient.MaxBatchSize = 128
})()

server, port := mock_server.StartMockTikvService()
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
require.True(t, server.IsRunning())
addr := server.Addr()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/mock_tikv_service_test.go
//

package mock_server
package mockserver

import (
"context"
Expand All @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc"
)

// MockServer is a mock tikv server for testing purpose.
type MockServer struct {
tikvpb.TikvServer
grpcServer *grpc.Server
Expand All @@ -49,27 +50,31 @@ type MockServer struct {
}
}

// KvGet implements the TikvServer interface.
func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
if err := s.checkMetadata(ctx); err != nil {
return nil, err
}
return &kvrpcpb.GetResponse{}, nil
}

// KvPrewrite implements the TikvServer interface.
func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
if err := s.checkMetadata(ctx); err != nil {
return nil, err
}
return &kvrpcpb.PrewriteResponse{}, nil
}

// CoprocessorStream implements the TikvServer interface.
func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error {
if err := s.checkMetadata(ss.Context()); err != nil {
return err
}
return ss.Send(&coprocessor.Response{})
}

// BatchCommands implements the TikvServer interface.
func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
if err := s.checkMetadata(ss.Context()); err != nil {
return err
Expand Down Expand Up @@ -101,6 +106,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
}
}

// SetMetaChecker implements the TikvServer interface.
func (s *MockServer) SetMetaChecker(check func(context.Context) error) {
s.metaChecker.Lock()
s.metaChecker.check = check
Expand All @@ -116,19 +122,23 @@ func (s *MockServer) checkMetadata(ctx context.Context) error {
return nil
}

// IsRunning returns whether the mock server is running.
func (s *MockServer) IsRunning() bool {
return atomic.LoadInt64(&s.running) == 1
}

// Addr returns the address of the mock server.
func (s *MockServer) Addr() string {
return s.addr
}

// Stop stops the mock server.
func (s *MockServer) Stop() {
s.grpcServer.Stop()
atomic.StoreInt64(&s.running, 0)
}

// Start starts the mock server.
func (s *MockServer) Start(addr string) int {
if addr == "" {
addr = fmt.Sprintf("%s:%d", "127.0.0.1", 0)
Expand Down Expand Up @@ -159,7 +169,7 @@ func (s *MockServer) Start(addr string) int {
return port
}

// StartMockTikvService try to start a gRPC server and retrun the server instance and binded port.
// StartMockTikvService try to start a gRPC server and return the server instance and binded port.
func StartMockTikvService() (*MockServer, int) {
server := &MockServer{}
port := server.Start("")
Expand Down
Loading
Loading