Skip to content

Commit

Permalink
pdms: Choose a suitable pdms to transfer primary when upgrade (#5643)
Browse files Browse the repository at this point in the history
  • Loading branch information
HuSharp authored Aug 14, 2024
1 parent 545685a commit dbe75c0
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 7 deletions.
14 changes: 14 additions & 0 deletions pkg/controller/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ func NewFakePDClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *
return pdClient
}

// NewFakePDMSClient creates a fake pdmsclient that is set as the pdms client
func NewFakePDMSClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster, curService string) *pdapi.FakePDMSClient {
pdmsClient := pdapi.NewFakePDMSClient()
if tc.Spec.Cluster != nil {
pdControl.SetPDMSClientWithClusterDomain(pdapi.Namespace(tc.Spec.Cluster.Namespace), tc.Spec.Cluster.Name, tc.Spec.Cluster.ClusterDomain, curService, pdmsClient)
}
if tc.Spec.ClusterDomain != "" {
pdControl.SetPDMSClientWithClusterDomain(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.Spec.ClusterDomain, curService, pdmsClient)
}
pdControl.SetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), curService, pdmsClient)

return pdmsClient
}

// NewFakePDClientWithAddress creates a fake pdclient that is set as the pd client
func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string) *pdapi.FakePDClient {
pdClient := pdapi.NewFakePDClient()
Expand Down
88 changes: 86 additions & 2 deletions pkg/manager/member/pd_ms_upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
"github.com/pingcap/tidb-operator/pkg/util/cmpver"
apps "k8s.io/api/apps/v1"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -120,12 +121,95 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
}
continue
}
mngerutils.SetUpgradePartition(newSet, i)
return nil

return u.upgradePDMSPod(tc, i, newSet, curService)
}
return nil
}

func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, newSet *apps.StatefulSet, curService string) error {
// Only support after `8.3.0` to keep compatibility.
if check, err := pdMSSupportMicroServicesWithName.Check(tc.PDMSVersion(curService)); check && err == nil {
ns := tc.GetNamespace()
tcName := tc.GetName()
upgradePDMSName := PDMSName(tcName, ordinal, tc.Namespace, tc.Spec.ClusterDomain, tc.Spec.AcrossK8s, curService)
upgradePodName := PDMSPodName(tcName, ordinal, curService)

pdClient := controller.GetPDClient(u.deps.PDControl, tc)
primary, err := pdClient.GetMSPrimary(curService)
if err != nil {
return err
}

klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
primary, upgradePDMSName, upgradePodName)
// If current pdms is primary, transfer primary to other pdms pod
if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) {
targetName := ""

if tc.PDMSStsActualReplicas(curService) > 1 {
targetName = choosePDMSToTransferFromMembers(tc, newSet, ordinal)
}

if targetName != "" {
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName)
if err != nil {
klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
return err
}
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
} else {
klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
}
}
}

mngerutils.SetUpgradePartition(newSet, ordinal)
return nil
}

// choosePDMSToTransferFromMembers choose a pdms to transfer primary from members
//
// Assume that current primary ordinal is x, and range is [0, n]
// 1. Find the max suitable ordinal in (x, n], because they have been upgraded
// 2. If no suitable ordinal, find the min suitable ordinal in [0, x) to reduce the count of transfer
func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
ns := tc.GetNamespace()
tcName := tc.GetName()
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)

// set ordinal to max ordinal if ordinal isn't exist
if !ordinals.Has(ordinal) {
ordinal = helper.GetMaxPodOrdinal(*newSet.Spec.Replicas, newSet)
}

targetName := ""
list := ordinals.List()
if len(list) == 0 {
return ""
}

// just using pods index for now. TODO: add healthy checker for pdms.
// find the maximum ordinal which is larger than ordinal
if len(list) > int(ordinal)+1 {
targetName = PDMSPodName(tcName, list[len(list)-1], controller.PDMSTrimName(newSet.Name))
}

if targetName == "" && ordinal != 0 {
// find the minimum ordinal which is less than ordinal
targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name))
}

klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
return targetName
}

// PDMSSupportMicroServicesWithName returns true if the given version of PDMS supports microservices with name.
// related https://github.com/tikv/pd/pull/8157.
var pdMSSupportMicroServicesWithName, _ = cmpver.NewConstraint(cmpver.GreaterOrEqual, "v8.3.0")

type fakePDMSUpgrader struct{}

