Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#6866
Browse files Browse the repository at this point in the history
close tikv#6860

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Aug 10, 2023
1 parent 3dc0cc3 commit b6655f0
Show file tree
Hide file tree
Showing 3 changed files with 576 additions and 0 deletions.
55 changes: 55 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,20 @@ const (
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
<<<<<<< HEAD
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
=======
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address")
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server")
ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started")
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
)

// GrpcServer wraps Server to provide grpc service.
Expand Down Expand Up @@ -1898,6 +1909,16 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan

// StoreGlobalConfig store global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
<<<<<<< HEAD
=======
if s.client == nil {
return nil, ErrEtcdNotStarted
}
configPath := request.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
ops := make([]clientv3.Op, len(request.Changes))
for i, item := range request.Changes {
name := globalConfigPath + item.GetName()
Expand All @@ -1917,6 +1938,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo

// LoadGlobalConfig load global config from etcd
func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) {
<<<<<<< HEAD
names := request.Names
res := make([]*pdpb.GlobalConfigItem, len(names))
for i, name := range names {
Expand All @@ -1928,15 +1950,48 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)}
=======
if s.client == nil {
return nil, ErrEtcdNotStarted
}
configPath := request.GetConfigPath()
if configPath == "" {
configPath = globalConfigPath
}
// Since item value needs to support marshal of different struct types,
// it should be set to `Payload bytes` instead of `Value string`.
if request.Names != nil {
res := make([]*pdpb.GlobalConfigItem, len(request.Names))
for i, name := range request.Names {
r, err := s.client.Get(ctx, path.Join(configPath, name))
if err != nil {
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}}
} else if len(r.Kvs) == 0 {
msg := "key " + name + " not found"
res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}}
} else {
res[i] = &pdpb.GlobalConfigItem{Name: name, Payload: r.Kvs[0].Value, Kind: pdpb.EventType_PUT}
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
}
}
return &pdpb.LoadGlobalConfigResponse{Items: res}, nil
}

<<<<<<< HEAD
// WatchGlobalConfig if the connection of WatchGlobalConfig is end
// or stoped by whatever reason
// just reconnect to it.
func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
=======
// WatchGlobalConfig will retry on recoverable errors forever until reconnected
// by Etcd.Watch() as long as the context has not been canceled or timed out.
// Watch on revision which greater than or equal to the required revision.
func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
if s.client == nil {
return ErrEtcdNotStarted
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
ctx, cancel := context.WithCancel(s.Context())
defer cancel()
err := s.sendAllGlobalConfig(ctx, server)
Expand Down
167 changes: 167 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1531,3 +1531,170 @@ func (s *Server) IsTTLConfigExist(key string) bool {
}
return false
}
<<<<<<< HEAD
=======

// MarkSnapshotRecovering mark pd that we're recovering
// tikv will get this state during BR EBS restore.
// we write this info into etcd for simplicity, the key only stays inside etcd temporary
// during BR EBS restore in which period the cluster is not able to serve request.
// and is deleted after BR EBS restore is done.
func (s *Server) MarkSnapshotRecovering() error {
log.Info("mark snapshot recovering")
markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath)
// the value doesn't matter, set to a static string
_, err := kv.NewSlowLogTxn(s.client).
If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)).
Then(clientv3.OpPut(markPath, "on")).
Commit()
// if other client already marked, return success too
return err
}

// IsSnapshotRecovering check whether recovering-mark marked
func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) {
markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath)
resp, err := s.client.Get(ctx, markPath)
if err != nil {
return false, err
}
return len(resp.Kvs) > 0, nil
}

// UnmarkSnapshotRecovering unmark recovering mark
func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error {
log.Info("unmark snapshot recovering")
markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath)
_, err := s.client.Delete(ctx, markPath)
// if other client already unmarked, return success too
return err
}

// GetServicePrimaryAddr returns the primary address for a given service.
// Note: This function will only return primary address without judging if it's alive.
func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) {
ticker := time.NewTicker(retryIntervalGetServicePrimary)
defer ticker.Stop()
for i := 0; i < maxRetryTimesGetServicePrimary; i++ {
if v, ok := s.servicePrimaryMap.Load(serviceName); ok {
return v.(string), true
}
select {
case <-s.ctx.Done():
return "", false
case <-ctx.Done():
return "", false
case <-ticker.C:
}
}
return "", false
}

// SetServicePrimaryAddr sets the primary address directly.
// Note: This function is only used for test.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}

func (s *Server) initTSOPrimaryWatcher() {
serviceName := mcs.TSOServiceName
tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID)
tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID)
putFn := func(kv *mvccpb.KeyValue) error {
primary := &tsopb.Participant{} // TODO: use Generics
if err := proto.Unmarshal(kv.Value, primary); err != nil {
return err
}
listenUrls := primary.GetListenUrls()
if len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update tso primary", zap.String("primary", listenUrls[0]))
}
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
s.servicePrimaryMap.Delete(serviceName)
return nil
}
s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher(
s.serverLoopCtx,
&s.serverLoopWg,
s.client,
"tso-primary-watcher",
tsoServicePrimaryKey,
putFn,
deleteFn,
func() error { return nil },
)
}

// RecoverAllocID recover alloc id. set current base id to input id
func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error {
return s.idAllocator.SetBase(id)
}

// GetExternalTS returns external timestamp.
func (s *Server) GetExternalTS() uint64 {
return s.GetRaftCluster().GetExternalTS()
}

// SetExternalTS returns external timestamp.
func (s *Server) SetExternalTS(externalTS, globalTS uint64) error {
if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 {
desc := "the external timestamp should not be larger than global ts"
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS))
return errors.New(desc)
}
currentExternalTS := s.GetRaftCluster().GetExternalTS()
if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
desc := "the external timestamp should be larger than current external timestamp"
log.Error(desc, zap.Uint64("request", externalTS), zap.Uint64("current", currentExternalTS))
return errors.New(desc)
}
s.GetRaftCluster().SetExternalTS(externalTS)
return nil
}

// IsLocalTSOEnabled returns if the local TSO is enabled.
func (s *Server) IsLocalTSOEnabled() bool {
return s.cfg.IsLocalTSOEnabled()
}

// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings.
// If the value is negative, there is no limit.
func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
return s.cfg.GetMaxConcurrentTSOProxyStreamings()
}

// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {
return s.cfg.GetTSOProxyRecvFromClientTimeout()
}

// GetLeaderLease returns the leader lease.
func (s *Server) GetLeaderLease() int64 {
return s.cfg.GetLeaderLease()
}

// GetTSOSaveInterval returns TSO save interval.
func (s *Server) GetTSOSaveInterval() time.Duration {
return s.cfg.GetTSOSaveInterval()
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration {
return s.cfg.GetTSOUpdatePhysicalInterval()
}

// GetMaxResetTSGap gets the max gap to reset the tso.
func (s *Server) GetMaxResetTSGap() time.Duration {
return s.persistOptions.GetMaxResetTSGap()
}

// SetClient sets the etcd client.
// Notes: it is only used for test.
func (s *Server) SetClient(client *clientv3.Client) {
s.client = client
}
>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866))
Loading

0 comments on commit b6655f0

Please sign in to comment.