Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config, server, TSO Proxy: add fault injection in TSO Proxy #6588

Merged
merged 1 commit into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ type Config struct {
// Set this to 0 will disable TSO Proxy.
// Set this to the negative value to disable the limit.
MaxConcurrentTSOProxyStreamings int `toml:"max-concurrent-tso-proxy-streamings" json:"max-concurrent-tso-proxy-streamings"`
// TSOProxyClientRecvTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
// TSOProxyRecvFromClientTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
// After the timeout, the TSO proxy will close the grpc TSO stream.
TSOProxyClientRecvTimeout typeutil.Duration `toml:"tso-proxy-client-recv-timeout" json:"tso-proxy-client-recv-timeout"`
TSOProxyRecvFromClientTimeout typeutil.Duration `toml:"tso-proxy-recv-from-client-timeout" json:"tso-proxy-recv-from-client-timeout"`

// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`
Expand Down Expand Up @@ -229,7 +229,7 @@ const (
defaultDRWaitStoreTimeout = time.Minute

defaultMaxConcurrentTSOProxyStreamings = 5000
defaultTSOProxyClientRecvTimeout = 1 * time.Hour
defaultTSOProxyRecvFromClientTimeout = 1 * time.Hour

defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
Expand Down Expand Up @@ -455,7 +455,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
}

configutil.AdjustInt(&c.MaxConcurrentTSOProxyStreamings, defaultMaxConcurrentTSOProxyStreamings)
configutil.AdjustDuration(&c.TSOProxyClientRecvTimeout, defaultTSOProxyClientRecvTimeout)
configutil.AdjustDuration(&c.TSOProxyRecvFromClientTimeout, defaultTSOProxyRecvFromClientTimeout)

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval)
Expand Down Expand Up @@ -1271,9 +1271,9 @@ func (c *Config) GetMaxConcurrentTSOProxyStreamings() int {
return c.MaxConcurrentTSOProxyStreamings
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (c *Config) GetTSOProxyClientRecvTimeout() time.Duration {
return c.TSOProxyClientRecvTimeout.Duration
// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (c *Config) GetTSOProxyRecvFromClientTimeout() time.Duration {
return c.TSOProxyRecvFromClientTimeout.Duration
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
Expand Down
34 changes: 23 additions & 11 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var (
ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address")
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrTSOProxyClientRecvTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy client recv timeout. stream closed by server")
ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
)

// GrpcServer wraps Server to provide grpc service.
Expand Down Expand Up @@ -435,7 +435,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
default:
}

request, err := server.Recv(s.GetTSOProxyClientRecvTimeout())
request, err := server.Recv(s.GetTSOProxyRecvFromClientTimeout())
if err == io.EOF {
return nil
}
Expand Down Expand Up @@ -550,6 +550,11 @@ func (s *GrpcServer) forwardTSORequestAsync(
DcLocation: request.GetDcLocation(),
}

failpoint.Inject("tsoProxySendToTSOTimeout", func() {
<-ctxTimeout.Done()
failpoint.Return()
})

if err := forwardStream.Send(tsopbReq); err != nil {
select {
case <-ctxTimeout.Done():
Expand All @@ -565,23 +570,21 @@ func (s *GrpcServer) forwardTSORequestAsync(
default:
}

failpoint.Inject("tsoProxyRecvFromTSOTimeout", func() {
<-ctxTimeout.Done()
failpoint.Return()
})

response, err := forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
}
select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{err: err}:
}
return
}

select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{response: response}:
case tsoRespCh <- &tsopbTSOResponse{response: response, err: err}:
}
}

