From f77c86bc138013d3891ac1bbd90988f149686188 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 31 May 2023 10:24:41 +0800 Subject: [PATCH] mcs: add a test for starting tso server first (#6535) ref tikv/pd#5895 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/resource_manager/server/server.go | 2 +- server/server.go | 3 + tests/integrations/mcs/tso/api_test.go | 69 +++++++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go index 5e89bda1120..6705c4b1da9 100644 --- a/pkg/mcs/resource_manager/server/server.go +++ b/pkg/mcs/resource_manager/server/server.go @@ -409,7 +409,7 @@ func (s *Server) startServer() (err error) { s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10), utils.ResourceManagerServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) if err := s.serviceRegister.Register(); err != nil { - log.Error("failed to regiser the service", zap.String("service-name", utils.ResourceManagerServiceName), errs.ZapError(err)) + log.Error("failed to register the service", zap.String("service-name", utils.ResourceManagerServiceName), errs.ZapError(err)) return err } atomic.StoreInt64(&s.isRunning, 1) diff --git a/server/server.go b/server/server.go index 619049cac79..8eaca5d808a 100644 --- a/server/server.go +++ b/server/server.go @@ -556,6 +556,9 @@ func (s *Server) Run() error { if err := s.startEtcd(s.ctx); err != nil { return err } + failpoint.Inject("delayStartServer", func() { + time.Sleep(2 * time.Second) + }) if err := s.startServer(s.ctx); err != nil { return err } diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index e5dbcd7998c..c4064445a4a 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -15,17 +15,22 @@ package tso import ( + "bytes" "context" "encoding/json" "io" "net/http" "testing" + "time" + "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" tso "github.com/tikv/pd/pkg/mcs/tso/server" apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" ) @@ -104,3 +109,67 @@ func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map re.NoError(json.Unmarshal(data, &resp)) return resp } + +func TestTSOServerStartFirst(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServer", `return(true)`)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + apiCluster, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = []string{"k1", "k2"} + }) + defer apiCluster.Destroy() + re.NoError(err) + addr := apiCluster.GetConfig().GetClientURL() + ch := make(chan struct{}) + defer close(ch) + clusterCh := make(chan *mcs.TestTSOCluster) + defer close(clusterCh) + go func() { + tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, addr) + re.NoError(err) + primary := tsoCluster.WaitForDefaultPrimaryServing(re) + re.NotNil(primary) + clusterCh <- tsoCluster + ch <- struct{}{} + }() + err = apiCluster.RunInitialServers() + re.NoError(err) + leaderName := apiCluster.WaitLeader() + pdLeaderServer := apiCluster.GetServer(leaderName) + re.NoError(pdLeaderServer.BootstrapCluster()) + re.NoError(err) + tsoCluster := <-clusterCh + defer tsoCluster.Destroy() + <-ch + + time.Sleep(time.Second * 1) + input := make(map[string]interface{}) + input["new-id"] = 1 + input["keyspaces"] = []uint32{2} + jsonBody, err := json.Marshal(input) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, addr+"/pd/api/v2/tso/keyspace-groups/0/split", bytes.NewBuffer(jsonBody)) + re.NoError(err) + httpResp, err := dialClient.Do(httpReq) + re.NoError(err) + defer httpResp.Body.Close() + re.Equal(http.StatusOK, httpResp.StatusCode) + + httpReq, err = http.NewRequest(http.MethodGet, addr+"/pd/api/v2/tso/keyspace-groups/0", nil) + re.NoError(err) + httpResp, err = dialClient.Do(httpReq) + re.NoError(err) + data, err := io.ReadAll(httpResp.Body) + re.NoError(err) + defer httpResp.Body.Close() + re.Equal(http.StatusOK, httpResp.StatusCode) + + var group endpoint.KeyspaceGroup + re.NoError(json.Unmarshal(data, &group)) + re.Len(group.Keyspaces, 2) + re.Len(group.Members, 2) + + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServer")) +}