Skip to content

Commit

Permalink
client: both Support names and config path (#5886)
Browse files Browse the repository at this point in the history
close #5885, ref #5887, ref #5888, ref pingcap/tiflow#8110

The global config needs to support both names and configuration paths, not only for compatibility but also to make more sense.

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
HuSharp and ti-chi-bot authored Feb 2, 2023
1 parent 4189e94 commit 18df271
Show file tree
Hide file tree
Showing 13 changed files with 200 additions and 45 deletions.
29 changes: 21 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type GlobalConfigItem struct {
EventType pdpb.EventType
Name string
Value string
PayLoad []byte
}

// Client is a PD (Placement Driver) client.
Expand Down Expand Up @@ -122,7 +123,7 @@ type Client interface {
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)

// LoadGlobalConfig gets the global config from etcd
LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error)
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
Expand Down Expand Up @@ -419,7 +420,7 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi
}
// Start the daemons.
c.updateTSODispatcher()
c.createTokenispatcher()
c.createTokenDispatcher()
c.wg.Add(3)
go c.tsLoop()
go c.tsCancelLoop()
Expand Down Expand Up @@ -1820,16 +1821,22 @@ func trimHTTPPrefix(str string) string {
return str
}

func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]GlobalConfigItem, int64, error) {
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{ConfigPath: configPath})
func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath})
if err != nil {
return nil, 0, err
}

res := make([]GlobalConfigItem, len(resp.GetItems()))
for i, item := range resp.GetItems() {
cfg := GlobalConfigItem{Name: item.GetName()}
cfg.Value = item.GetValue()
cfg := GlobalConfigItem{Name: item.GetName(), EventType: item.GetKind(), PayLoad: item.GetPayload()}
if item.GetValue() == "" {
// We need to keep the Value field for CDC compatibility.
// But if you not use `Names`, will only have `Payload` field.
cfg.Value = string(item.GetPayload())
} else {
cfg.Value = item.GetValue()
}
res[i] = cfg
}
return res, resp.GetRevision(), nil
Expand All @@ -1838,7 +1845,7 @@ func (c *client) LoadGlobalConfig(ctx context.Context, configPath string) ([]Glo
func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error {
resArr := make([]*pdpb.GlobalConfigItem, len(items))
for i, it := range items {
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType}
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad}
}
_, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath})
if err != nil {
Expand Down Expand Up @@ -1874,7 +1881,13 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
}
arr := make([]GlobalConfigItem, len(m.Changes))
for j, i := range m.Changes {
arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue()}
// We need to keep the Value field for CDC compatibility.
// But if you not use `Names`, will only have `Payload` field.
if i.GetValue() == "" {
arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), string(i.GetPayload()), i.GetPayload()}
} else {
arr[j] = GlobalConfigItem{i.GetKind(), i.GetName(), i.GetValue(), i.GetPayload()}
}
}
select {
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 h1:oYUK4V5PMlyIooU/+pPkKrJ3vELwcuuCNyKKlqSQa5c=
github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 1 addition & 1 deletion client/resourcemanager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ type resourceManagerConnectionContext struct {
cancel context.CancelFunc
}

func (c *client) createTokenispatcher() {
func (c *client) createTokenDispatcher() {
dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx)
dispatcher := &tokenDispatcher{
dispatcherCancel: dispatcherCancel,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7 h1:oYUK4V5PMlyIooU/+pPkKrJ3vELwcuuCNyKKlqSQa5c=
github.com/pingcap/kvproto v0.0.0-20230131104319-a7c51106dfe7/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
Expand Down
57 changes: 50 additions & 7 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io"
"path"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1881,16 +1882,30 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
<-done
}

// for CDC compatibility, we need to initialize config path to `globalConfigPath`
const globalConfigPath = "/global/config/"

// StoreGlobalConfig store global config into etcd by transaction
// Since item value needs to support marshal of different struct types,
// it should be set to `Payload bytes` instead of `Value string`
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
configPath := request.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := item.GetName()
name := path.Join(configPath, item.GetName())
switch item.GetKind() {
case pdpb.EventType_PUT:
ops[i] = clientv3.OpPut(request.GetConfigPath()+name, item.GetValue())
// For CDC compatibility, we need to check the Value field firstly.
value := item.GetValue()
if value == "" {
value = string(item.GetPayload())
}
ops[i] = clientv3.OpPut(name, value)
case pdpb.EventType_DELETE:
ops[i] = clientv3.OpDelete(request.GetConfigPath() + name)
ops[i] = clientv3.OpDelete(name)
}
}
res, err :=
Expand All @@ -1904,16 +1919,38 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo
return &pdpb.StoreGlobalConfigResponse{}, nil
}

// LoadGlobalConfig load global config from etcd
// LoadGlobalConfig support 2 ways to load global config from etcd
// - `Names` iteratively get value from `ConfigPath/Name` but not care about revision
// - `ConfigPath` if `Names` is nil can get all values and revision of current path
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
configPath := request.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
// Since item value needs to support marshal of different struct types,
// it should be set to `Payload bytes` instead of `Value string`.
if request.Names != nil {
res := make([]*pdpb.GlobalConfigItem, len(request.Names))
for i, name := range request.Names {
r, err := s.client.Get(ctx, path.Join(configPath, name))
if err != nil {
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
} else if len(r.Kvs) == 0 {
msg := "key " + name + " not found"
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Payload: r.Kvs[0].Value, Kind: pdpb.EventType_PUT}
}
}
return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
}
r, err := s.client.Get(ctx, configPath, clientv3.WithPrefix())
if err != nil {
return &pdpb.LoadGlobalConfigResponse{}, err
}
res := make([]*pdpb.GlobalConfigItem, len(r.Kvs))
for i, value := range r.Kvs {
res[i] = &pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: string(value.Key), Value: string(value.Value)}
res[i] = &pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: string(value.Key), Payload: value.Value}
}
return &pdpb.LoadGlobalConfigResponse{Items: res, Revision: r.Header.GetRevision()}, nil
}
Expand All @@ -1924,11 +1961,15 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo
func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
configPath := req.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
revision := req.GetRevision()
// If the revision is compacted, will meet required revision has been compacted error.
// - If required revision < CompactRevision, we need to reload all configs to avoid losing data.
// - If required revision >= CompactRevision, just keep watching.
watchChan := s.client.Watch(ctx, req.GetConfigPath(), clientv3.WithPrefix(), clientv3.WithRev(revision))
watchChan := s.client.Watch(ctx, configPath, clientv3.WithPrefix(), clientv3.WithRev(revision))
for {
select {
case <-ctx.Done():
Expand All @@ -1947,7 +1988,9 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve

cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
for _, e := range res.Events {
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value), Kind: pdpb.EventType(e.Type)})
// Since item value needs to support marshal of different struct types,
// it should be set to `Payload bytes` instead of `Value string`.
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Payload: e.Kv.Value, Kind: pdpb.EventType(e.Type)})
}
if len(cfgs) > 0 {
if err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs, Revision: res.Header.GetRevision()}); err != nil {
Expand Down
Loading

0 comments on commit 18df271

Please sign in to comment.