diff --git a/client/client.go b/client/client.go
index 8838c184d92..b9535aa504e 100644
--- a/client/client.go
+++ b/client/client.go
@@ -793,7 +793,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
 	req := c.getTSORequest(ctx, dcLocation)
 	if err := c.dispatchTSORequestWithRetry(req); err != nil {
-		req.done <- err
+		req.tryDone(err)
 	}
 	return req
 }
 
diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go
index bd7a440fb08..5f3b08c2895 100644
--- a/client/tso_batch_controller.go
+++ b/client/tso_batch_controller.go
@@ -19,7 +19,10 @@ import (
 	"runtime/trace"
 	"time"
 
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/tsoutil"
+	"go.uber.org/zap"
 )
 
 type tsoBatchController struct {
@@ -138,7 +141,7 @@ func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical in
 		tsoReq := tbc.collectedRequests[i]
 		tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
 		defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End()
-		tsoReq.done <- err
+		tsoReq.tryDone(err)
 	}
 	// Prevent the finished requests from being processed again.
 	tbc.collectedRequestCount = 0
@@ -147,6 +150,15 @@ func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical in
 func (tbc *tsoBatchController) revokePendingRequests(err error) {
 	for i := 0; i < len(tbc.tsoRequestCh); i++ {
 		req := <-tbc.tsoRequestCh
-		req.done <- err
+		req.tryDone(err)
 	}
 }
+
+func (tbc *tsoBatchController) clear() {
+	log.Info("[pd] clear the tso batch controller",
+		zap.Int("max-batch-size", tbc.maxBatchSize), zap.Int("best-batch-size", tbc.bestBatchSize),
+		zap.Int("collected-request-count", tbc.collectedRequestCount), zap.Int("pending-request-count", len(tbc.tsoRequestCh)))
+	tsoErr := errors.WithStack(errClosing)
+	tbc.finishCollectedRequests(0, 0, 0, tsoErr)
+	tbc.revokePendingRequests(tsoErr)
+}
diff --git a/client/tso_client.go b/client/tso_client.go
index c563df0efdb..5f8b12df36f 100644
--- a/client/tso_client.go
+++ b/client/tso_client.go
@@ -21,7 +21,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
 	"go.uber.org/zap"
@@ -64,6 +63,13 @@ var tsoReqPool = sync.Pool{
 	},
 }
 
+func (req *tsoRequest) tryDone(err error) {
+	select {
+	case req.done <- err:
+	default:
+	}
+}
+
 type tsoClient struct {
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -140,9 +146,8 @@ func (c *tsoClient) Close() {
 	c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
 		if dispatcherInterface != nil {
 			dispatcher := dispatcherInterface.(*tsoDispatcher)
-			tsoErr := errors.WithStack(errClosing)
-			dispatcher.tsoBatchController.revokePendingRequests(tsoErr)
 			dispatcher.dispatcherCancel()
+			dispatcher.tsoBatchController.clear()
 		}
 		return true
 	})
diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go
index a625f8dbbe1..88f8ffd61b5 100644
--- a/client/tso_dispatcher.go
+++ b/client/tso_dispatcher.go
@@ -95,8 +95,23 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) {
 		// tsoClient is closed due to the PD service mode switch, which is retryable.
 		return true, c.ctx.Err()
 	default:
+		// This failpoint will increase the possibility that the request is sent to a closed dispatcher.
+		failpoint.Inject("delayDispatchTSORequest", func() {
+			time.Sleep(time.Second)
+		})
 		dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request
 	}
+	// Check the contexts again to make sure the request has not been sent to a closed dispatcher.
+	// Never retry on these conditions to prevent unexpected data races.
+	select {
+	case <-request.requestCtx.Done():
+		return false, request.requestCtx.Err()
+	case <-request.clientCtx.Done():
+		return false, request.clientCtx.Err()
+	case <-c.ctx.Done():
+		return false, c.ctx.Err()
+	default:
+	}
 	return false, nil
 }
 
@@ -368,6 +383,8 @@ func (c *tsoClient) handleDispatcher(
 			cc.(*tsoConnectionContext).cancel()
 			return true
 		})
+		// Clear the tso batch controller.
+		tbc.clear()
 		c.wg.Done()
 	}()
 	// Call updateTSOConnectionCtxs once to init the connectionCtxs first.
diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index b0bd6f1d4e5..d4f484087cf 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -425,8 +425,9 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
 }
 
-func (suite *tsoClientTestSuite) TestGetTSWhileRestingTSOClient() {
+func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() {
 	re := suite.Require()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/client/delayDispatchTSORequest", "return(true)"))
 	var (
 		clients    []pd.Client
 		stopSignal atomic.Bool
@@ -467,6 +468,7 @@ func (suite *tsoClientTestSuite) TestGetTSWhileRestingTSOClient() {
 	}
 	stopSignal.Store(true)
 	wg.Wait()
+	re.NoError(failpoint.Disable("github.com/tikv/pd/client/delayDispatchTSORequest"))
 }
 
 // When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time.