// NewFakePDMSUpgrader returns a fakePDUpgrader
Expand Down
20 changes: 17 additions & 3 deletions pkg/manager/member/pd_ms_upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
"github.com/pingcap/tidb-operator/pkg/pdapi"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -44,8 +45,20 @@ func TestPDMSUpgraderUpgrade(t *testing.T) {

testFn := func(test *testcase) {
t.Log(test.name)
upgrader, podInformer := newPDMSUpgrader()
upgrader, pdControl, podInformer := newPDMSUpgrader()
tc := newTidbClusterForPDMSUpgrader()
pdClient := controller.NewFakePDClient(pdControl, tc)
pdMSClient := controller.NewFakePDMSClient(pdControl, tc, "tso")

pdClient.AddReaction(pdapi.GetPDMSPrimaryActionType, func(action *pdapi.Action) (interface{}, error) {
return "upgrader-tso-1", nil
})
pdMSClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) {
return nil, nil
})
pdMSClient.AddReaction(pdapi.PDMSTransferPrimaryActionType, func(action *pdapi.Action) (interface{}, error) {
return nil, nil
})

if test.changeFn != nil {
test.changeFn(tc)
Expand Down Expand Up @@ -218,11 +231,12 @@ func TestPDMSUpgraderUpgrade(t *testing.T) {
}
}

func newPDMSUpgrader() (Upgrader, podinformers.PodInformer) {
func newPDMSUpgrader() (Upgrader, *pdapi.FakePDControl, podinformers.PodInformer) {
fakeDeps := controller.NewFakeDependencies()
pdMSUpgrader := &pdMSUpgrader{deps: fakeDeps}
podInformer := fakeDeps.KubeInformerFactory.Core().V1().Pods()
return pdMSUpgrader, podInformer
pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl)
return pdMSUpgrader, pdControl, podInformer
}

func newStatefulSetForPDMSUpgrader() *apps.StatefulSet {
Expand Down
15 changes: 15 additions & 0 deletions pkg/manager/member/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ func PdName(tcName string, ordinal int32, namespace string, clusterDomain string
return PdPodName(tcName, ordinal)
}

// PDMSName should match the start arg `--name` of pd-server
// See the start script of PDMS in pkg/manager/member/startscript/v2.renderPDMSStartScript
func PDMSName(tcName string, ordinal int32, namespace, clusterDomain string, acrossK8s bool, component string) string {
if len(clusterDomain) > 0 {
return fmt.Sprintf("%s.%s-%s-peer.%s.svc.%s", PDMSPodName(tcName, ordinal, component), component, tcName, namespace, clusterDomain)
}

// clusterDomain is not set
if acrossK8s {
return fmt.Sprintf("%s.%s-%s-peer.%s.svc", PDMSPodName(tcName, ordinal, component), component, tcName, namespace)
}

return PDMSPodName(tcName, ordinal, component)
}

// NeedForceUpgrade check if force upgrade is necessary
func NeedForceUpgrade(ann map[string]string) bool {
// Check if annotation 'pingcap.com/force-upgrade: "true"' is set
Expand Down
48 changes: 48 additions & 0 deletions pkg/pdapi/fake_pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
GetClusterActionType ActionType = "GetCluster"
GetMembersActionType ActionType = "GetMembers"
GetPDMSMembersActionType ActionType = "GetPDMSMembers"
GetPDMSPrimaryActionType ActionType = "GetPDMSPrimary"
GetStoresActionType ActionType = "GetStores"
GetTombStoneStoresActionType ActionType = "GetTombStoneStores"
GetStoreActionType ActionType = "GetStore"
Expand All @@ -45,6 +46,7 @@ const (
TransferPDLeaderActionType ActionType = "TransferPDLeader"
GetAutoscalingPlansActionType ActionType = "GetAutoscalingPlans"
GetRecoveringMarkActionType ActionType = "GetRecoveringMark"
PDMSTransferPrimaryActionType ActionType = "PDMSTransferPrimary"
)

type NotFoundReaction struct {
Expand Down Expand Up @@ -78,6 +80,15 @@ func (c *FakePDClient) GetMSMembers(_ string) ([]string, error) {
return result.([]string), nil
}

func (c *FakePDClient) GetMSPrimary(_ string) (string, error) {
action := &Action{}
result, err := c.fakeAPI(GetPDMSPrimaryActionType, action)
if err != nil {
return "", err
}
return result.(string), nil
}

func NewFakePDClient() *FakePDClient {
return &FakePDClient{reactions: map[ActionType]Reaction{}}
}
Expand Down Expand Up @@ -291,3 +302,40 @@ func (c *FakePDClient) GetRecoveringMark() (bool, error) {

return true, nil
}

// FakePDMSClient implements a fake version of PDMSClient.
type FakePDMSClient struct {
reactions map[ActionType]Reaction
}

func NewFakePDMSClient() *FakePDMSClient {
return &FakePDMSClient{reactions: map[ActionType]Reaction{}}
}

func (c *FakePDMSClient) AddReaction(actionType ActionType, reaction Reaction) {
c.reactions[actionType] = reaction
}

// fakeAPI is a small helper for fake API calls
func (c *FakePDMSClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) {
if reaction, ok := c.reactions[actionType]; ok {
result, err := reaction(action)
if err != nil {
return nil, err
}
return result, nil
}
return nil, &NotFoundReaction{actionType}
}

func (c *FakePDMSClient) GetHealth() error {
action := &Action{}
_, err := c.fakeAPI(GetHealthActionType, action)
return err
}

func (c *FakePDMSClient) TransferPrimary(newPrimary string) error {
action := &Action{Name: newPrimary}
_, err := c.fakeAPI(PDMSTransferPrimaryActionType, action)
return err
}
14 changes: 13 additions & 1 deletion pkg/pdapi/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ type FakePDControl struct {

func NewFakePDControl(secretLister corelisterv1.SecretLister) *FakePDControl {
return &FakePDControl{
defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}},
defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}, pdMSClients: map[string]PDMSClient{}},
}
}

