Skip to content

Commit

Permalink
support tso gRPC interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jan 11, 2023
1 parent 59a6203 commit b57953b
Show file tree
Hide file tree
Showing 14 changed files with 3,031 additions and 140 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ type Client interface {
KeyspaceClient
// ResourceManagerClient manages resource group metadata and token assignment.
ResourceManagerClient
// TsoClient manages TSO data.
TsoClient
// Close closes the client.
Close()
}
Expand Down
6 changes: 4 additions & 2 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ require (
github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.1
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.43.0
google.golang.org/grpc v1.51.0
)

replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230111052323-b72b694bbb30
933 changes: 912 additions & 21 deletions client/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package pd

import (
"context"
"go.uber.org/zap"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down
4 changes: 4 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ var (
cmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions")
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
cmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
cmdDurationLoadTimestamp = cmdDuration.WithLabelValues("load_timestamp")
cmdDurationSaveTimestamp = cmdDuration.WithLabelValues("save_timestamp")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -113,6 +115,8 @@ var (
cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
cmdFailedDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
requestDurationTSO = requestDuration.WithLabelValues("tso")
cmdFailedDurationLoadTimestamp = cmdDuration.WithLabelValues("load_timestamp")
cmdFailedDurationSaveTimestamp = cmdDuration.WithLabelValues("save_timestamp")
)

func init() {
Expand Down
105 changes: 105 additions & 0 deletions client/tso_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/tikv/pd/client/grpcutil"
"google.golang.org/grpc"
)

// TsoClient manages TSO data.
type TsoClient interface {
// LoadTimestamp loads and returns timestamp.
LoadTimestamp(ctx context.Context, key string) (uint64, error)
// SaveTimestamp saves the timestamp.
SaveTimestamp(ctx context.Context, key string, timestamp uint64) error
}

// tsoClient returns the TsoClient from current PD leader.
func (c *client) tsoClient() tsopb.TsoClient {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
return tsopb.NewTsoClient(cc.(*grpc.ClientConn))
}
return nil
}

// LoadTimestamp loads and returns timestamp.
func (c *client) LoadTimestamp(ctx context.Context, key string) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("tsoClient.LoadTimestamp", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationLoadTimestamp.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &tsopb.LoadTimestampRequest{
Header: c.requestHeader(),
Key: key,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.tsoClient().LoadTimestamp(ctx, req)
cancel()

if err != nil {
cmdFailedDurationLoadTimestamp.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, err
}

if resp.Header.GetError() != nil {
cmdFailedDurationLoadTimestamp.Observe(time.Since(start).Seconds())
return 0, errors.Errorf("Load timestamp %s failed: %s", key, resp.Header.GetError().String())
}

return resp.Timestamp, nil
}

// SaveTimestamp saves the timestamp.
func (c *client) SaveTimestamp(ctx context.Context, key string, timestamp uint64) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("tsoClient.SaveTimestamp", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationSaveTimestamp.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &tsopb.SaveTimestampRequest{
Header: c.requestHeader(),
Key: key,
Timestamp: timestamp,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.tsoClient().SaveTimestamp(ctx, req)
cancel()

if err != nil {
cmdFailedDurationSaveTimestamp.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return err
}

if resp.Header.GetError() != nil {
cmdFailedDurationSaveTimestamp.Observe(time.Since(start).Seconds())
return errors.Errorf("Save timestamp %d to %s failed: %s", timestamp, key, resp.Header.GetError().String())
}

return nil
}
47 changes: 23 additions & 24 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/sasha-s/go-deadlock v0.2.0
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.1
github.com/swaggo/http-swagger v0.0.0-20200308142732-58ac5e232fba
github.com/swaggo/swag v1.8.3
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
Expand All @@ -44,19 +44,14 @@ require (
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.19.1
golang.org/x/text v0.3.7
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/tools v0.1.10
google.golang.org/grpc v1.26.0
golang.org/x/text v0.4.0
golang.org/x/time v0.1.0
golang.org/x/tools v0.1.12
google.golang.org/grpc v1.51.0
gotest.tools/gotestsum v1.7.0
)

require (
github.com/goccy/go-json v0.9.7 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/onsi/gomega v1.20.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
)
require google.golang.org/protobuf v1.28.1 // indirect

require (
github.com/KyleBanks/depth v1.2.1 // indirect
Expand Down Expand Up @@ -95,16 +90,17 @@ require (
github.com/go-resty/resty/v2 v2.6.0 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/goccy/go-graphviz v0.0.9 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/golang-jwt/jwt v3.2.1+incompatible // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.12.1 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69 // indirect
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand All @@ -130,6 +126,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/oleiade/reflections v1.0.1 // indirect
github.com/olekukonko/tablewriter v0.0.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -144,7 +141,7 @@ require (
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 // indirect
github.com/thoas/go-funk v0.8.0 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
Expand All @@ -165,16 +162,14 @@ require (
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/exp v0.0.0-20220321173239-a90fa8a75705
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c // indirect
google.golang.org/protobuf v1.28.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/term v0.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand All @@ -189,3 +184,7 @@ require (
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230111052323-b72b694bbb30

replace google.golang.org/grpc v1.51.0 => google.golang.org/grpc v1.26.0
Loading

0 comments on commit b57953b

Please sign in to comment.