Skip to content

Commit

Permalink
meta/autoid: make autoid client ResetConn operation concurrency-safe (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jan 19, 2024
1 parent e757ddc commit d8298d5
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions pkg/meta/autoid/autoid_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -56,6 +57,8 @@ type ClientDiscover struct {
// See https://github.com/grpc/grpc-go/issues/5321
*grpc.ClientConn
}
// version is increased in every ResetConn() to make the operation safe.
version uint64
}

const (
Expand All @@ -70,27 +73,27 @@ func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover {
}

// GetClient gets the AutoIDAllocClient.
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) {
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, uint64, error) {
d.mu.RLock()
cli := d.mu.AutoIDAllocClient
if cli != nil {
d.mu.RUnlock()
return cli, nil
return cli, atomic.LoadUint64(&d.version), nil
}
d.mu.RUnlock()

d.mu.Lock()
defer d.mu.Unlock()
if d.mu.AutoIDAllocClient != nil {
return d.mu.AutoIDAllocClient, nil
return d.mu.AutoIDAllocClient, atomic.LoadUint64(&d.version), nil
}

resp, err := d.etcdCli.Get(ctx, autoIDLeaderPath, clientv3.WithFirstCreate()...)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return nil, errors.New("autoid service leader not found")
return nil, 0, errors.New("autoid service leader not found")
}

addr := string(resp.Kvs[0].Value)
Expand All @@ -100,19 +103,19 @@ func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClien
clusterSecurity := security.ClusterSecurity()
tlsConfig, err := clusterSecurity.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
logutil.BgLogger().Info("connect to leader", zap.String("category", "autoid client"), zap.String("addr", addr))
grpcConn, err := grpc.Dial(addr, opt)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
cli = autoid.NewAutoIDAllocClient(grpcConn)
d.mu.AutoIDAllocClient = cli
d.mu.ClientConn = grpcConn
return cli, nil
return cli, atomic.LoadUint64(&d.version), nil
}

// Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch.
Expand All @@ -131,7 +134,7 @@ func (sp *singlePointAlloc) Alloc(ctx context.Context, n uint64, increment, offs

var bo backoffer
retry:
cli, err := sp.GetClient(ctx)
cli, ver, err := sp.GetClient(ctx)
if err != nil {
return 0, 0, errors.Trace(err)
}
Expand All @@ -149,7 +152,7 @@ retry:
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDAlloc, metrics.RetLabel(err)).Observe(time.Since(start).Seconds())
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
sp.ResetConn(err)
sp.resetConn(ver, err)
bo.Backoff()
goto retry
}
Expand Down Expand Up @@ -188,6 +191,14 @@ func (b *backoffer) Backoff() {
time.Sleep(b.Duration)
}

func (d *ClientDiscover) resetConn(version uint64, reason error) {
// Avoid repeated Reset operation
if !atomic.CompareAndSwapUint64(&d.version, version, version+1) {
return
}
d.ResetConn(reason)
}

// ResetConn reset the AutoIDAllocClient and underlying grpc connection.
// The next GetClient() call will recreate the client connecting to the correct leader by querying etcd.
func (d *ClientDiscover) ResetConn(reason error) {
Expand All @@ -205,10 +216,14 @@ func (d *ClientDiscover) ResetConn(reason error) {
d.mu.Unlock()
// Close grpc.ClientConn to release resource.
if grpcConn != nil {
err := grpcConn.Close()
if err != nil {
logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err))
}
go func() {
// Doen't close the conn immediately, in case the other sessions are still using it.
time.Sleep(200 * time.Millisecond)
err := grpcConn.Close()
if err != nil {
logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err))
}
}()
}
}

Expand All @@ -235,7 +250,7 @@ func (sp *singlePointAlloc) Rebase(ctx context.Context, newBase int64, _ bool) e
func (sp *singlePointAlloc) rebase(ctx context.Context, newBase int64, force bool) error {
var bo backoffer
retry:
cli, err := sp.GetClient(ctx)
cli, ver, err := sp.GetClient(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -249,7 +264,7 @@ retry:
})
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
sp.ResetConn(err)
sp.resetConn(ver, err)
bo.Backoff()
goto retry
}
Expand Down

0 comments on commit d8298d5

Please sign in to comment.