Skip to content

Commit

Permalink
client/tso: fix the issue where the TSO follower proxy cannot be clos…
Browse files Browse the repository at this point in the history
…ed (#8719)

close #8709

Remove outdated follower connections after disabling the TSO follower proxy.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Oct 16, 2024
1 parent 3d4c416 commit b70107e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
6 changes: 5 additions & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ func (c *tsoClient) tryConnectToTSO(
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
// Only store the `connectionCtx` if it does not exist before.
connectionCtxs.LoadOrStore(newURL, connectionCtx)
if connectionCtx != nil {
connectionCtxs.LoadOrStore(newURL, connectionCtx)
}
// Remove all other `connectionCtx`s.
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
Expand All @@ -405,6 +407,8 @@ func (c *tsoClient) tryConnectToTSO(
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
if _, ok := connectionCtxs.Load(url); ok {
// Just trigger the clean up of the stale connection contexts.
updateAndClear(url, nil)
return nil
}
if cc != nil {
Expand Down
36 changes: 35 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ func TestTSOFollowerProxy(t *testing.T) {
defer cli1.Close()
cli2 := setupCli(ctx, re, endpoints)
defer cli2.Close()
cli2.UpdateOption(pd.EnableTSOFollowerProxy, true)
err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, true)
re.NoError(err)

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
Expand All @@ -385,6 +386,39 @@ func TestTSOFollowerProxy(t *testing.T) {
}()
}
wg.Wait()

// Disable the follower proxy and check if the stream is updated.
err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, false)
re.NoError(err)

wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
physical, logical, err := cli2.GetTS(context.Background())
if err != nil {
// It can only be the context canceled error caused by the stale stream cleanup.
re.ErrorContains(err, "context canceled")
continue
}
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
// After requesting with the follower proxy, request with the leader directly.
physical, logical, err = cli1.GetTS(context.Background())
re.NoError(err)
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
// Ensure at least one request is successful.
re.NotEmpty(lastTS)
}()
}
wg.Wait()
}

func TestTSOFollowerProxyWithTSOService(t *testing.T) {
Expand Down

0 comments on commit b70107e

Please sign in to comment.