Skip to content

Commit

Permalink
Merge branch 'master' into add-background
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jul 5, 2023
2 parents 0d22ad3 + ce5010d commit e48b8dd
Show file tree
Hide file tree
Showing 53 changed files with 737 additions and 233 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ report.xml
coverage.xml
coverage
*.txt
go.work*
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ test-tso-consistency: install-tools
TASK_COUNT=1
TASK_ID=1

# The command should be used in daily CIit will split some tasks to run parallel.
# The command should be used in daily CI, it will split some tasks to run parallel.
# It should retain report.xml,coverage,coverage.xml and package.list to analyze.
test-with-cover-parallel: install-tools dashboard-ui split
@$(FAILPOINT_ENABLE)
Expand Down
36 changes: 17 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,6 @@ type serviceModeKeeper struct {
tsoSvcDiscovery ServiceDiscovery
}

func (k *serviceModeKeeper) SetKeyspaceID(keyspaceID uint32) {
k.Lock()
defer k.Unlock()
if k.serviceMode == pdpb.ServiceMode_API_SVC_MODE {
k.tsoSvcDiscovery.SetKeyspaceID(keyspaceID)
}
}

func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
Expand Down Expand Up @@ -392,7 +384,7 @@ func createClientWithKeyspace(

c.pdSvcDiscovery = newPDServiceDiscovery(
clientCtx, clientCancel, &c.wg, c.setServiceMode,
keyspaceID, c.svrUrls, c.tlsCfg, c.option)
nil, keyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
return nil, err
Expand Down Expand Up @@ -504,36 +496,42 @@ func newClientWithKeyspaceName(
opt(c)
}

updateKeyspaceIDCb := func() error {
if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil {
return err
}
// c.keyspaceID is the source of truth for keyspace id.
c.pdSvcDiscovery.(*pdServiceDiscovery).SetKeyspaceID(c.keyspaceID)
return nil
}

// Create a PD service discovery with null keyspace id, then query the real id wth the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
c.pdSvcDiscovery = newPDServiceDiscovery(
clientCtx, clientCancel, &c.wg, c.setServiceMode, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option)
clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
return nil, err
}
if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil {
return nil, err
}
// We call "c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)" after service mode already switching to API mode
// and tso service discovery already initialized, so here we need to set the tso_service_discovery's keyspace id too.
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)
c.serviceModeKeeper.SetKeyspaceID(c.keyspaceID)
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), zap.Uint32("keyspace-id", c.keyspaceID))
zap.Strings("pd-address", svrAddrs),
zap.String("keyspace-name", keyspaceName),
zap.Uint32("keyspace-id", c.keyspaceID))
return c, nil
}

