Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement GlobalConfig for grpc server and client #4308

Merged
merged 6 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ type Region struct {
PendingPeers []*metapb.Peer
}

// GlobalConfigItem standard format of KV pair in GlobalConfig client
type GlobalConfigItem struct {
LemonHX marked this conversation as resolved.
Show resolved Hide resolved
Name string
Value string
Error error
}

// Client is a PD (Placement Driver) client.
// It should not be used after calling Close().
type Client interface {
Expand Down Expand Up @@ -109,6 +116,13 @@ type Client interface {
SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error)
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)

// LoadGlobalConfig gets the global config from etcd
LoadGlobalConfig(ctx context.Context, names []string) ([]GlobalConfigItem, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error
// Close closes the client.
Expand Down Expand Up @@ -307,6 +321,8 @@ var (
errClosing = errors.New("[pd] closing")
// errTSOLength is returned when the number of response timestamps is inconsistent with request.
errTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
// errGlobalConfigNotFound is returned when etcd does not contain the globalConfig item
errGlobalConfigNotFound = errors.New("[pd] global config not found")
)

// ClientOption configures client.
Expand Down Expand Up @@ -1745,3 +1761,76 @@ func trimHTTPPrefix(str string) string {
str = strings.TrimPrefix(str, "https://")
return str
}

func (c *client) LoadGlobalConfig(ctx context.Context, names []string) ([]GlobalConfigItem, error) {
resp, err := c.getClient().LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names})
if err != nil {
return nil, err
}
res := make([]GlobalConfigItem, len(resp.GetItems()))
for i, item := range resp.GetItems() {
cfg := GlobalConfigItem{Name: item.GetName()}
if item.Error != nil {
LemonHX marked this conversation as resolved.
Show resolved Hide resolved
if item.Error.Type == pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND {
cfg.Error = errGlobalConfigNotFound
} else {
cfg.Error = errors.New("[pd]" + item.Error.Message)
}
} else {
cfg.Value = item.GetValue()
}
res[i] = cfg
}
return res, nil
}

func (c *client) StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error {
resArr := make([]*pdpb.GlobalConfigItem, len(items))
for i, it := range items {
resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value}
}
res, err := c.getClient().StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr})
if err != nil {
return err
}
LemonHX marked this conversation as resolved.
Show resolved Hide resolved
resErr := res.GetError()
if resErr != nil {
return errors.Errorf("[pd]" + resErr.Message)
}
return err
}

func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error) {
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
res, err := c.getClient().WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{})
if err != nil {
close(globalConfigWatcherCh)
return nil, err
}
go func() {
LemonHX marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
if r := recover(); r != nil {
log.Error("[pd] panic in client `WatchGlobalConfig`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
close(globalConfigWatcherCh)
return
default:
m, err := res.Recv()
if err != nil {
return
}
arr := make([]GlobalConfigItem, len(m.Changes))
for j, i := range m.Changes {
arr[j] = GlobalConfigItem{i.GetName(), i.GetValue(), nil}
}
globalConfigWatcherCh <- arr
}
}
}()
return globalConfigWatcherCh, err
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20211206031355-bcc43a01d537
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc h1:6goyJr/7qam8KgDLgOd3k2BQAjtPlg+w22YdgClBlIk=
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c h1:jrPg+QFqQ7VyI30SPzB0ZviHCvDGyZHiASz6Bgomxi0=
github.com/pingcap/kvproto v0.0.0-20211213085605-3329b3c5404c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
87 changes: 87 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/kv"
"github.com/tikv/pd/server/tso"
"github.com/tikv/pd/server/versioninfo"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -1698,3 +1700,88 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
}
<-done
}

const globalConfigPath = "/global/config/"

// StoreGlobalConfig store global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := globalConfigPath + item.GetName()
value := item.GetValue()
ops[i] = clientv3.OpPut(name, value)
}
res, err :=
kv.NewSlowLogTxn(s.client).Then(ops...).Commit()
if err != nil {
return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}, err
}
if !res.Succeeded {
return &pdpb.StoreGlobalConfigResponse{Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: "failed to execute StoreGlobalConfig transaction"}}, errors.Errorf("failed to execute StoreGlobalConfig transaction")
}
return &pdpb.StoreGlobalConfigResponse{}, err
LemonHX marked this conversation as resolved.
Show resolved Hide resolved
}

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
names := request.Names
res := make([]*pdpb.GlobalConfigItem, len(names))
for i, name := range names {
r, err := s.client.Get(ctx, globalConfigPath+name)
LemonHX marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
} else if len(r.Kvs) == 0 {
msg := "key " + name + " not found"
LemonHX marked this conversation as resolved.
Show resolved Hide resolved
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)}
}
}
return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
}

// WatchGlobalConfig if the connection of WatchGlobalConfig is end
// or stoped by whatever reason
// just reconnect to it.
func (s *GrpcServer) WatchGlobalConfig(request *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
err := s.sendAllGlobalConfig(ctx, server)
if err != nil {
return err
}
watchChan := s.client.Watch(ctx, globalConfigPath, clientv3.WithPrefix())
for {
select {
case <-ctx.Done():
rleungx marked this conversation as resolved.
Show resolved Hide resolved
return nil
case res := <-watchChan:
cfgs := make([]*pdpb.GlobalConfigItem, 0, len(res.Events))
for _, e := range res.Events {
if e.Type != clientv3.EventTypePut {
continue
}
cfgs = append(cfgs, &pdpb.GlobalConfigItem{Name: string(e.Kv.Key), Value: string(e.Kv.Value)})
}
if len(cfgs) > 0 {
err := server.Send(&pdpb.WatchGlobalConfigResponse{Changes: cfgs})
if err != nil {
return err
}
}
}
}
}

func (s *GrpcServer) sendAllGlobalConfig(ctx context.Context, server pdpb.PD_WatchGlobalConfigServer) error {
configList, err := s.client.Get(ctx, globalConfigPath, clientv3.WithPrefix())
if err != nil {
return err
}
ls := make([]*pdpb.GlobalConfigItem, configList.Count)
for i, kv := range configList.Kvs {
ls[i] = &pdpb.GlobalConfigItem{Name: string(kv.Key), Value: string(kv.Value)}
}
err = server.Send(&pdpb.WatchGlobalConfigResponse{Changes: ls})
return err
}
5 changes: 5 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,3 +1425,8 @@ func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) e
}
return nil
}

// SplitAndScatterRegions TODO
func (s *Server) SplitAndScatterRegions(context context.Context, r *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) {
LemonHX marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("no implemented")
}
Loading