Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meta/autoid: make autoid client ResetConn operation concurrency-safe (#50522) #50594

Merged
merged 3 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7041,13 +7041,13 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "e5ef65d9e2bf40b0264e509262462421dfaa3468f0c8061d40e90b88fb8f2d1b",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231116062916-ef6ba8551e52",
sha256 = "fc6b1ce304af19928623f951c23ce58734ce0efb7a36ef621d05be893a94f869",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20240102100501-7ce5860ab342",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231116062916-ef6ba8551e52.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231116062916-ef6ba8551e52.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231116062916-ef6ba8551e52.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231116062916-ef6ba8551e52.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240102100501-7ce5860ab342.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240102100501-7ce5860ab342.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240102100501-7ce5860ab342.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240102100501-7ce5860ab342.zip",
],
)
go_repository(
Expand Down Expand Up @@ -9186,26 +9186,26 @@ def go_deps():
name = "com_sourcegraph_sourcegraph_appdash",
build_file_proto_mode = "disable_global",
importpath = "sourcegraph.com/sourcegraph/appdash",
sha256 = "bd2492d9db05362c2fecd0b3d0f6002c89a6d90d678fb93b4158298ab883736f",
strip_prefix = "sourcegraph.com/sourcegraph/appdash@v0.0.0-20190731080439-ebfcffb1b5c0",
sha256 = "c46b442fa40d2af48e08064f4c16ae3712953a9988cd0f7588fcf5e4fc7a2fed",
strip_prefix = "github.com/sourcegraph/appdash@v0.0.0-20190731080439-ebfcffb1b5c0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/sourcegraph.com/sourcegraph/appdash/com_sourcegraph_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip",
"http://ats.apps.svc/gomod/sourcegraph.com/sourcegraph/appdash/com_sourcegraph_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip",
"https://cache.hawkingrei.com/gomod/sourcegraph.com/sourcegraph/appdash/com_sourcegraph_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/sourcegraph.com/sourcegraph/appdash/com_sourcegraph_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/sourcegraph/appdash/com_github_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip",
"http://ats.apps.svc/gomod/github.com/sourcegraph/appdash/com_github_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip",
"https://cache.hawkingrei.com/gomod/github.com/sourcegraph/appdash/com_github_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/sourcegraph/appdash/com_github_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip",
],
)
go_repository(
name = "com_sourcegraph_sourcegraph_appdash_data",
build_file_proto_mode = "disable_global",
importpath = "sourcegraph.com/sourcegraph/appdash-data",
sha256 = "382adefecd62bb79172e2552bcfb7d45f47122f9bd22259b0566b26fb2627b87",
strip_prefix = "sourcegraph.com/sourcegraph/appdash-data@v0.0.0-20151005221446-73f23eafcf67",
sha256 = "59b71fa8cdb0fe2b1c02739ccf2daeaf28f2e22c4b178cdc8e1b902ad1022bc0",
strip_prefix = "github.com/sourcegraph/appdash-data@v0.0.0-20151005221446-73f23eafcf67",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/sourcegraph.com/sourcegraph/appdash-data/com_sourcegraph_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"http://ats.apps.svc/gomod/sourcegraph.com/sourcegraph/appdash-data/com_sourcegraph_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"https://cache.hawkingrei.com/gomod/sourcegraph.com/sourcegraph/appdash-data/com_sourcegraph_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/sourcegraph.com/sourcegraph/appdash-data/com_sourcegraph_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/sourcegraph/appdash-data/com_github_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"http://ats.apps.svc/gomod/github.com/sourcegraph/appdash-data/com_github_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"https://cache.hawkingrei.com/gomod/github.com/sourcegraph/appdash-data/com_github_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/sourcegraph/appdash-data/com_github_sourcegraph_appdash_data-v0.0.0-20151005221446-73f23eafcf67.zip",
],
)
go_repository(
Expand Down
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20231116065855-46811b6ac353
github.com/tikv/pd/client v0.0.0-20231116062916-ef6ba8551e52
github.com/tikv/pd/client v0.0.0-20240102100501-7ce5860ab342
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down Expand Up @@ -312,4 +312,9 @@ replace (
github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
github.com/pingcap/tidb/pkg/parser => ./pkg/parser

// TODO: `sourcegraph.com/sourcegraph/appdash` has been archived, and the original host has been removed.
// Please remove these dependencies.
sourcegraph.com/sourcegraph/appdash => github.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
sourcegraph.com/sourcegraph/appdash-data => github.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
)
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,10 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:IJ3DuWHPTJrsqtIqjfdmPTELdTFGefvrOa2eTeRBleQ=
github.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:V952P4GGl1v/MMynLwxVdWEbSZJx+n0oOO3ljnez+WU=
github.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 h1:8ZnTA26bBOoPkAbbitKPgNlpw0Bwt7ZlpYgZWHWJR/w=
github.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:tNZjgbYncKL5HxvDULAr/mWDmFz4B7H8yrXEDlnoIiw=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
Expand Down Expand Up @@ -993,8 +997,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20231116065855-46811b6ac353 h1:3yZMlIPxpFEepJOJVq64e/ulhyON3cOoRynYGEOGBP8=
github.com/tikv/client-go/v2 v2.0.8-0.20231116065855-46811b6ac353/go.mod h1:0W23EJEJejgM/75ZhMaRLokR/OMlGymyGrB2/gvfuvs=
github.com/tikv/pd/client v0.0.0-20231116062916-ef6ba8551e52 h1:wucAo/ks8INgayRVfbrzZ+BSWEwRLETj0XfngDcrZ4k=
github.com/tikv/pd/client v0.0.0-20231116062916-ef6ba8551e52/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/pd/client v0.0.0-20240102100501-7ce5860ab342 h1:MHYipM+19XLf/QPM8xsjDW3+p2/aOnBqPh+GnO9LDd8=
github.com/tikv/pd/client v0.0.0-20240102100501-7ce5860ab342/go.mod h1:AwjTSpM7CgAynYwB6qTG5R5fVC9/eXlQXiTO6zDL1HI=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
Expand Down Expand Up @@ -1651,9 +1655,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ih
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:ucqkfpjg9WzSUubAO62csmucvxl4/JeW3F4I4909XkM=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 h1:e1sMhtVq9AfcEy8AXNb8eSg6gbzfdpYhoNqnPJa+GzI=
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
47 changes: 31 additions & 16 deletions pkg/meta/autoid/autoid_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -56,6 +57,8 @@ type ClientDiscover struct {
// See https://github.com/grpc/grpc-go/issues/5321
*grpc.ClientConn
}
// version is increased in every ResetConn() to make the operation safe.
version uint64
}

