From b6655f07abe44ced92ec7a46b123cad71ff7c7ad Mon Sep 17 00:00:00 2001 From: Hu# Date: Mon, 31 Jul 2023 20:53:38 +0800 Subject: [PATCH] This is an automated cherry-pick of #6866 close tikv/pd#6860 Signed-off-by: ti-chi-bot --- server/grpc_service.go | 55 +++ server/server.go | 167 +++++++++ .../integrations/client/global_config_test.go | 354 ++++++++++++++++++ 3 files changed, 576 insertions(+) create mode 100644 tests/integrations/client/global_config_test.go diff --git a/server/grpc_service.go b/server/grpc_service.go index 0047c43beea1..9bdc002b5226 100755 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -61,9 +61,20 @@ const ( var ( // ErrNotLeader is returned when current server is not the leader and not possible to process request. // TODO: work as proxy. +<<<<<<< HEAD ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") +======= + ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") + ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") + ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address") + ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout") + ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") + ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") + ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started") +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) ) // GrpcServer wraps Server to provide grpc service. @@ -1898,6 +1909,16 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan // StoreGlobalConfig store global config into etcd by transaction func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { +<<<<<<< HEAD +======= + if s.client == nil { + return nil, ErrEtcdNotStarted + } + configPath := request.GetConfigPath() + if configPath == "" { + configPath = globalConfigPath + } +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) ops := make([]clientv3.Op, len(request.Changes)) for i, item := range request.Changes { name := globalConfigPath + item.GetName() @@ -1917,6 +1938,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo // LoadGlobalConfig load global config from etcd func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { +<<<<<<< HEAD names := request.Names res := make([]*pdpb.GlobalConfigItem, len(names)) for i, name := range names { @@ -1928,15 +1950,48 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}} } else { res[i] = &pdpb.GlobalConfigItem{Name: name, Value: string(r.Kvs[0].Value)} +======= + if s.client == nil { + return nil, ErrEtcdNotStarted + } + configPath := request.GetConfigPath() + if configPath == "" { + configPath = globalConfigPath + } + // Since item value needs to support marshal of different struct types, + // it should be set to `Payload bytes` instead of `Value string`. + if request.Names != nil { + res := make([]*pdpb.GlobalConfigItem, len(request.Names)) + for i, name := range request.Names { + r, err := s.client.Get(ctx, path.Join(configPath, name)) + if err != nil { + res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_UNKNOWN, Message: err.Error()}} + } else if len(r.Kvs) == 0 { + msg := "key " + name + " not found" + res[i] = &pdpb.GlobalConfigItem{Name: name, Error: &pdpb.Error{Type: pdpb.ErrorType_GLOBAL_CONFIG_NOT_FOUND, Message: msg}} + } else { + res[i] = &pdpb.GlobalConfigItem{Name: name, Payload: r.Kvs[0].Value, Kind: pdpb.EventType_PUT} + } +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) } } return &pdpb.LoadGlobalConfigResponse{Items: res}, nil } +<<<<<<< HEAD // WatchGlobalConfig if the connection of WatchGlobalConfig is end // or stoped by whatever reason // just reconnect to it. func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { +======= +// WatchGlobalConfig will retry on recoverable errors forever until reconnected +// by Etcd.Watch() as long as the context has not been canceled or timed out. +// Watch on revision which greater than or equal to the required revision. +func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { + if s.client == nil { + return ErrEtcdNotStarted + } +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) ctx, cancel := context.WithCancel(s.Context()) defer cancel() err := s.sendAllGlobalConfig(ctx, server) diff --git a/server/server.go b/server/server.go index b1ac379edfd2..8b73a7153f99 100644 --- a/server/server.go +++ b/server/server.go @@ -1531,3 +1531,170 @@ func (s *Server) IsTTLConfigExist(key string) bool { } return false } +<<<<<<< HEAD +======= + +// MarkSnapshotRecovering mark pd that we're recovering +// tikv will get this state during BR EBS restore. +// we write this info into etcd for simplicity, the key only stays inside etcd temporary +// during BR EBS restore in which period the cluster is not able to serve request. +// and is deleted after BR EBS restore is done. +func (s *Server) MarkSnapshotRecovering() error { + log.Info("mark snapshot recovering") + markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + // the value doesn't matter, set to a static string + _, err := kv.NewSlowLogTxn(s.client). + If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)). + Then(clientv3.OpPut(markPath, "on")). + Commit() + // if other client already marked, return success too + return err +} + +// IsSnapshotRecovering check whether recovering-mark marked +func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) { + markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + resp, err := s.client.Get(ctx, markPath) + if err != nil { + return false, err + } + return len(resp.Kvs) > 0, nil +} + +// UnmarkSnapshotRecovering unmark recovering mark +func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error { + log.Info("unmark snapshot recovering") + markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + _, err := s.client.Delete(ctx, markPath) + // if other client already unmarked, return success too + return err +} + +// GetServicePrimaryAddr returns the primary address for a given service. +// Note: This function will only return primary address without judging if it's alive. +func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) { + ticker := time.NewTicker(retryIntervalGetServicePrimary) + defer ticker.Stop() + for i := 0; i < maxRetryTimesGetServicePrimary; i++ { + if v, ok := s.servicePrimaryMap.Load(serviceName); ok { + return v.(string), true + } + select { + case <-s.ctx.Done(): + return "", false + case <-ctx.Done(): + return "", false + case <-ticker.C: + } + } + return "", false +} + +// SetServicePrimaryAddr sets the primary address directly. +// Note: This function is only used for test. +func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { + s.servicePrimaryMap.Store(serviceName, addr) +} + +func (s *Server) initTSOPrimaryWatcher() { + serviceName := mcs.TSOServiceName + tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID) + tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID) + putFn := func(kv *mvccpb.KeyValue) error { + primary := &tsopb.Participant{} // TODO: use Generics + if err := proto.Unmarshal(kv.Value, primary); err != nil { + return err + } + listenUrls := primary.GetListenUrls() + if len(listenUrls) > 0 { + // listenUrls[0] is the primary service endpoint of the keyspace group + s.servicePrimaryMap.Store(serviceName, listenUrls[0]) + log.Info("update tso primary", zap.String("primary", listenUrls[0])) + } + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + s.servicePrimaryMap.Delete(serviceName) + return nil + } + s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher( + s.serverLoopCtx, + &s.serverLoopWg, + s.client, + "tso-primary-watcher", + tsoServicePrimaryKey, + putFn, + deleteFn, + func() error { return nil }, + ) +} + +// RecoverAllocID recover alloc id. set current base id to input id +func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error { + return s.idAllocator.SetBase(id) +} + +// GetExternalTS returns external timestamp. +func (s *Server) GetExternalTS() uint64 { + return s.GetRaftCluster().GetExternalTS() +} + +// SetExternalTS returns external timestamp. +func (s *Server) SetExternalTS(externalTS, globalTS uint64) error { + if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 { + desc := "the external timestamp should not be larger than global ts" + log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS)) + return errors.New(desc) + } + currentExternalTS := s.GetRaftCluster().GetExternalTS() + if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 { + desc := "the external timestamp should be larger than current external timestamp" + log.Error(desc, zap.Uint64("request", externalTS), zap.Uint64("current", currentExternalTS)) + return errors.New(desc) + } + s.GetRaftCluster().SetExternalTS(externalTS) + return nil +} + +// IsLocalTSOEnabled returns if the local TSO is enabled. +func (s *Server) IsLocalTSOEnabled() bool { + return s.cfg.IsLocalTSOEnabled() +} + +// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings. +// If the value is negative, there is no limit. +func (s *Server) GetMaxConcurrentTSOProxyStreamings() int { + return s.cfg.GetMaxConcurrentTSOProxyStreamings() +} + +// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client. +func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration { + return s.cfg.GetTSOProxyRecvFromClientTimeout() +} + +// GetLeaderLease returns the leader lease. +func (s *Server) GetLeaderLease() int64 { + return s.cfg.GetLeaderLease() +} + +// GetTSOSaveInterval returns TSO save interval. +func (s *Server) GetTSOSaveInterval() time.Duration { + return s.cfg.GetTSOSaveInterval() +} + +// GetTSOUpdatePhysicalInterval returns TSO update physical interval. +func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration { + return s.cfg.GetTSOUpdatePhysicalInterval() +} + +// GetMaxResetTSGap gets the max gap to reset the tso. +func (s *Server) GetMaxResetTSGap() time.Duration { + return s.persistOptions.GetMaxResetTSGap() +} + +// SetClient sets the etcd client. +// Notes: it is only used for test. +func (s *Server) SetClient(client *clientv3.Client) { + s.client = client +} +>>>>>>> c97dfcb74 (global config: fix etcd client not found (#6866)) diff --git a/tests/integrations/client/global_config_test.go b/tests/integrations/client/global_config_test.go new file mode 100644 index 000000000000..15034d035a69 --- /dev/null +++ b/tests/integrations/client/global_config_test.go @@ -0,0 +1,354 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client_test + +import ( + "path" + "strconv" + "sync" + "testing" + "time" + + pd "github.com/tikv/pd/client" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/utils/assertutil" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/server" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +const globalConfigPath = "/global/config/" + +type testReceiver struct { + re *require.Assertions + grpc.ServerStream +} + +func (s testReceiver) Send(m *pdpb.WatchGlobalConfigResponse) error { + log.Info("received", zap.Any("received", m.GetChanges())) + for _, change := range m.GetChanges() { + s.re.Contains(change.Name, globalConfigPath+string(change.Payload)) + } + return nil +} + +type globalConfigTestSuite struct { + suite.Suite + server *server.GrpcServer + client pd.Client + cleanup testutil.CleanupFunc + mu sync.Mutex +} + +func TestGlobalConfigTestSuite(t *testing.T) { + suite.Run(t, new(globalConfigTestSuite)) +} + +func (suite *globalConfigTestSuite) SetupSuite() { + var err error + var gsi *server.Server + checker := assertutil.NewChecker() + checker.FailNow = func() {} + gsi, suite.cleanup, err = server.NewTestServer(suite.Require(), checker) + suite.server = &server.GrpcServer{Server: gsi} + suite.NoError(err) + addr := suite.server.GetAddr() + suite.client, err = pd.NewClientWithContext(suite.server.Context(), []string{addr}, pd.SecurityOption{}) + suite.NoError(err) +} + +func (suite *globalConfigTestSuite) TearDownSuite() { + suite.client.Close() + suite.cleanup() +} + +func (suite *globalConfigTestSuite) GetEtcdPath(configPath string) string { + return globalConfigPath + configPath +} + +func (suite *globalConfigTestSuite) TestLoadWithoutNames() { + defer func() { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + }() + r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + ConfigPath: globalConfigPath, + }) + suite.NoError(err) + suite.Len(res.Items, 1) + suite.Equal(r.Header.GetRevision(), res.Revision) + suite.Equal("test", string(res.Items[0].Payload)) +} + +func (suite *globalConfigTestSuite) TestLoadWithoutConfigPath() { + defer func() { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("source_id")) + suite.NoError(err) + }() + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("source_id"), "1") + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"source_id"}, + }) + suite.NoError(err) + suite.Len(res.Items, 1) + suite.Equal([]byte("1"), res.Items[0].Payload) +} + +func (suite *globalConfigTestSuite) TestLoadOtherConfigPath() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), path.Join("OtherConfigPath", strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"0", "1"}, + ConfigPath: "OtherConfigPath", + }) + suite.NoError(err) + suite.Len(res.Items, 2) + for i, item := range res.Items { + suite.Equal(&pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: strconv.Itoa(i), Payload: []byte(strconv.Itoa(i))}, item) + } +} + +func (suite *globalConfigTestSuite) TestLoadAndStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + } + }() + changes := []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}, {Kind: pdpb.EventType_PUT, Name: "1", Payload: []byte("1")}, {Kind: pdpb.EventType_PUT, Name: "2", Payload: []byte("2")}} + _, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: changes, + }) + suite.NoError(err) + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + ConfigPath: globalConfigPath, + }) + suite.Len(res.Items, 3) + suite.NoError(err) + for i, item := range res.Items { + suite.Equal(&pdpb.GlobalConfigItem{Kind: pdpb.EventType_PUT, Name: suite.GetEtcdPath(strconv.Itoa(i)), Payload: []byte(strconv.Itoa(i))}, item) + } +} + +func (suite *globalConfigTestSuite) TestStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + } + }() + changes := []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}, {Kind: pdpb.EventType_PUT, Name: "1", Payload: []byte("1")}, {Kind: pdpb.EventType_PUT, Name: "2", Payload: []byte("2")}} + _, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: changes, + }) + suite.NoError(err) + for i := 0; i < 3; i++ { + res, err := suite.server.GetClient().Get(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + suite.Equal(suite.GetEtcdPath(string(res.Kvs[0].Value)), string(res.Kvs[0].Key)) + } +} + +func (suite *globalConfigTestSuite) TestWatch() { + defer func() { + for i := 0; i < 3; i++ { + // clean up + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + server := testReceiver{re: suite.Require()} + go suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Revision: 0, + }, server) + for i := 0; i < 6; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + for i := 3; i < 6; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + res, err := suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + ConfigPath: globalConfigPath, + }) + suite.Len(res.Items, 3) + suite.NoError(err) +} + +func (suite *globalConfigTestSuite) TestClientLoadWithoutNames() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), nil, globalConfigPath) + suite.NoError(err) + suite.Len(res, 3) + for i, item := range res { + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: suite.GetEtcdPath(strconv.Itoa(i)), PayLoad: []byte(strconv.Itoa(i)), Value: strconv.Itoa(i)}, item) + } +} + +func (suite *globalConfigTestSuite) TestClientLoadWithoutConfigPath() { + defer func() { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("source_id")) + suite.NoError(err) + }() + _, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("source_id"), "1") + suite.NoError(err) + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), []string{"source_id"}, "") + suite.NoError(err) + suite.Len(res, 1) + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: "source_id", PayLoad: []byte("1"), Value: "1"}, res[0]) +} + +func (suite *globalConfigTestSuite) TestClientLoadOtherConfigPath() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Put(suite.server.Context(), path.Join("OtherConfigPath", strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + res, _, err := suite.client.LoadGlobalConfig(suite.server.Context(), []string{"0", "1"}, "OtherConfigPath") + suite.NoError(err) + suite.Len(res, 2) + for i, item := range res { + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: strconv.Itoa(i), PayLoad: []byte(strconv.Itoa(i)), Value: strconv.Itoa(i)}, item) + } +} + +func (suite *globalConfigTestSuite) TestClientStore() { + defer func() { + for i := 0; i < 3; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + err := suite.client.StoreGlobalConfig(suite.server.Context(), globalConfigPath, + []pd.GlobalConfigItem{{Name: "0", Value: "0"}, {Name: "1", Value: "1"}, {Name: "2", Value: "2"}}) + suite.NoError(err) + for i := 0; i < 3; i++ { + res, err := suite.server.GetClient().Get(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + suite.Equal(suite.GetEtcdPath(string(res.Kvs[0].Value)), string(res.Kvs[0].Key)) + } +} + +func (suite *globalConfigTestSuite) TestClientWatchWithRevision() { + defer func() { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath("test")) + suite.NoError(err) + + for i := 3; i < 9; i++ { + _, err := suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + }() + // Mock get revision by loading + r, err := suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath("test"), "test") + suite.NoError(err) + res, revision, err := suite.client.LoadGlobalConfig(suite.server.Context(), nil, globalConfigPath) + suite.NoError(err) + suite.Len(res, 1) + suite.LessOrEqual(r.Header.GetRevision(), revision) + suite.Equal(pd.GlobalConfigItem{EventType: pdpb.EventType_PUT, Name: suite.GetEtcdPath("test"), PayLoad: []byte("test"), Value: "test"}, res[0]) + // Mock when start watcher there are existed some keys, will load firstly + for i := 0; i < 6; i++ { + _, err = suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + // Start watcher at next revision + configChan, err := suite.client.WatchGlobalConfig(suite.server.Context(), globalConfigPath, revision) + suite.NoError(err) + // Mock delete + for i := 0; i < 3; i++ { + _, err = suite.server.GetClient().Delete(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i))) + suite.NoError(err) + } + // Mock put + for i := 6; i < 9; i++ { + _, err = suite.server.GetClient().Put(suite.server.Context(), suite.GetEtcdPath(strconv.Itoa(i)), strconv.Itoa(i)) + suite.NoError(err) + } + for { + select { + case <-time.After(time.Second): + return + case res := <-configChan: + for _, r := range res { + suite.Equal(suite.GetEtcdPath(r.Value), r.Name) + } + } + } +} + +func (suite *globalConfigTestSuite) TestEtcdNotStart() { + cli := suite.server.GetClient() + defer func() { + suite.mu.Lock() + suite.server.SetClient(cli) + suite.mu.Unlock() + }() + suite.mu.Lock() + suite.server.SetClient(nil) + suite.mu.Unlock() + err := suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Revision: 0, + }, nil) + suite.Error(err) + + _, err = suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + ConfigPath: globalConfigPath, + Changes: []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}}, + }) + suite.Error(err) + + _, err = suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"test_etcd"}, + }) + suite.Error(err) +}