diff --git a/server/grpc_service.go b/server/grpc_service.go index 6fd0c303ab9..2d846aaf278 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -61,6 +61,7 @@ var ( ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") + ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started") ) // GrpcServer wraps Server to provide grpc service. @@ -1706,7 +1707,10 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan } // StoreGlobalConfig store global config into etcd by transaction -func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { +func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { + if s.client == nil { + return nil, ErrEtcdNotStarted + } ops := make([]clientv3.Op, len(request.Changes)) for i, item := range request.Changes { name := globalConfigPath + item.GetName() @@ -1726,6 +1730,9 @@ func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreG // LoadGlobalConfig load global config from etcd func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { + if s.client == nil { + return nil, ErrEtcdNotStarted + } names := request.Names res := make([]*pdpb.GlobalConfigItem, len(names)) for i, name := range names { @@ -1743,9 +1750,12 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo } // WatchGlobalConfig if the connection of WatchGlobalConfig is end -// or stoped by whatever reason +// or stopped by whatever reason // just reconnect to it. -func (s *GrpcServer) WatchGlobalConfig(request *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { +func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { + if s.client == nil { + return ErrEtcdNotStarted + } ctx, cancel := context.WithCancel(s.Context()) defer cancel() err := s.sendAllGlobalConfig(ctx, server) diff --git a/server/server.go b/server/server.go index 0fb25b1d1fd..88647b89dd3 100644 --- a/server/server.go +++ b/server/server.go @@ -1414,3 +1414,9 @@ func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) e func (s *Server) SplitAndScatterRegions(context context.Context, r *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) { return nil, errors.New("no implemented") } + +// SetClient sets the etcd client. +// Notes: it is only used for test. +func (s *Server) SetClient(client *clientv3.Client) { + s.client = client +} diff --git a/tests/server/global_config/global_config_test.go b/tests/server/global_config/global_config_test.go index cd3b6270804..4d56d7ba3df 100644 --- a/tests/server/global_config/global_config_test.go +++ b/tests/server/global_config/global_config_test.go @@ -17,13 +17,14 @@ package global_config_test import ( "context" "strconv" + "strings" + "sync" "testing" "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" - pd "github.com/tikv/pd/client" "github.com/tikv/pd/pkg/assertutil" "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/server" @@ -45,8 +46,9 @@ var globalConfigPath = "/global/config/" type GlobalConfigTestSuite struct { server *server.GrpcServer - client pd.Client + client *grpc.ClientConn cleanup server.CleanupFunc + mu sync.Mutex } type TestReceiver struct { @@ -70,7 +72,7 @@ func (s *GlobalConfigTestSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) server.EnableZap = true addr := s.server.GetAddr() - s.client, err = pd.NewClient([]string{addr}, pd.SecurityOption{}) + s.client, err = grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure()) c.Assert(err, IsNil) } @@ -132,6 +134,50 @@ func (s *GlobalConfigTestSuite) TestWatch(c *C) { } } +func (s *GlobalConfigTestSuite) loadGlobalConfig(ctx context.Context, names []string) ([]*pdpb.GlobalConfigItem, error) { + res, err := pdpb.NewPDClient(s.client).LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names}) + return res.GetItems(), err +} + +func (s *GlobalConfigTestSuite) storeGlobalConfig(ctx context.Context, changes []*pdpb.GlobalConfigItem) error { + _, err := pdpb.NewPDClient(s.client).StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: changes}) + return err +} + +func (s *GlobalConfigTestSuite) watchGlobalConfig(ctx context.Context) (chan []*pdpb.GlobalConfigItem, error) { + globalConfigWatcherCh := make(chan []*pdpb.GlobalConfigItem, 16) + res, err := pdpb.NewPDClient(s.client).WatchGlobalConfig(ctx, &pdpb.WatchGlobalConfigRequest{}) + if err != nil { + close(globalConfigWatcherCh) + return nil, err + } + go func() { + defer func() { + if r := recover(); r != nil { + return + } + }() + for { + select { + case <-ctx.Done(): + close(globalConfigWatcherCh) + return + default: + m, err := res.Recv() + if err != nil { + return + } + arr := make([]*pdpb.GlobalConfigItem, len(m.Changes)) + for j, i := range m.Changes { + arr[j] = &pdpb.GlobalConfigItem{Name: i.GetName(), Value: i.GetValue()} + } + globalConfigWatcherCh <- arr + } + } + }() + return globalConfigWatcherCh, err +} + func (s *GlobalConfigTestSuite) TestClientLoad(c *C) { defer func() { _, err := s.server.GetClient().Delete(s.server.Context(), globalConfigPath+"test") @@ -139,14 +185,14 @@ func (s *GlobalConfigTestSuite) TestClientLoad(c *C) { }() _, err := s.server.GetClient().Put(s.server.Context(), globalConfigPath+"test", "test") c.Assert(err, IsNil) - res, err := s.client.LoadGlobalConfig(s.server.Context(), []string{"test"}) + res, err := s.loadGlobalConfig(s.server.Context(), []string{"test"}) c.Assert(err, IsNil) c.Assert(len(res), Equals, 1) - c.Assert(res[0], Equals, pd.GlobalConfigItem{Name: "test", Value: "test", Error: nil}) + c.Assert(res[0], DeepEquals, &pdpb.GlobalConfigItem{Name: "test", Value: "test", Error: nil}) } func (s *GlobalConfigTestSuite) TestClientLoadError(c *C) { - res, err := s.client.LoadGlobalConfig(s.server.Context(), []string{"test"}) + res, err := s.loadGlobalConfig(s.server.Context(), []string{"test"}) c.Assert(err, IsNil) c.Assert(res[0].Error, Not(Equals), nil) } @@ -158,7 +204,7 @@ func (s *GlobalConfigTestSuite) TestClientStore(c *C) { c.Assert(err, IsNil) } }() - err := s.client.StoreGlobalConfig(s.server.Context(), []pd.GlobalConfigItem{{Name: "1", Value: "1"}, {Name: "2", Value: "2"}, {Name: "3", Value: "3"}}) + err := s.storeGlobalConfig(s.server.Context(), []*pdpb.GlobalConfigItem{{Name: "1", Value: "1"}, {Name: "2", Value: "2"}, {Name: "3", Value: "3"}}) c.Assert(err, IsNil) for i := 1; i <= 3; i++ { res, err := s.server.GetClient().Get(s.server.Context(), globalConfigPath+strconv.Itoa(i)) @@ -174,7 +220,7 @@ func (s *GlobalConfigTestSuite) TestClientWatch(c *C) { c.Assert(err, IsNil) } }() - wc, err := s.client.WatchGlobalConfig(s.server.Context()) + wc, err := s.watchGlobalConfig(s.server.Context()) c.Assert(err, IsNil) for i := 0; i < 3; i++ { _, err = s.server.GetClient().Put(s.server.Context(), globalConfigPath+strconv.Itoa(i), strconv.Itoa(i)) @@ -192,16 +238,26 @@ func (s *GlobalConfigTestSuite) TestClientWatch(c *C) { } } -func (s *GlobalConfigTestSuite) TestClientWatchCloseReceiverExternally(c *C) { - wc, err := s.client.WatchGlobalConfig(s.server.Context()) - c.Assert(err, IsNil) - close(wc) -} +func (s *GlobalConfigTestSuite) TestEtcdNotStart(c *C) { + cli := s.server.GetClient() + defer func() { + s.mu.Lock() + s.server.SetClient(cli) + s.mu.Unlock() + }() + s.mu.Lock() + s.server.SetClient(nil) + s.mu.Unlock() + err := s.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{}, nil) + c.Assert(err, NotNil) + + _, err = s.server.StoreGlobalConfig(s.server.Context(), &pdpb.StoreGlobalConfigRequest{ + Changes: []*pdpb.GlobalConfigItem{{Name: "0", Value: "0"}}, + }) + c.Assert(err, NotNil) -func (s *GlobalConfigTestSuite) TestClientWatchTimeout(c *C) { - ctx, cancel := context.WithCancel(s.server.Context()) - wc, _ := s.client.WatchGlobalConfig(ctx) - cancel() - _, opened := <-wc - c.Assert(opened, Equals, false) + _, err = s.server.LoadGlobalConfig(s.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"test_etcd"}, + }) + c.Assert(err, NotNil) }