From ea0f9cc971e701c4e3280ffab7c19104087bb63e Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sat, 20 Jan 2024 09:03:18 +0800 Subject: [PATCH] meta/autoid: make autoid client ResetConn operation concurrency-safe (#50522) (#50592) close pingcap/tidb#50519 --- meta/autoid/autoid_service.go | 47 +++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/meta/autoid/autoid_service.go b/meta/autoid/autoid_service.go index 682317b32e3de..439d75bd73b75 100644 --- a/meta/autoid/autoid_service.go +++ b/meta/autoid/autoid_service.go @@ -18,6 +18,7 @@ import ( "context" "strings" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -55,6 +56,8 @@ type ClientDiscover struct { // See https://github.com/grpc/grpc-go/issues/5321 *grpc.ClientConn } + // version is increased in every ResetConn() to make the operation safe. + version uint64 } const ( @@ -69,27 +72,27 @@ func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover { } // GetClient gets the AutoIDAllocClient. -func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) { +func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, uint64, error) { d.mu.RLock() cli := d.mu.AutoIDAllocClient if cli != nil { d.mu.RUnlock() - return cli, nil + return cli, atomic.LoadUint64(&d.version), nil } d.mu.RUnlock() d.mu.Lock() defer d.mu.Unlock() if d.mu.AutoIDAllocClient != nil { - return d.mu.AutoIDAllocClient, nil + return d.mu.AutoIDAllocClient, atomic.LoadUint64(&d.version), nil } resp, err := d.etcdCli.Get(ctx, autoIDLeaderPath, clientv3.WithFirstCreate()...) if err != nil { - return nil, errors.Trace(err) + return nil, 0, errors.Trace(err) } if len(resp.Kvs) == 0 { - return nil, errors.New("autoid service leader not found") + return nil, 0, errors.New("autoid service leader not found") } addr := string(resp.Kvs[0].Value) @@ -99,19 +102,19 @@ func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClien clusterSecurity := security.ClusterSecurity() tlsConfig, err := clusterSecurity.ToTLSConfig() if err != nil { - return nil, errors.Trace(err) + return nil, 0, errors.Trace(err) } opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) } logutil.BgLogger().Info("[autoid client] connect to leader", zap.String("addr", addr)) grpcConn, err := grpc.Dial(addr, opt) if err != nil { - return nil, errors.Trace(err) + return nil, 0, errors.Trace(err) } cli = autoid.NewAutoIDAllocClient(grpcConn) d.mu.AutoIDAllocClient = cli d.mu.ClientConn = grpcConn - return cli, nil + return cli, atomic.LoadUint64(&d.version), nil } // Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch. @@ -129,7 +132,7 @@ func (sp *singlePointAlloc) Alloc(ctx context.Context, n uint64, increment, offs } retry: - cli, err := sp.GetClient(ctx) + cli, ver, err := sp.GetClient(ctx) if err != nil { return 0, 0, errors.Trace(err) } @@ -147,7 +150,7 @@ retry: if err != nil { if strings.Contains(err.Error(), "rpc error") { time.Sleep(backoffDuration) - sp.ResetConn(err) + sp.resetConn(ver, err) goto retry } return 0, 0, errors.Trace(err) @@ -164,6 +167,14 @@ retry: const backoffDuration = 200 * time.Millisecond +func (d *ClientDiscover) resetConn(version uint64, reason error) { + // Avoid repeated Reset operation + if !atomic.CompareAndSwapUint64(&d.version, version, version+1) { + return + } + d.ResetConn(reason) +} + // ResetConn reset the AutoIDAllocClient and underlying grpc connection. // The next GetClient() call will recreate the client connecting to the correct leader by querying etcd. func (d *ClientDiscover) ResetConn(reason error) { @@ -179,10 +190,14 @@ func (d *ClientDiscover) ResetConn(reason error) { d.mu.Unlock() // Close grpc.ClientConn to release resource. if grpcConn != nil { - err := grpcConn.Close() - if err != nil { - logutil.BgLogger().Warn("[autoid client] close grpc connection error", zap.Error(err)) - } + go func() { + // Doen't close the conn immediately, in case the other sessions are still using it. + time.Sleep(200 * time.Millisecond) + err := grpcConn.Close() + if err != nil { + logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err)) + } + }() } } @@ -208,7 +223,7 @@ func (sp *singlePointAlloc) Rebase(ctx context.Context, newBase int64, _ bool) e func (sp *singlePointAlloc) rebase(ctx context.Context, newBase int64, force bool) error { retry: - cli, err := sp.GetClient(ctx) + cli, ver, err := sp.GetClient(ctx) if err != nil { return errors.Trace(err) } @@ -223,7 +238,7 @@ retry: if err != nil { if strings.Contains(err.Error(), "rpc error") { time.Sleep(backoffDuration) - sp.ResetConn(err) + sp.resetConn(ver, err) goto retry } return errors.Trace(err)