Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/tso: fix the bug that TSO may hang when switching mode #7937

Merged
merged 4 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,23 +782,42 @@
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
req.start = time.Now()
req.dcLocation = dcLocation

if tsoClient == nil {
req.done <- errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
return req
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.done <- err
}
return req
}

// Retry policy applied when dispatching a TSO request to the TSO client.
const (
// dispatchRetryDelay is how long to sleep before every dispatch attempt after the first one.
dispatchRetryDelay = 50 * time.Millisecond
// dispatchRetryCount is the upper bound on dispatch attempts, the first attempt included.
dispatchRetryCount = 2
)

if err := tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
// Wait for a while and try again
time.Sleep(50 * time.Millisecond)
if err = tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
req.done <- err
// dispatchTSORequestWithRetry hands req to the current TSO client, making at
// most dispatchRetryCount attempts and sleeping dispatchRetryDelay before each
// attempt except the first. The loop ends early as soon as dispatchRequest
// reports that the outcome is not retryable (which includes success). The
// returned error is the one produced by the last attempt; it is nil when the
// request was dispatched.
func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
var (
retryable bool
err error
)
for i := 0; i < dispatchRetryCount; i++ {
// Do not delay for the first time.
if i > 0 {
time.Sleep(dispatchRetryDelay)
}
// Get the tsoClient each time, as it may be initialized or switched during the process.
tsoClient := c.getTSOClient()
if tsoClient == nil {
err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
// A missing client consumes one attempt; the next iteration fetches it again.
continue

Check warning on line 813 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L812-L813

Added lines #L812 - L813 were not covered by tests
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
break
}
}
return req
return err
}

func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
Expand Down
2 changes: 1 addition & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type tsoClient struct {

// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding dc-location TSO channel.
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher
// dc-location -> deadline
tsDeadline sync.Map // Same as map[string]chan deadline
// dc-location -> *tsoInfo while the tsoInfo is the last TSO info
Expand Down
27 changes: 18 additions & 9 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,31 @@
}
}

func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, request *tsoRequest) error {
dispatcher, ok := c.tsoDispatcher.Load(dcLocation)
// dispatchRequest looks up the dispatcher registered for request.dcLocation
// and sends the request to that dispatcher's batch controller channel.
//
// The returned bool reports whether a failed dispatch may be retried by the
// caller; the error is nil when the request was enqueued. When no dispatcher
// is registered for the dc-location, the error is logged, a member-change
// check is scheduled on the service discovery, and the failure is reported as
// retryable.
func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) {
dispatcher, ok := c.tsoDispatcher.Load(request.dcLocation)
if !ok {
err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation))
log.Error("[tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err))
err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation))
log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
return err
// New dispatcher could be created in the meantime, which is retryable.
return true, err
}

defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End()
select {
case <-ctx.Done():
return ctx.Err()
case dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request:
case <-request.requestCtx.Done():

Check warning on line 88 in client/tso_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

client/tso_dispatcher.go#L88

Added line #L88 was not covered by tests
// Caller cancelled the request, no need to retry.
return false, request.requestCtx.Err()

Check warning on line 90 in client/tso_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

client/tso_dispatcher.go#L90

Added line #L90 was not covered by tests
case <-request.clientCtx.Done():
// Client is closed, no need to retry.
return false, request.clientCtx.Err()
case <-c.ctx.Done():
// tsoClient is closed due to the PD service mode switch, which is retryable.
return true, c.ctx.Err()
default:
// None of the contexts above is done yet, so enqueue the request.
dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request
}
return nil
return false, nil
}

// TSFuture is a future which promises to return a TSO.
Expand Down
16 changes: 15 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,9 +1062,23 @@ func TestCloseClient(t *testing.T) {
defer cluster.Destroy()
endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
cli.GetTSAsync(context.TODO())
ts := cli.GetTSAsync(context.TODO())
time.Sleep(time.Second)
cli.Close()
physical, logical, err := ts.Wait()
if err == nil {
re.Greater(physical, int64(0))
re.Greater(logical, int64(0))
} else {
re.ErrorIs(err, context.Canceled)
re.Zero(physical)
re.Zero(logical)
}
ts = cli.GetTSAsync(context.TODO())
physical, logical, err = ts.Wait()
re.ErrorIs(err, context.Canceled)
re.Zero(physical)
re.Zero(logical)
}

type idAllocator struct {
Expand Down
Loading