Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/tso: double-check the contexts to prevent waiting for TSO requests in closed chan #7962

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur

req := c.getTSORequest(ctx, dcLocation)
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.done <- err
req.tryDone(err)
}
return req
}
Expand Down
16 changes: 14 additions & 2 deletions client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
)

type tsoBatchController struct {
Expand Down Expand Up @@ -138,7 +141,7 @@
tsoReq := tbc.collectedRequests[i]
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End()
tsoReq.done <- err
tsoReq.tryDone(err)
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
Expand All @@ -147,6 +150,15 @@
func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.done <- err
req.tryDone(err)

Check warning on line 153 in client/tso_batch_controller.go

View check run for this annotation

Codecov / codecov/patch

client/tso_batch_controller.go#L153

Added line #L153 was not covered by tests
}
}

// clear logs the controller's batch-size settings together with the number of
// collected and still-queued requests, then builds a stack-annotated
// errClosing and hands it to finishCollectedRequests and
// revokePendingRequests.
func (tbc *tsoBatchController) clear() {
	pendingCount := len(tbc.tsoRequestCh)
	log.Info("[pd] clear the tso batch controller",
		zap.Int("max-batch-size", tbc.maxBatchSize),
		zap.Int("best-batch-size", tbc.bestBatchSize),
		zap.Int("collected-request-count", tbc.collectedRequestCount),
		zap.Int("pending-request-count", pendingCount),
	)

	closingErr := errors.WithStack(errClosing)
	// The physical, logical and suffix-bit arguments are all zero here; only
	// the error is passed on.
	tbc.finishCollectedRequests(0, 0, 0, closingErr)
	tbc.revokePendingRequests(closingErr)
}
11 changes: 8 additions & 3 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
Expand Down Expand Up @@ -64,6 +63,13 @@
},
}

// tryDone makes a single non-blocking attempt to send err on req.done.
// If the send cannot proceed immediately, the default branch of the select
// runs and err is not delivered.
func (req *tsoRequest) tryDone(err error) {
Copy link
Contributor

@nolouch nolouch Mar 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need to try? from the goroutine profile we can make sure here is no blocking.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A request may be revoked right after it is sent to tsoBatchController, then if we double-check the contexts and return an error, done <- err will be blocked.

select {
case req.done <- err:
default:
// The send could not proceed right away; return without blocking.

Check warning on line 69 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L69

Added line #L69 was not covered by tests
}
}

type tsoClient struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -140,9 +146,8 @@
c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
tsoErr := errors.WithStack(errClosing)
dispatcher.tsoBatchController.revokePendingRequests(tsoErr)
dispatcher.dispatcherCancel()
dispatcher.tsoBatchController.clear()
}
return true
})
Expand Down
17 changes: 17 additions & 0 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,23 @@
// tsoClient is closed due to the PD service mode switch, which is retryable.
return true, c.ctx.Err()
default:
// This failpoint will increase the possibility that the request is sent to a closed dispatcher.
failpoint.Inject("delayDispatchTSORequest", func() {
time.Sleep(time.Second)
})
dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request
}
// Check the contexts again to make sure the request has not been sent to a closed dispatcher.
// Never retry on these conditions to prevent unexpected data race.
select {
case <-request.requestCtx.Done():
return false, request.requestCtx.Err()
case <-request.clientCtx.Done():
return false, request.clientCtx.Err()

Check warning on line 110 in client/tso_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

client/tso_dispatcher.go#L107-L110

Added lines #L107 - L110 were not covered by tests
case <-c.ctx.Done():
return false, c.ctx.Err()
default:
}
return false, nil
}

Expand Down Expand Up @@ -368,6 +383,8 @@
cc.(*tsoConnectionContext).cancel()
return true
})
// Clear the tso batch controller.
tbc.clear()
c.wg.Done()
}()
// Call updateTSOConnectionCtxs once to init the connectionCtxs first.
Expand Down
4 changes: 3 additions & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,9 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
}

func (suite *tsoClientTestSuite) TestGetTSWhileRestingTSOClient() {
func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/client/delayDispatchTSORequest", "return(true)"))
var (
clients []pd.Client
stopSignal atomic.Bool
Expand Down Expand Up @@ -467,6 +468,7 @@ func (suite *tsoClientTestSuite) TestGetTSWhileRestingTSOClient() {
}
stopSignal.Store(true)
wg.Wait()
re.NoError(failpoint.Disable("github.com/tikv/pd/client/delayDispatchTSORequest"))
}

// When we upgrade the PD cluster, there may be a period of time during which the old and new PDs are running at the same time.
Expand Down
Loading