const (
Expand All @@ -70,27 +73,27 @@ func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover {
}

// GetClient gets the AutoIDAllocClient.
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) {
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, uint64, error) {
d.mu.RLock()
cli := d.mu.AutoIDAllocClient
if cli != nil {
d.mu.RUnlock()
return cli, nil
return cli, atomic.LoadUint64(&d.version), nil
}
d.mu.RUnlock()

d.mu.Lock()
defer d.mu.Unlock()
if d.mu.AutoIDAllocClient != nil {
return d.mu.AutoIDAllocClient, nil
return d.mu.AutoIDAllocClient, atomic.LoadUint64(&d.version), nil
}

resp, err := d.etcdCli.Get(ctx, autoIDLeaderPath, clientv3.WithFirstCreate()...)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return nil, errors.New("autoid service leader not found")
return nil, 0, errors.New("autoid service leader not found")
}

addr := string(resp.Kvs[0].Value)
Expand All @@ -100,19 +103,19 @@ func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClien
clusterSecurity := security.ClusterSecurity()
tlsConfig, err := clusterSecurity.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
logutil.BgLogger().Info("connect to leader", zap.String("category", "autoid client"), zap.String("addr", addr))
grpcConn, err := grpc.Dial(addr, opt)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
cli = autoid.NewAutoIDAllocClient(grpcConn)
d.mu.AutoIDAllocClient = cli
d.mu.ClientConn = grpcConn
return cli, nil
return cli, atomic.LoadUint64(&d.version), nil
}

// Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch.
Expand All @@ -130,7 +133,7 @@ func (sp *singlePointAlloc) Alloc(ctx context.Context, n uint64, increment, offs
}

retry:
cli, err := sp.GetClient(ctx)
cli, ver, err := sp.GetClient(ctx)
if err != nil {
return 0, 0, errors.Trace(err)
}
Expand All @@ -149,7 +152,7 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.ResetConn(err)
sp.resetConn(ver, err)
goto retry
}
return 0, 0, errors.Trace(err)
Expand All @@ -166,6 +169,14 @@ retry:

const backoffDuration = 200 * time.Millisecond

func (d *ClientDiscover) resetConn(version uint64, reason error) {
// Avoid repeated Reset operation
if !atomic.CompareAndSwapUint64(&d.version, version, version+1) {
return
}
d.ResetConn(reason)
}

// ResetConn reset the AutoIDAllocClient and underlying grpc connection.
// The next GetClient() call will recreate the client connecting to the correct leader by querying etcd.
func (d *ClientDiscover) ResetConn(reason error) {
Expand All @@ -181,10 +192,14 @@ func (d *ClientDiscover) ResetConn(reason error) {
d.mu.Unlock()
// Close grpc.ClientConn to release resource.
if grpcConn != nil {
err := grpcConn.Close()
if err != nil {
logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err))
}
go func() {
// Doen't close the conn immediately, in case the other sessions are still using it.
time.Sleep(200 * time.Millisecond)
err := grpcConn.Close()
if err != nil {
logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err))
}
}()
}
}

Expand All @@ -210,7 +225,7 @@ func (sp *singlePointAlloc) Rebase(ctx context.Context, newBase int64, _ bool) e

func (sp *singlePointAlloc) rebase(ctx context.Context, newBase int64, force bool) error {
retry:
cli, err := sp.GetClient(ctx)
cli, ver, err := sp.GetClient(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -225,7 +240,7 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.ResetConn(err)
sp.resetConn(ver, err)
goto retry
}
return errors.Trace(err)
Expand Down