Skip to content

Commit

Permalink
support tso gRPC interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jan 16, 2023
1 parent be017f9 commit 4103913
Show file tree
Hide file tree
Showing 16 changed files with 1,169 additions and 44 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ type Client interface {
KeyspaceClient
// ResourceManagerClient manages resource group metadata and token assignment.
ResourceManagerClient
// TsoClient manages TSO data.
TsoClient
// Close closes the client.
Close()
}
Expand Down
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ require (
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.51.0
)

replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230112032800-8743513f0807
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230112063313-a14c44ef44b3 h1:KwHZORXPRl/tSxZsCpOL4PKs9Kfh2pDz32cK5f0BkYA=
github.com/pingcap/kvproto v0.0.0-20230112063313-a14c44ef44b3/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -590,6 +588,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rleungx/kvproto v0.0.0-20230112032800-8743513f0807 h1:iYc7qd2NSSCx9Zpl9siRriPMJjBz+nOHHYaCpDXJda0=
github.com/rleungx/kvproto v0.0.0-20230112032800-8743513f0807/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
Expand Down
4 changes: 4 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ var (
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
cmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
cmdDurationLoadTimestamp = cmdDuration.WithLabelValues("load_timestamp")
cmdDurationSaveTimestamp = cmdDuration.WithLabelValues("save_timestamp")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -115,6 +117,8 @@ var (
cmdFailedDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
cmdFailedDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
requestDurationTSO = requestDuration.WithLabelValues("tso")
cmdFailedDurationLoadTimestamp = cmdDuration.WithLabelValues("load_timestamp")
cmdFailedDurationSaveTimestamp = cmdDuration.WithLabelValues("save_timestamp")
)

func init() {
Expand Down
109 changes: 109 additions & 0 deletions client/tso_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/tikv/pd/client/grpcutil"
"google.golang.org/grpc"
)

// TsoClient manages TSO data.
type TsoClient interface {
// LoadTimestamp loads and returns timestamp.
LoadTimestamp(ctx context.Context, key string) (uint64, error)
// SaveTimestamp saves the timestamp.
SaveTimestamp(ctx context.Context, key string, timestamp uint64, skipCheck bool, lastTimestamp ...uint64) error
}

// tsoClient returns the TsoClient from current PD leader.
func (c *client) tsoClient() tsopb.TsoClient {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
return tsopb.NewTsoClient(cc.(*grpc.ClientConn))
}
return nil
}

// LoadTimestamp loads and returns timestamp.
func (c *client) LoadTimestamp(ctx context.Context, key string) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("tsoClient.LoadTimestamp", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationLoadTimestamp.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &tsopb.LoadTimestampRequest{
Header: c.requestHeader(),
Key: key,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.tsoClient().LoadTimestamp(ctx, req)
cancel()

if err != nil {
cmdFailedDurationLoadTimestamp.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, err
}

if resp.Header.GetError() != nil {
cmdFailedDurationLoadTimestamp.Observe(time.Since(start).Seconds())
return 0, errors.Errorf("Load timestamp %s failed: %s", key, resp.Header.GetError().String())
}

return resp.Timestamp, nil
}

// SaveTimestamp saves the timestamp.
func (c *client) SaveTimestamp(ctx context.Context, key string, timestamp uint64, skipCheck bool, lastTimestamp ...uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("tsoClient.SaveTimestamp", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationSaveTimestamp.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &tsopb.SaveTimestampRequest{
Header: c.requestHeader(),
Key: key,
Timestamp: timestamp,
SkipCheck: skipCheck,
}
if !skipCheck && len(lastTimestamp) != 0 {
req.LastTimestamp = lastTimestamp[0]
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.tsoClient().SaveTimestamp(ctx, req)
cancel()

if err != nil {
cmdFailedDurationSaveTimestamp.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return err
}

if resp.Header.GetError() != nil {
cmdFailedDurationSaveTimestamp.Observe(time.Since(start).Seconds())
return errors.Errorf("Save timestamp %d to %s failed: %s", timestamp, key, resp.Header.GetError().String())
}

return nil
}
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.1
github.com/swaggo/http-swagger v0.0.0-20200308142732-58ac5e232fba
github.com/swaggo/swag v1.8.3
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
Expand All @@ -46,7 +46,7 @@ require (
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.19.1
golang.org/x/text v0.4.0
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/time v0.1.0
golang.org/x/tools v0.1.12
google.golang.org/grpc v1.51.0
gotest.tools/gotestsum v1.7.0
Expand Down Expand Up @@ -94,10 +94,9 @@ require (
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down Expand Up @@ -142,7 +141,7 @@ require (
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 // indirect
github.com/thoas/go-funk v0.8.0 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
Expand All @@ -165,11 +164,11 @@ require (
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/term v0.2.0 // indirect
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand All @@ -188,3 +187,5 @@ replace google.golang.org/grpc v1.51.0 => google.golang.org/grpc v1.26.0
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230112032800-8743513f0807
Loading

0 comments on commit 4103913

Please sign in to comment.