From 1cd01dba40c26a8e1d7eb59d880d6c59ba5c3bd4 Mon Sep 17 00:00:00 2001
From: Bin Shi
Date: Mon, 12 Jun 2023 17:00:53 -0700
Subject: [PATCH] Fault inject in TSO Proxy

Signed-off-by: Bin Shi
---
 server/config/config.go                  | 14 ++---
 server/grpc_service.go                   | 34 +++++++----
 server/server.go                         |  6 +-
 tests/integrations/mcs/tso/proxy_test.go | 74 ++++++++++++++++++++++++
 4 files changed, 107 insertions(+), 21 deletions(-)

diff --git a/server/config/config.go b/server/config/config.go
index 48318032359..f071a893c12 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -85,9 +85,9 @@ type Config struct {
 	// Set this to 0 will disable TSO Proxy.
 	// Set this to the negative value to disable the limit.
 	MaxConcurrentTSOProxyStreamings int `toml:"max-concurrent-tso-proxy-streamings" json:"max-concurrent-tso-proxy-streamings"`
-	// TSOProxyClientRecvTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
+	// TSOProxyRecvFromClientTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
 	// After the timeout, the TSO proxy will close the grpc TSO stream.
-	TSOProxyClientRecvTimeout typeutil.Duration `toml:"tso-proxy-client-recv-timeout" json:"tso-proxy-client-recv-timeout"`
+	TSOProxyRecvFromClientTimeout typeutil.Duration `toml:"tso-proxy-recv-from-client-timeout" json:"tso-proxy-recv-from-client-timeout"`
 
 	// TSOSaveInterval is the interval to save timestamp.
 	TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`
@@ -229,7 +229,7 @@
 	defaultDRWaitStoreTimeout = time.Minute
 
 	defaultMaxConcurrentTSOProxyStreamings = 5000
-	defaultTSOProxyClientRecvTimeout       = 1 * time.Hour
+	defaultTSOProxyRecvFromClientTimeout   = 1 * time.Hour
 
 	defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second
 	// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
@@ -455,7 +455,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
 	}
 
 	configutil.AdjustInt(&c.MaxConcurrentTSOProxyStreamings, defaultMaxConcurrentTSOProxyStreamings)
-	configutil.AdjustDuration(&c.TSOProxyClientRecvTimeout, defaultTSOProxyClientRecvTimeout)
+	configutil.AdjustDuration(&c.TSOProxyRecvFromClientTimeout, defaultTSOProxyRecvFromClientTimeout)
 
 	configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
 	configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval)
@@ -1271,9 +1271,9 @@ func (c *Config) GetMaxConcurrentTSOProxyStreamings() int {
 	return c.MaxConcurrentTSOProxyStreamings
 }
 
-// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
-func (c *Config) GetTSOProxyClientRecvTimeout() time.Duration {
-	return c.TSOProxyClientRecvTimeout.Duration
+// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
+func (c *Config) GetTSOProxyRecvFromClientTimeout() time.Duration {
+	return c.TSOProxyRecvFromClientTimeout.Duration
 }
 
 // GetTSOUpdatePhysicalInterval returns TSO update physical interval.
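For reference, the renamed setting would be expected to surface in a PD configuration file roughly as in this minimal, illustrative snippet. The key names are taken from the toml tags above; the values are simply the defaults above, with the duration written as a TOML string, and the snippet is a sketch rather than an excerpt from a real config:

    # tso-proxy-client-recv-timeout is replaced by tso-proxy-recv-from-client-timeout
    max-concurrent-tso-proxy-streamings = 5000
    tso-proxy-recv-from-client-timeout = "1h"
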
diff --git a/server/grpc_service.go b/server/grpc_service.go
index dcc3e02abf2..169d97b5a3c 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -67,7 +67,7 @@ var (
 	ErrNotFoundTSOAddr                  = status.Errorf(codes.NotFound, "not found tso address")
 	ErrForwardTSOTimeout                = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
 	ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
-	ErrTSOProxyClientRecvTimeout        = status.Errorf(codes.DeadlineExceeded, "tso proxy client recv timeout. stream closed by server")
+	ErrTSOProxyRecvFromClientTimeout    = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
 )
 
 // GrpcServer wraps Server to provide grpc service.
@@ -435,7 +435,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
 		default:
 		}
 
-		request, err := server.Recv(s.GetTSOProxyClientRecvTimeout())
+		request, err := server.Recv(s.GetTSOProxyRecvFromClientTimeout())
 		if err == io.EOF {
 			return nil
 		}
@@ -550,6 +550,11 @@ func (s *GrpcServer) forwardTSORequestAsync(
 		DcLocation: request.GetDcLocation(),
 	}
 
+	failpoint.Inject("tsoProxySendToTSOTimeout", func() {
+		<-ctxTimeout.Done()
+		failpoint.Return()
+	})
+
 	if err := forwardStream.Send(tsopbReq); err != nil {
 		select {
 		case <-ctxTimeout.Done():
@@ -565,23 +570,21 @@ func (s *GrpcServer) forwardTSORequestAsync(
 	default:
 	}
 
+	failpoint.Inject("tsoProxyRecvFromTSOTimeout", func() {
+		<-ctxTimeout.Done()
+		failpoint.Return()
+	})
+
 	response, err := forwardStream.Recv()
 	if err != nil {
 		if strings.Contains(err.Error(), errs.NotLeaderErr) {
 			s.tsoPrimaryWatcher.ForceLoad()
 		}
-		select {
-		case <-ctxTimeout.Done():
-			return
-		case tsoRespCh <- &tsopbTSOResponse{err: err}:
-		}
-		return
 	}
-
 	select {
 	case <-ctxTimeout.Done():
 		return
-	case tsoRespCh <- &tsopbTSOResponse{response: response}:
+	case tsoRespCh <- &tsopbTSOResponse{response: response, err: err}:
 	}
 }
 
@@ -609,6 +612,10 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error {
 	done := make(chan error, 1)
 	go func() {
 		defer logutil.LogPanic()
+		failpoint.Inject("tsoProxyFailToSendToClient", func() {
+			done <- errors.New("injected error")
+			failpoint.Return()
+		})
 		done <- s.stream.Send(m)
 	}()
 	select {
@@ -627,6 +634,11 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
 	if atomic.LoadInt32(&s.closed) == 1 {
 		return nil, io.EOF
 	}
+	failpoint.Inject("tsoProxyRecvFromClientTimeout", func(val failpoint.Value) {
+		if customTimeoutInSeconds, ok := val.(int); ok {
+			timeout = time.Duration(customTimeoutInSeconds) * time.Second
+		}
+	})
 	requestCh := make(chan *pdpbTSORequest, 1)
 	go func() {
 		defer logutil.LogPanic()
@@ -642,7 +654,7 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
 		return req.request, nil
 	case <-time.After(timeout):
 		atomic.StoreInt32(&s.closed, 1)
-		return nil, ErrTSOProxyClientRecvTimeout
+		return nil, ErrTSOProxyRecvFromClientTimeout
 	}
 }
 
diff --git a/server/server.go b/server/server.go
index a1cf0e3da16..40f5f4d59bd 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1909,9 +1909,9 @@ func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
 	return s.cfg.GetMaxConcurrentTSOProxyStreamings()
 }
 
-// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
-func (s *Server) GetTSOProxyClientRecvTimeout() time.Duration {
-	return s.cfg.GetTSOProxyClientRecvTimeout()
+// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
+func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {
+	return s.cfg.GetTSOProxyRecvFromClientTimeout()
 }
 
 // GetLeaderLease returns the leader lease.
diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go
index d4d73a74b83..d873665c258 100644
--- a/tests/integrations/mcs/tso/proxy_test.go
+++ b/tests/integrations/mcs/tso/proxy_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/stretchr/testify/require"
@@ -194,6 +195,79 @@ func (s *tsoProxyTestSuite) TestTSOProxyClientsWithSameContext() {
 	s.cleanupGRPCStreams(cleanupFuncs)
 }
 
+// TestTSOProxyRecvFromClientTimeout tests that the TSO Proxy properly closes the grpc stream on the server side
+// when the client does not send any request to the server for a long time.
+func (s *tsoProxyTestSuite) TestTSOProxyRecvFromClientTimeout() {
+	re := s.Require()
+
+	// Enable the failpoint to shorten the TSO Proxy's receive-from-client timeout on the server side to 1 second.
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout", `return(1)`))
+	streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
+	// Sleep 2 seconds so that the TSO Proxy's grpc stream times out on the server side.
+	time.Sleep(2 * time.Second)
+	err := streams[0].Send(s.defaultReq)
+	re.Error(err)
+	s.cleanupGRPCStreams(cleanupFuncs)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout"))
+
+	// Verify that streams with no fault injection still work correctly.
+	s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
+}
+
+// TestTSOProxyFailToSendToClient tests that the TSO Proxy properly closes the grpc stream on the server side
+// when it fails to send the response to the client.
+func (s *tsoProxyTestSuite) TestTSOProxyFailToSendToClient() {
+	re := s.Require()
+
+	// Enable the failpoint to make the TSO Proxy fail to send the response to the client.
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyFailToSendToClient", `return(true)`))
+	streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
+	err := streams[0].Send(s.defaultReq)
+	re.NoError(err)
+	_, err = streams[0].Recv()
+	re.Error(err)
+	s.cleanupGRPCStreams(cleanupFuncs)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyFailToSendToClient"))
+
+	s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
+}
+
+// TestTSOProxySendToTSOTimeout tests that the TSO Proxy properly closes the grpc stream on the server side
+// when sending the request to the TSO service times out.
+func (s *tsoProxyTestSuite) TestTSOProxySendToTSOTimeout() {
+	re := s.Require()
+
+	// Enable the failpoint to make the TSO Proxy time out when forwarding the request to the TSO service.
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout", `return(true)`))
+	streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
+	err := streams[0].Send(s.defaultReq)
+	re.NoError(err)
+	_, err = streams[0].Recv()
+	re.Error(err)
+	s.cleanupGRPCStreams(cleanupFuncs)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout"))
+
+	s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
+}
+
+// TestTSOProxyRecvFromTSOTimeout tests that the TSO Proxy properly closes the grpc stream on the server side
+// when receiving the response from the TSO service times out.
+func (s *tsoProxyTestSuite) TestTSOProxyRecvFromTSOTimeout() {
+	re := s.Require()
+
+	// Enable the failpoint to make the TSO Proxy time out when receiving the response from the TSO service.
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout", `return(true)`))
+	streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
+	err := streams[0].Send(s.defaultReq)
+	re.NoError(err)
+	_, err = streams[0].Recv()
+	re.Error(err)
+	s.cleanupGRPCStreams(cleanupFuncs)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout"))
+
+	s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
+}
+
 func (s *tsoProxyTestSuite) cleanupGRPCStreams(cleanupFuncs []testutil.CleanupFunc) {
 	for i := 0; i < len(cleanupFuncs); i++ {
 		if cleanupFuncs[i] != nil {