Expand Down Expand Up @@ -609,6 +612,10 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error {
done := make(chan error, 1)
go func() {
defer logutil.LogPanic()
failpoint.Inject("tsoProxyFailToSendToClient", func() {
done <- errors.New("injected error")
failpoint.Return()
})
done <- s.stream.Send(m)
}()
select {
Expand All @@ -627,6 +634,11 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
if atomic.LoadInt32(&s.closed) == 1 {
return nil, io.EOF
}
failpoint.Inject("tsoProxyRecvFromClientTimeout", func(val failpoint.Value) {
if customTimeoutInSeconds, ok := val.(int); ok {
timeout = time.Duration(customTimeoutInSeconds) * time.Second
}
})
requestCh := make(chan *pdpbTSORequest, 1)
go func() {
defer logutil.LogPanic()
Expand All @@ -642,7 +654,7 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
return req.request, nil
case <-time.After(timeout):
atomic.StoreInt32(&s.closed, 1)
return nil, ErrTSOProxyClientRecvTimeout
return nil, ErrTSOProxyRecvFromClientTimeout
}
}

Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1909,9 +1909,9 @@ func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
return s.cfg.GetMaxConcurrentTSOProxyStreamings()
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (s *Server) GetTSOProxyClientRecvTimeout() time.Duration {
return s.cfg.GetTSOProxyClientRecvTimeout()
// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {
return s.cfg.GetTSOProxyRecvFromClientTimeout()
}

// GetLeaderLease returns the leader lease.
Expand Down
74 changes: 74 additions & 0 deletions tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -194,6 +195,79 @@ func (s *tsoProxyTestSuite) TestTSOProxyClientsWithSameContext() {
s.cleanupGRPCStreams(cleanupFuncs)
}

// TestTSOProxyRecvFromClientTimeout tests the TSO Proxy can properly close the grpc stream on the server side
// when the client does not send any request to the server for a long time.
func (s *tsoProxyTestSuite) TestTSOProxyRecvFromClientTimeout() {
re := s.Require()

// Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout", `return(1)`))
streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
// Sleep 2 seconds to make the TSO Proxy's grpc stream timeout on the server side.
time.Sleep(2 * time.Second)
err := streams[0].Send(s.defaultReq)
re.Error(err)
s.cleanupGRPCStreams(cleanupFuncs)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout"))

// Verify the streams with no fault injection can work correctly.
s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
}

// TestTSOProxyFailToSendToClient tests the TSO Proxy can properly close the grpc stream on the server side
// when it fails to send the response to the client.
func (s *tsoProxyTestSuite) TestTSOProxyFailToSendToClient() {
re := s.Require()

// Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyFailToSendToClient", `return(true)`))
streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
err := streams[0].Send(s.defaultReq)
re.NoError(err)
_, err = streams[0].Recv()
re.Error(err)
s.cleanupGRPCStreams(cleanupFuncs)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyFailToSendToClient"))

s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
}

// TestTSOProxySendToTSOTimeout tests the TSO Proxy can properly close the grpc stream on the server side
// when it sends the request to the TSO service and encounters timeout.
func (s *tsoProxyTestSuite) TestTSOProxySendToTSOTimeout() {
re := s.Require()

// Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout", `return(true)`))
streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
err := streams[0].Send(s.defaultReq)
re.NoError(err)
_, err = streams[0].Recv()
re.Error(err)
s.cleanupGRPCStreams(cleanupFuncs)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout"))

s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
}

// TestTSOProxyRecvFromTSOTimeout tests the TSO Proxy can properly close the grpc stream on the server side
// when it receives the response from the TSO service and encounters timeout.
func (s *tsoProxyTestSuite) TestTSOProxyRecvFromTSOTimeout() {
re := s.Require()

// Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout", `return(true)`))
streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
err := streams[0].Send(s.defaultReq)
re.NoError(err)
_, err = streams[0].Recv()
re.Error(err)
s.cleanupGRPCStreams(cleanupFuncs)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout"))

s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
}

func (s *tsoProxyTestSuite) cleanupGRPCStreams(cleanupFuncs []testutil.CleanupFunc) {
for i := 0; i < len(cleanupFuncs); i++ {
if cleanupFuncs[i] != nil {
Expand Down