Skip to content

Commit

Permalink
Support multi-keyspace-group in PD(TSO) client
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Apr 14, 2023
1 parent 4201238 commit 83ea22b
Show file tree
Hide file tree
Showing 20 changed files with 3,561 additions and 69 deletions.
14 changes: 11 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ import (
const (
// defaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap and reserved for users who haven't been assigned keyspace.
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
// and reserved for users who haven't been assigned keyspace.
defaultKeyspaceID = uint32(0)
// defaultKeySpaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
defaultKeySpaceGroupID = uint32(0)
)

// Region contains information of a region's meta and its peers.
Expand Down Expand Up @@ -380,6 +384,7 @@ func (c *client) Close() {
func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()

if newMode == c.serviceMode {
return
}
Expand All @@ -398,11 +403,15 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(c.ctx, MetaStorageClient(c),
c.GetClusterID(c.ctx), c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", c.serviceMode.String()), zap.Error(err))
zap.Strings("svr-urls", c.svrUrls),
zap.String("current-mode", c.serviceMode.String()),
zap.Error(err))
return
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down Expand Up @@ -601,7 +610,6 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
req.start = time.Now()
req.keyspaceID = c.keyspaceID
req.dcLocation = dcLocation

if tsoClient == nil {
Expand Down
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/binshi-bing/kvproto v0.0.0-20230414013749-4846a6711e9d
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/binshi-bing/kvproto v0.0.0-20230414013749-4846a6711e9d h1:k50EdcwJYKXooPbLabSfUSOdLZbLRgo6JTacUeLFlqU=
github.com/binshi-bing/kvproto v0.0.0-20230414013749-4846a6711e9d/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -82,8 +84,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a h1:PWkMSJSDaOuLNKCV84K3tQ9stZuZPN8E148jRPD9TcA=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
1 change: 0 additions & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type tsoRequest struct {
done chan error
physical int64
logical int64
keyspaceID uint32
dcLocation string
}

Expand Down
13 changes: 9 additions & 4 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,8 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
cctx, cancel := context.WithCancel(dispatcherCtx)
// Do not proxy the leader client.
if addr != leaderAddr {
log.Info("[tso] use follower to forward tso stream to do the proxy", zap.String("dc", dc), zap.String("addr", addr))
log.Info("[tso] use follower to forward tso stream to do the proxy",
zap.String("dc", dc), zap.String("addr", addr))
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
}
// Create the TSO stream.
Expand All @@ -676,7 +677,8 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel})
continue
}
log.Error("[tso] create the tso stream failed", zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err))
log.Error("[tso] create the tso stream failed",
zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err))
cancel()
}
return nil
Expand All @@ -691,15 +693,18 @@ func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanO
return opts
}

func (c *tsoClient) processRequests(stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption) error {
func (c *tsoClient) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption,
) error {
if len(opts) > 0 {
span := opentracing.StartSpan("pdclient.processRequests", opts...)
defer span.Finish()
}

requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(c.svcDiscovery.GetClusterID(), dcLocation, requests, tbc.batchStartTime)
physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.keyspaceID, c.keyspaceGroupID, dcLocation, requests, tbc.batchStartTime)
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
return err
Expand Down
3 changes: 2 additions & 1 deletion client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ func (c *tsoServiceDiscovery) switchPrimary(addrs []string) error {
func (c *tsoServiceDiscovery) updateMember() error {
resp, err := c.metacli.Get(c.ctx, []byte(c.primaryKey))
if err != nil {
log.Error("[tso] failed to get the keyspace serving endpoint", zap.String("primary-key", c.primaryKey), errs.ZapError(err))
log.Error("[tso] failed to get the keyspace serving endpoint",
zap.String("primary-key", c.primaryKey), errs.ZapError(err))
return err
}

Expand Down
25 changes: 17 additions & 8 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ type tsoTSOStreamBuilder struct {
client tsopb.TSOClient
}

func (b *tsoTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
func (b *tsoTSOStreamBuilder) build(
ctx context.Context, cancel context.CancelFunc, timeout time.Duration,
) (tsoStream, error) {
done := make(chan struct{})
// TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created.
go checkStreamTimeout(ctx, cancel, done, timeout)
Expand All @@ -97,16 +99,19 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha

type tsoStream interface {
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error)
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error)
}

type pdTSOStream struct {
stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error) {
func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Expand Down Expand Up @@ -152,13 +157,17 @@ type tsoTSOStream struct {
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error) {
func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,
ClusterId: clusterID,
KeyspaceId: keyspaceID,
KeyspaceGroupId: keyspaceGroupID,
},
Count: uint32(count),
DcLocation: dcLocation,
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,5 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/pingcap/kvproto => github.com/binshi-bing/kvproto v0.0.0-20230414013749-4846a6711e9d
Loading

0 comments on commit 83ea22b

Please sign in to comment.