Expand All @@ -352,3 +352,15 @@ func (fpc *FakePDControl) SetPDClientWithClusterDomain(namespace Namespace, tcNa
func (fpc *FakePDControl) SetPDClientWithAddress(peerURL string, pdclient PDClient) {
fpc.defaultPDControl.pdClients[peerURL] = pdclient
}

func (fpc *FakePDControl) SetPDMSClient(namespace Namespace, tcName, curService string, pdmsclient PDMSClient) {
fpc.defaultPDControl.pdMSClients[genClientUrl(namespace, tcName, "http", "", curService, false)] = pdmsclient
}

func (fpc *FakePDControl) SetPDMSClientWithClusterDomain(namespace Namespace, tcName, tcClusterDomain, curService string, pdmsclient PDMSClient) {
fpc.defaultPDControl.pdMSClients[genClientUrl(namespace, tcName, "http", tcClusterDomain, curService, false)] = pdmsclient
}

func (fpc *FakePDControl) SetPDMSClientWithAddress(peerURL string, pdmsclient PDMSClient) {
fpc.defaultPDControl.pdMSClients[peerURL] = pdmsclient
}
17 changes: 17 additions & 0 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type PDClient interface {
GetRecoveringMark() (bool, error)
// GetMSMembers returns all PDMS members service-addr from cluster by specific Micro Service
GetMSMembers(service string) ([]string, error)
// GetMSPrimary returns the primary PDMS member service-addr from cluster by specific Micro Service
GetMSPrimary(service string) (string, error)
}

var (
Expand Down Expand Up @@ -341,6 +343,21 @@ func (c *pdClient) GetMSMembers(service string) ([]string, error) {
return addrs, nil
}

func (c *pdClient) GetMSPrimary(service string) (string, error) {
apiURL := fmt.Sprintf("%s/%s/primary/%s", c.url, MicroServicePrefix, service)
body, err := httputil.GetBodyOK(c.httpClient, apiURL)
if err != nil {
return "", err
}
var primary string
err = json.Unmarshal(body, &primary)
if err != nil {
return "", err
}

return primary, nil
}

func (c *pdClient) getStores(apiURL string) (*StoresInfo, error) {
body, err := httputil.GetBodyOK(c.httpClient, apiURL)
if err != nil {
Expand Down
25 changes: 24 additions & 1 deletion pkg/pdapi/pdms_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package pdapi

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
Expand All @@ -27,10 +29,13 @@ import (
type PDMSClient interface {
// GetHealth returns ping result
GetHealth() error
// TransferPrimary transfers the primary to the newPrimary
TransferPrimary(newPrimary string) error
}

var (
pdMSHealthPrefix = "api/v1/health"
pdMSHealthPrefix = "api/v1/health"
pdMSPrimaryTransferPrefix = "api/v1/primary/transfer"
)

// pdMSClient is default implementation of PDClient
Expand Down Expand Up @@ -69,3 +74,21 @@ func (c *pdMSClient) GetHealth() error {
}
return nil
}

func (c *pdMSClient) TransferPrimary(newPrimary string) error {
apiURL := fmt.Sprintf("%s/%s/%s", c.url, c.serviceName, pdMSPrimaryTransferPrefix)
data, err := json.Marshal(struct {
NewPrimary string `json:"new_primary"`
}{
NewPrimary: newPrimary,
})
if err != nil {
return err
}
_, err = httputil.PostBodyOK(c.httpClient, apiURL, bytes.NewBuffer(data))
if err != nil {
return err
}

return nil
}

0 comments on commit dbe75c0

Please sign in to comment.