
Commit df236db
This is an automated cherry-pick of tikv#7054
close tikv#7053

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
HuSharp authored and ti-chi-bot committed Sep 11, 2023
1 parent 947701a commit df236db
Showing 5 changed files with 131 additions and 26 deletions.
39 changes: 38 additions & 1 deletion server/api/min_resolved_ts.go
@@ -51,7 +51,7 @@ type minResolvedTS struct {
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /min-resolved-ts/{store_id} [get]
 func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) {
-	c := h.svr.GetRaftCluster()
+	c := getCluster(r)
 	idStr := mux.Vars(r)["store_id"]
 	storeID, err := strconv.ParseUint(idStr, 10, 64)
 	if err != nil {
@@ -74,9 +74,46 @@ func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *h
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /min-resolved-ts [get]
 func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
+<<<<<<< HEAD
[Check failure on line 77 in server/api/min_resolved_ts.go — GitHub Actions / statics: syntax error: unexpected <<, expecting }]
 	c := h.svr.GetRaftCluster()
 	value := c.GetMinResolvedTS()
 	persistInterval := c.GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval
+=======
+	c := getCluster(r)
+	scopeMinResolvedTS := c.GetMinResolvedTS()
+	persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval
+
+	var storesMinResolvedTS map[uint64]uint64
+	if scopeStr := r.URL.Query().Get("scope"); len(scopeStr) > 0 {
[Check failure on line 87 in server/api/min_resolved_ts.go — GitHub Actions / statics: syntax error: non-declaration statement outside function body]
+		// scope is an optional parameter, it can be `cluster` or specified store IDs.
+		// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
+		// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
+		// - When scope given a list of stores, min_resolved_ts will be provided for each store
+		//   and the scope-specific min_resolved_ts will be returned.
+		if scopeStr == "cluster" {
+			stores := c.GetMetaStores()
+			ids := make([]uint64, len(stores))
+			for i, store := range stores {
+				ids[i] = store.GetId()
+			}
+			// use cluster-level min_resolved_ts as the scope-specific min_resolved_ts.
+			_, storesMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids)
+		} else {
+			scopeIDs := strings.Split(scopeStr, ",")
+			ids := make([]uint64, len(scopeIDs))
+			for i, idStr := range scopeIDs {
+				id, err := strconv.ParseUint(idStr, 10, 64)
+				if err != nil {
+					h.rd.JSON(w, http.StatusBadRequest, err.Error())
+					return
+				}
+				ids[i] = id
+			}
+			scopeMinResolvedTS, storesMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids)
+		}
+	}
+
+>>>>>>> d03f485c9 (*: check raftcluster nil (#7054))
[Check failure on line 116 in server/api/min_resolved_ts.go — GitHub Actions / statics: invalid character U+0023 '#']
 	h.rd.JSON(w, http.StatusOK, minResolvedTS{
 		MinResolvedTS:   value,
 		PersistInterval: persistInterval,
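
The comment block in the incoming branch above documents an optional scope query parameter for the min-resolved-ts endpoint. As a rough illustration (not part of this commit), a client might exercise the three documented variants as in the Go sketch below; the PD address and store IDs are placeholder assumptions, and the responses are simply printed rather than decoded.

// Hedged sketch: querying the min-resolved-ts API described in the comments above.
// The PD client URL and the store IDs are assumptions, not values from this commit.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func fetch(url string) {
	resp, err := http.Get(url)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}

func main() {
	base := "http://127.0.0.1:2379/pd/api/v1/min-resolved-ts" // assumed local PD address
	fetch(base)                    // no scope: cluster-level min_resolved_ts only
	fetch(base + "?scope=cluster") // cluster scope: also fills per-store values
	fetch(base + "?scope=1,4,5")   // explicit store IDs: per-store and scope-specific values
}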
4 changes: 3 additions & 1 deletion server/grpc_service.go
@@ -1232,7 +1232,6 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB
 	if rc == nil {
 		return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
 	}
-
 	_, err := rc.HandleBatchReportSplit(request)
 	if err != nil {
 		return &pdpb.ReportBatchSplitResponse{
@@ -1695,6 +1694,9 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S
 		return rsp.(*pdpb.SplitAndScatterRegionsResponse), err
 	}
 	rc := s.GetRaftCluster()
+	if rc == nil {
+		return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
+	}
 	splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
 	scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()))
 	if err != nil {
14 changes: 10 additions & 4 deletions server/handler.go
@@ -930,14 +930,20 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) er
 
 // SetStoreLimitScene sets the limit values for different scenes
 func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) {
-	cluster := h.s.GetRaftCluster()
-	cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
+	rc := h.s.GetRaftCluster()
+	if rc == nil {
+		return
+	}
+	rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
 }
 
 // GetStoreLimitScene returns the limit values for different scenes
 func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene {
-	cluster := h.s.GetRaftCluster()
-	return cluster.GetStoreLimiter().StoreLimitScene(limitType)
+	rc := h.s.GetRaftCluster()
+	if rc == nil {
+		return nil
+	}
+	return rc.GetStoreLimiter().StoreLimitScene(limitType)
 }
 
 // GetProgressByID returns the progress details for a given store ID.
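
The two handler.go fixes above are the smallest instances of the pattern this commit applies throughout: never dereference the value returned by GetRaftCluster() without first checking for nil, since the raft cluster is nil before bootstrap and after it is stopped. A minimal, self-contained sketch of that guard follows; Cluster, Server, and ErrNotBootstrapped here are stand-ins for the real PD types, not the project's actual API.

// Hedged sketch of the nil-guard pattern used in this commit.
// The types below are simplified stand-ins, not the real PD implementation.
package main

import (
	"errors"
	"fmt"
)

var ErrNotBootstrapped = errors.New("TiKV cluster not bootstrapped, please start TiKV first")

type Cluster struct{ minResolvedTS uint64 }

func (c *Cluster) GetMinResolvedTS() uint64 { return c.minResolvedTS }

// Server holds a cluster pointer that is nil until bootstrap and nil again after Stop.
type Server struct{ cluster *Cluster }

// GetMinResolvedTS guards every use of the cluster behind a nil check.
func (s *Server) GetMinResolvedTS() (uint64, error) {
	c := s.cluster
	if c == nil {
		return 0, ErrNotBootstrapped
	}
	return c.GetMinResolvedTS(), nil
}

func main() {
	s := &Server{} // not bootstrapped yet
	if _, err := s.GetMinResolvedTS(); err != nil {
		fmt.Println(err) // caller sees the not-bootstrapped error instead of a panic
	}
}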
60 changes: 40 additions & 20 deletions server/server.go
@@ -949,18 +949,18 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 	}
 	old := s.persistOptions.GetReplicationConfig()
 	if cfg.EnablePlacementRules != old.EnablePlacementRules {
-		raftCluster := s.GetRaftCluster()
-		if raftCluster == nil {
+		rc := s.GetRaftCluster()
+		if rc == nil {
 			return errs.ErrNotBootstrapped.GenWithStackByArgs()
 		}
 		if cfg.EnablePlacementRules {
 			// initialize rule manager.
-			if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil {
+			if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil {
 				return err
 			}
 		} else {
 			// NOTE: can be removed after placement rules feature is enabled by default.
-			for _, s := range raftCluster.GetStores() {
+			for _, s := range rc.GetStores() {
 				if !s.IsRemoved() && s.IsTiFlash() {
 					return errors.New("cannot disable placement rules with TiFlash nodes")
 				}
@@ -970,8 +970,12 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 
 	var rule *placement.Rule
 	if cfg.EnablePlacementRules {
+		rc := s.GetRaftCluster()
+		if rc == nil {
+			return errs.ErrNotBootstrapped.GenWithStackByArgs()
+		}
 		// replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule.
-		defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
+		defaultRule := rc.GetRuleManager().GetRule("pd", "default")
 
 		CheckInDefaultRule := func() error {
 			// replication config won't work when placement rule is enabled and exceeds one default rule
@@ -997,7 +1001,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 	if rule != nil {
 		rule.Count = int(cfg.MaxReplicas)
 		rule.LocationLabels = cfg.LocationLabels
-		if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil {
+		rc := s.GetRaftCluster()
+		if rc == nil {
+			return errs.ErrNotBootstrapped.GenWithStackByArgs()
+		}
+		if err := rc.GetRuleManager().SetRule(rule); err != nil {
 			log.Error("failed to update rule count",
 				errs.ZapError(err))
 			return err
@@ -1009,7 +1017,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 		s.persistOptions.SetReplicationConfig(old)
 		if rule != nil {
 			rule.Count = int(old.MaxReplicas)
-			if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil {
+			rc := s.GetRaftCluster()
+			if rc == nil {
+				return errs.ErrNotBootstrapped.GenWithStackByArgs()
+			}
+			if e := rc.GetRuleManager().SetRule(rule); e != nil {
 				log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e))
 			}
 		}
@@ -1243,18 +1255,18 @@ func (s *Server) GetServerOption() *config.PersistOptions {
 
 // GetMetaRegions gets meta regions from cluster.
 func (s *Server) GetMetaRegions() []*metapb.Region {
-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		return cluster.GetMetaRegions()
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		return rc.GetMetaRegions()
 	}
 	return nil
 }
 
 // GetRegions gets regions from cluster.
 func (s *Server) GetRegions() []*core.RegionInfo {
-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		return cluster.GetRegions()
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		return rc.GetRegions()
 	}
 	return nil
 }
@@ -1375,9 +1387,9 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro
 	}
 	log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
 
-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		err := cluster.GetReplicationMode().UpdateConfig(cfg)
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		err := rc.GetReplicationMode().UpdateConfig(cfg)
 		if err != nil {
 			log.Warn("failed to update replication mode", errs.ZapError(err))
 			// revert to old config
@@ -1747,7 +1759,11 @@ func (s *Server) GetGlobalTS() (uint64, error) {
 
 // GetExternalTS returns external timestamp.
 func (s *Server) GetExternalTS() uint64 {
-	return s.GetRaftCluster().GetExternalTS()
+	rc := s.GetRaftCluster()
+	if rc == nil {
+		return 0
+	}
+	return rc.GetExternalTS()
 }
 
 // SetExternalTS returns external timestamp.
@@ -1761,14 +1777,18 @@ func (s *Server) SetExternalTS(externalTS uint64) error {
 		log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS))
 		return errors.New(desc)
 	}
-	currentExternalTS := s.GetRaftCluster().GetExternalTS()
+	c := s.GetRaftCluster()
+	if c == nil {
+		return errs.ErrNotBootstrapped.FastGenByArgs()
+	}
+	currentExternalTS := c.GetExternalTS()
 	if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
 		desc := "the external timestamp should be larger than now"
 		log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("current external timestamp", currentExternalTS))
 		return errors.New(desc)
 	}
-	s.GetRaftCluster().SetExternalTS(externalTS)
-	return nil
+
+	return c.SetExternalTS(externalTS)
 }
 
 // SetClient sets the etcd client.
40 changes: 40 additions & 0 deletions tests/server/api/api_test.go
@@ -793,6 +793,46 @@ func TestRemovingProgress(t *testing.T) {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
 }
 
+func TestSendApiWhenRestartRaftCluster(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) {
+		conf.Replication.MaxReplicas = 1
+	})
+	re.NoError(err)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	re.NoError(err)
+	leader := cluster.GetServer(cluster.WaitLeader())
+
+	grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
+	clusterID := leader.GetClusterID()
+	req := &pdpb.BootstrapRequest{
+		Header: testutil.NewRequestHeader(clusterID),
+		Store:  &metapb.Store{Id: 1, Address: "127.0.0.1:0"},
+		Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}},
+	}
+	resp, err := grpcPDClient.Bootstrap(context.Background(), req)
+	re.NoError(err)
+	re.Nil(resp.GetHeader().GetError())
+
+	// Mock restart raft cluster
+	rc := leader.GetRaftCluster()
+	re.NotNil(rc)
+	rc.Stop()
+
+	// Mock client-go will still send request
+	output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError)
+	re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first")
+
+	err = rc.Start(leader.GetServer())
+	re.NoError(err)
+	rc = leader.GetRaftCluster()
+	re.NotNil(rc)
+}
+
 func TestPreparingProgress(t *testing.T) {
 	re := require.New(t)
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