func (c *client) initRetry(f func(s string) error, str string) error {
var err error
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for i := 0; i < c.option.maxRetryTimes; i++ {
if err = f(str); err == nil {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(time.Second):
case <-ticker.C:
}
}
return errors.WithStack(err)
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/tikv/pd/client
go 1.20

require (
github.com/elastic/gosigar v0.14.2
github.com/cloudfoundry/gosigar v1.3.6
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
Expand Down
14 changes: 10 additions & 4 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudfoundry/gosigar v1.3.6 h1:gIc08FbB3QPb+nAQhINIK/qhf5REKkY0FTGgRGXkcVc=
github.com/cloudfoundry/gosigar v1.3.6/go.mod h1:lNWstu5g5gw59O09Y+wsMNFzBSnU8a0u+Sfx4dq360E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4=
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
Expand Down Expand Up @@ -74,6 +75,9 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
Expand Down Expand Up @@ -174,7 +178,6 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -210,6 +213,7 @@ golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
Expand All @@ -229,16 +233,18 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
22 changes: 17 additions & 5 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ type ServiceDiscovery interface {
GetClusterID() uint64
// GetKeyspaceID returns the ID of the keyspace
GetKeyspaceID() uint32
// SetKeyspaceID sets the ID of the keyspace
SetKeyspaceID(keyspaceID uint32)
// GetKeyspaceGroupID returns the ID of the keyspace group
GetKeyspaceGroupID() uint32
// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
Expand Down Expand Up @@ -99,6 +97,7 @@ type ServiceDiscovery interface {
AddServiceAddrsSwitchedCallback(callbacks ...func())
}

type updateKeyspaceIDFunc func() error
type tsoLocalServAddrsUpdatedFunc func(map[string]string) error
type tsoGlobalServAddrUpdatedFunc func(string) error

Expand Down Expand Up @@ -149,8 +148,9 @@ type pdServiceDiscovery struct {
cancel context.CancelFunc
closeOnce sync.Once

keyspaceID uint32
tlsCfg *tlsutil.TLSConfig
updateKeyspaceIDCb updateKeyspaceIDFunc
keyspaceID uint32
tlsCfg *tlsutil.TLSConfig
// Client option.
option *option
}
Expand All @@ -160,6 +160,7 @@ func newPDServiceDiscovery(
ctx context.Context, cancel context.CancelFunc,
wg *sync.WaitGroup,
serviceModeUpdateCb func(pdpb.ServiceMode),
updateKeyspaceIDCb updateKeyspaceIDFunc,
keyspaceID uint32,
urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
) *pdServiceDiscovery {
Expand All @@ -169,6 +170,7 @@ func newPDServiceDiscovery(
cancel: cancel,
wg: wg,
serviceModeUpdateCb: serviceModeUpdateCb,
updateKeyspaceIDCb: updateKeyspaceIDCb,
keyspaceID: keyspaceID,
tlsCfg: tlsCfg,
option: option,
Expand All @@ -192,6 +194,14 @@ func (c *pdServiceDiscovery) Init() error {
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

// We need to update the keyspace ID before we discover and update the service mode
// so that TSO in API mode can be initialized with the correct keyspace ID.
if c.updateKeyspaceIDCb != nil {
if err := c.updateKeyspaceIDCb(); err != nil {
return err
}
}

if err := c.checkServiceModeChanged(); err != nil {
log.Warn("[pd] failed to check service mode and will check later", zap.Error(err))
}
Expand All @@ -206,14 +216,16 @@ func (c *pdServiceDiscovery) Init() error {

func (c *pdServiceDiscovery) initRetry(f func() error) error {
var err error
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for i := 0; i < c.option.maxRetryTimes; i++ {
if err = f(); err == nil {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(time.Second):
case <-ticker.C:
}
}
return errors.WithStack(err)
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type ResourceGroupKVInterceptor interface {
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
}

// ResourceGroupProvider provides some api to interact with resource manager server
// ResourceGroupProvider provides some api to interact with resource manager server.
type ResourceGroupProvider interface {
GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error)
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (r *Reservation) CancelAt(now time.Time) {

// Reserve returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// The returned Reservations OK() method returns false if wait duration exceeds deadline.
// The returned Reservation's OK() method returns false if wait duration exceeds deadline.
// Usage example:
//
// r := lim.Reserve(time.Now(), 1)
Expand Down
6 changes: 3 additions & 3 deletions client/resource_group/controller/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"os"
"time"

"github.com/elastic/gosigar"
"github.com/pingcap/log"
"github.com/cloudfoundry/gosigar"
"go.uber.org/zap"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
)

// RequestUnit is the basic unit of the resource request management, which has two types:
Expand Down Expand Up @@ -265,7 +265,7 @@ func getSQLProcessCPUTime(isSingleGroupByKeyspace bool) float64 {

func getSysProcessCPUTime() float64 {
pid := os.Getpid()
cpuTime := gosigar.ProcTime{}
cpuTime := sigar.ProcTime{}
if err := cpuTime.Get(pid); err != nil {
log.Error("getCPUTime get pid failed", zap.Error(err))
}
Expand Down
4 changes: 3 additions & 1 deletion client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
err error
stream rmpb.ResourceManager_AcquireTokenBucketsClient
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
cc, err := c.resourceManagerClient()
if err != nil {
Expand All @@ -406,7 +408,7 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
select {
case <-ctx.Done():
return err
case <-time.After(retryInterval):
case <-ticker.C:
}
}
return err
Expand Down
43 changes: 43 additions & 0 deletions client/timerpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"sync"
"time"
)

// GlobalTimerPool is a global pool for reusing *time.Timer.
var GlobalTimerPool TimerPool

// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse.
type TimerPool struct {
pool sync.Pool
}

// Get returns a timer with a given duration.
func (tp *TimerPool) Get(d time.Duration) *time.Timer {
if v := tp.pool.Get(); v != nil {
timer := v.(*time.Timer)
timer.Reset(d)
return timer
}
return time.NewTimer(d)
}

// Put tries to call timer.Stop() before putting it back into pool,
// if the timer.Stop() returns false (it has either already expired or been stopped),
// have a shot at draining the channel with residual time if there is one.
func (tp *TimerPool) Put(timer *time.Timer) {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
tp.pool.Put(timer)
}
Loading

0 comments on commit e48b8dd

Please sign in to comment.