Skip to content

Commit

Permalink
pdms: optimize upgrade pdms to avoid unnecessary primary transfer (#2414
Browse files Browse the repository at this point in the history
)
  • Loading branch information
HuSharp authored Sep 4, 2024
1 parent a16d04f commit 0b15af9
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
28 changes: 23 additions & 5 deletions pkg/cluster/operation/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,19 @@ func Upgrade(
if instance.IgnoreMonitorAgent() {
noAgentHosts.Insert(instance.GetManageHost())
}

// Usage within the switch statement
switch component.Name() {
case spec.ComponentPD:
// defer PD leader to be upgraded after others
isLeader, err := instance.(*spec.PDInstance).IsLeader(ctx, topo, int(options.APITimeout), tlsCfg)
case spec.ComponentPD, spec.ComponentTSO, spec.ComponentScheduling:
// defer PD related leader/primary to be upgraded after others
isLeader, err := checkAndDeferPDLeader(ctx, topo, int(options.APITimeout), tlsCfg, instance)
if err != nil {
logger.Warnf("cannot found pd leader, ignore: %s", err)
logger.Warnf("cannot found pd related leader/primary, ignore: %s, instance: %s", err, instance.ID())
return err
}
if isLeader {
deferInstances = append(deferInstances, instance)
logger.Debugf("Deferred upgrading of PD leader %s", instance.ID())
logger.Debugf("Upgrading deferred instance %s...", instance.ID())
continue
}
case spec.ComponentCDC:
Expand Down Expand Up @@ -218,6 +220,22 @@ func Upgrade(
return RestartMonitored(ctx, uniqueHosts.Slice(), noAgentHosts, topo.GetMonitoredOptions(), options.OptTimeout, systemdMode)
}

// checkAndDeferPDLeader checks the PD related leader/primary instance's status and defers its upgrade if necessary.
func checkAndDeferPDLeader(ctx context.Context, topo spec.Topology, apiTimeout int, tlsCfg *tls.Config, instance spec.Instance) (isLeader bool, err error) {
switch instance.ComponentName() {
case spec.ComponentPD:
isLeader, err = instance.(*spec.PDInstance).IsLeader(ctx, topo, apiTimeout, tlsCfg)
case spec.ComponentScheduling:
isLeader, err = instance.(*spec.SchedulingInstance).IsPrimary(ctx, topo, tlsCfg)
case spec.ComponentTSO:
isLeader, err = instance.(*spec.TSOInstance).IsPrimary(ctx, topo, tlsCfg)
}
if err != nil {
return false, err
}
return isLeader, nil
}

func upgradeInstance(
ctx context.Context,
topo spec.Topology,
Expand Down
26 changes: 25 additions & 1 deletion pkg/cluster/spec/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/pingcap/tiup/pkg/utils"
)

var schedulingService = "scheduling"

// SchedulingSpec represents the scheduling topology specification in topology.yaml
type SchedulingSpec struct {
Host string `yaml:"host"`
Expand Down Expand Up @@ -66,7 +69,7 @@ func (s *SchedulingSpec) Status(ctx context.Context, timeout time.Duration, tlsC
return "Down"
}

primary, err := pc.GetServicePrimary("scheduling")
primary, err := pc.GetServicePrimary(schedulingService)
if err != nil {
return "ERR"
}
Expand Down Expand Up @@ -309,6 +312,27 @@ func (i *SchedulingInstance) setTLSConfig(ctx context.Context, enableTLS bool, c
return configs, nil
}

// IsPrimary checks if the instance is primary
func (i *SchedulingInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) {
tidbTopo, ok := topo.(*Specification)
if !ok {
panic("topo should be type of tidb topology")
}
pdClient := api.NewPDClient(ctx, tidbTopo.GetPDListWithManageHost(), time.Second*5, tlsCfg)
primary, err := pdClient.GetServicePrimary(schedulingService)
if err != nil {
return false, errors.Annotatef(err, "failed to get Scheduling primary %s", i.GetHost())
}

spec := i.InstanceSpec.(*SchedulingSpec)
enableTLS := false
if tlsCfg != nil {
enableTLS = true
}

return primary == spec.GetAdvertiseListenURL(enableTLS), nil
}

// ScaleConfig deploy temporary config on scaling
func (i *SchedulingInstance) ScaleConfig(
ctx context.Context,
Expand Down
26 changes: 25 additions & 1 deletion pkg/cluster/spec/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
"github.com/pingcap/tiup/pkg/cluster/template/scripts"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/pingcap/tiup/pkg/utils"
)

var tsoService = "tso"

// TSOSpec represents the TSO topology specification in topology.yaml
type TSOSpec struct {
Host string `yaml:"host"`
Expand Down Expand Up @@ -66,7 +69,7 @@ func (s *TSOSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls
return "Down"
}

primary, err := pc.GetServicePrimary("tso")
primary, err := pc.GetServicePrimary(tsoService)
if err != nil {
return "ERR"
}
Expand Down Expand Up @@ -309,6 +312,27 @@ func (i *TSOInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs
return configs, nil
}

// IsPrimary checks if the instance is primary
func (i *TSOInstance) IsPrimary(ctx context.Context, topo Topology, tlsCfg *tls.Config) (bool, error) {
tidbTopo, ok := topo.(*Specification)
if !ok {
panic("topo should be type of tidb topology")
}
pdClient := api.NewPDClient(ctx, tidbTopo.GetPDListWithManageHost(), time.Second*5, tlsCfg)
primary, err := pdClient.GetServicePrimary(tsoService)
if err != nil {
return false, errors.Annotatef(err, "failed to get TSO primary %s", i.GetHost())
}

spec := i.InstanceSpec.(*TSOSpec)
enableTLS := false
if tlsCfg != nil {
enableTLS = true
}

return primary == spec.GetAdvertiseListenURL(enableTLS), nil
}

// ScaleConfig deploy temporary config on scaling
func (i *TSOInstance) ScaleConfig(
ctx context.Context,
Expand Down

0 comments on commit 0b15af9

Please sign in to comment.