Skip to content

Commit

Permalink
stability: add more checks for scale & upgrade (#327)
Browse files Browse the repository at this point in the history
stability: add more checks for scale & upgrade
  • Loading branch information
zyguan authored and weekface committed Mar 22, 2019
1 parent a845bbb commit f89131a
Show file tree
Hide file tree
Showing 2 changed files with 254 additions and 1 deletion.
253 changes: 253 additions & 0 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"net/url"
"os/exec"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -81,7 +82,10 @@ type OperatorActions interface {
BeginInsertDataTo(info *TidbClusterInfo) error
StopInsertDataTo(info *TidbClusterInfo) error
ScaleTidbCluster(info *TidbClusterInfo) error
CheckScaleInSafely(info *TidbClusterInfo) error
CheckScaledCorrectly(info *TidbClusterInfo, podUIDsBeforeScale map[string]types.UID) error
UpgradeTidbCluster(info *TidbClusterInfo) error
CheckUpgradeProgress(info *TidbClusterInfo) error
DeployAdHocBackup(info *TidbClusterInfo) error
CheckAdHocBackup(info *TidbClusterInfo) error
DeployScheduledBackup(info *TidbClusterInfo) error
Expand All @@ -92,6 +96,8 @@ type OperatorActions interface {
CheckRestore(from *TidbClusterInfo, to *TidbClusterInfo) error
ForceDeploy(info *TidbClusterInfo) error
CreateSecret(info *TidbClusterInfo) error
GetPodUIDMap(info *TidbClusterInfo) (map[string]types.UID, error)
GetNodeMap(info *TidbClusterInfo, component string) (map[string][]string, error)
getBackupDir(info *TidbClusterInfo) ([]string, error)
}

Expand Down Expand Up @@ -434,6 +440,63 @@ func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterInfo) error {
return nil
}

func (oa *operatorActions) CheckScaleInSafely(info *TidbClusterInfo) error {
return wait.Poll(DefaultPollInterval, DefaultPollTimeout, func() (done bool, err error) {
tc, err := oa.cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
if err != nil {
glog.Infof("failed to get tidbcluster when scale in tidbcluster, error: %v", err)
return false, nil
}

tikvSetName := controller.TiKVMemberName(info.ClusterName)
tikvSet, err := oa.kubeCli.AppsV1beta1().StatefulSets(info.Namespace).Get(tikvSetName, metav1.GetOptions{})
if err != nil {
glog.Infof("failed to get tikvSet statefulset: [%s], error: %v", tikvSetName, err)
return false, nil
}

pdClient := controller.NewDefaultPDControl().GetPDClient(tc)
stores, err := pdClient.GetStores()
if err != nil {
glog.Infof("pdClient.GetStores failed,error: %v", err)
return false, nil
}
if len(stores.Stores) > int(*tikvSet.Spec.Replicas) {
glog.Infof("stores.Stores: %v", stores.Stores)
glog.Infof("tikvSet.Spec.Replicas: %d", *tikvSet.Spec.Replicas)
return false, fmt.Errorf("the tikvSet.Spec.Replicas may reduce before tikv complete offline")
}

if *tikvSet.Spec.Replicas == tc.Spec.TiKV.Replicas {
return true, nil
}

return false, nil
})
}

func (oa *operatorActions) CheckScaledCorrectly(info *TidbClusterInfo, podUIDsBeforeScale map[string]types.UID) error {
return wait.Poll(DefaultPollInterval, DefaultPollTimeout, func() (done bool, err error) {
podUIDs, err := oa.GetPodUIDMap(info)
if err != nil {
glog.Infof("failed to get pd pods's uid, error: %v", err)
return false, nil
}

if len(podUIDsBeforeScale) == len(podUIDs) {
return false, fmt.Errorf("the length of pods before scale equals the length of pods after scale")
}

for podName, uidAfter := range podUIDs {
if uidBefore, ok := podUIDsBeforeScale[podName]; ok && uidBefore != uidAfter {
return false, fmt.Errorf("pod: [%s] have be recreated", podName)
}
}

return true, nil
})
}

func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterInfo) error {
cmd := fmt.Sprintf("helm upgrade %s %s --set-string %s",
info.ClusterName, chartPath("tidb-cluster", info.OperatorTag), info.TidbClusterHelmSetString(nil))
Expand All @@ -445,6 +508,146 @@ func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterInfo) error {
return nil
}

func (oa *operatorActions) CheckUpgradeProgress(info *TidbClusterInfo) error {
return wait.Poll(DefaultPollInterval, DefaultPollTimeout, func() (done bool, err error) {
tc, err := oa.cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
if err != nil {
glog.Infof("failed to get tidbcluster: [%s], error: %v", info.ClusterName, err)
return false, nil
}

pdSetName := controller.PDMemberName(info.ClusterName)
pdSet, err := oa.kubeCli.AppsV1beta1().StatefulSets(info.Namespace).Get(pdSetName, metav1.GetOptions{})
if err != nil {
glog.Infof("failed to get pd statefulset: [%s], error: %v", pdSetName, err)
return false, nil
}

tikvSetName := controller.TiKVMemberName(info.ClusterName)
tikvSet, err := oa.kubeCli.AppsV1beta1().StatefulSets(info.Namespace).Get(tikvSetName, metav1.GetOptions{})
if err != nil {
glog.Infof("failed to get tikvSet statefulset: [%s], error: %v", tikvSetName, err)
return false, nil
}

tidbSetName := controller.TiDBMemberName(info.ClusterName)
tidbSet, err := oa.kubeCli.AppsV1beta1().StatefulSets(info.Namespace).Get(tidbSetName, metav1.GetOptions{})
if err != nil {
glog.Infof("failed to get tidbSet statefulset: [%s], error: %v", tidbSetName, err)
return false, nil
}

imageUpgraded := func(memberType v1alpha1.MemberType, set *v1beta1.StatefulSet) bool {
image := ""
switch memberType {
case v1alpha1.PDMemberType:
image = tc.Spec.PD.Image
case v1alpha1.TiKVMemberType:
image = tc.Spec.TiKV.Image
case v1alpha1.TiDBMemberType:
image = tc.Spec.TiDB.Image
}
memberName := string(memberType)
c, ok := getComponentContainer(set)
if !ok || c.Image != image {
glog.Infof("check %s image: getContainer(set).Image(%s) != tc.Spec.%s.Image(%s)",
memberName, c.Image, strings.ToUpper(memberName), image)
}
return ok && c.Image == image
}
setUpgraded := func(set *v1beta1.StatefulSet) bool {
return set.Generation <= *set.Status.ObservedGeneration && set.Status.CurrentRevision == set.Status.UpdateRevision
}

// check upgrade order
if tc.Status.PD.Phase == v1alpha1.UpgradePhase {
glog.Infof("pd is upgrading")
if tc.Status.TiKV.Phase == v1alpha1.UpgradePhase {
return false, pingcapErrors.New("tikv is upgrading while pd is upgrading")
}
if tc.Status.TiDB.Phase == v1alpha1.UpgradePhase {
return false, pingcapErrors.New("tidb is upgrading while pd is upgrading")
}
if !imageUpgraded(v1alpha1.PDMemberType, pdSet) {
return false, pingcapErrors.New("pd image is not updated while pd is upgrading")
}
if !setUpgraded(pdSet) {
if imageUpgraded(v1alpha1.TiKVMemberType, tikvSet) {
return false, pingcapErrors.New("tikv image is updated while pd is upgrading")
}
if imageUpgraded(v1alpha1.TiDBMemberType, tidbSet) {
return false, pingcapErrors.New("tidb image is updated while pd is upgrading")
}
}
return false, nil
} else if tc.Status.TiKV.Phase == v1alpha1.UpgradePhase {
glog.Infof("tikv is upgrading")
if tc.Status.TiDB.Phase == v1alpha1.UpgradePhase {
return false, pingcapErrors.New("tidb is upgrading while tikv is upgrading")
}
if !imageUpgraded(v1alpha1.PDMemberType, pdSet) {
return false, pingcapErrors.New("pd image is not updated while tikv is upgrading")
}
if !setUpgraded(pdSet) {
return false, pingcapErrors.New("pd stateful set is not upgraded while tikv is upgrading")
}
if !imageUpgraded(v1alpha1.TiKVMemberType, tikvSet) {
return false, pingcapErrors.New("tikv image is not updated while tikv is upgrading")
}
if !setUpgraded(tikvSet) {
if imageUpgraded(v1alpha1.TiDBMemberType, tidbSet) {
return false, pingcapErrors.New("tidb image is updated while tikv is upgrading")
}
}
return false, nil
} else if tc.Status.TiDB.Phase == v1alpha1.UpgradePhase {
glog.Infof("tidb is upgrading")
if !imageUpgraded(v1alpha1.PDMemberType, pdSet) {
return false, pingcapErrors.New("pd image is not updated while tidb is upgrading")
}
if !setUpgraded(pdSet) {
return false, pingcapErrors.New("pd stateful set is not upgraded while tidb is upgrading")
}
if !imageUpgraded(v1alpha1.TiKVMemberType, tikvSet) {
return false, pingcapErrors.New("tikv image is not updated while tidb is upgrading")
}
if !setUpgraded(tikvSet) {
return false, pingcapErrors.New("tikv stateful set is not upgraded while tidb is upgrading")
}
if !imageUpgraded(v1alpha1.TiDBMemberType, tidbSet) {
return false, pingcapErrors.New("tidb image is not updated while tikv is upgrading")
}
return false, nil
}

// check pd final state
if !imageUpgraded(v1alpha1.PDMemberType, pdSet) {
return false, nil
}
if !setUpgraded(pdSet) {
glog.Infof("check pd stateful set upgraded failed")
return false, nil
}
// check tikv final state
if !imageUpgraded(v1alpha1.TiKVMemberType, tikvSet) {
return false, nil
}
if !setUpgraded(tikvSet) {
glog.Infof("check tikv stateful set upgraded failed")
return false, nil
}
// check tidb final state
if !imageUpgraded(v1alpha1.TiDBMemberType, tidbSet) {
return false, nil
}
if !setUpgraded(tidbSet) {
glog.Infof("check tidb stateful set upgraded failed")
return false, nil
}
return true, nil
})
}

func (oa *operatorActions) DeployMonitor(info *TidbClusterInfo) error { return nil }
func (oa *operatorActions) CleanMonitor(info *TidbClusterInfo) error { return nil }

Expand Down Expand Up @@ -609,6 +812,11 @@ func (oa *operatorActions) tidbMembersReadyFn(tc *v1alpha1.TidbCluster) (bool, e
ns, tidbSetName, tidbSet.Status.ReadyReplicas, replicas)
return false, nil
}
if len(tc.Status.TiDB.Members) != int(tc.Spec.TiDB.Replicas) {
glog.Infof("tidbcluster: %s/%s .status.TiDB.Members count(%d) != %d",
ns, tcName, len(tc.Status.TiDB.Members), tc.Spec.TiDB.Replicas)
return false, nil
}
if tidbSet.Status.ReadyReplicas != tidbSet.Status.Replicas {
glog.Infof("statefulset: %s/%s .status.ReadyReplicas(%d) != .status.Replicas(%d)",
ns, tidbSetName, tidbSet.Status.ReadyReplicas, tidbSet.Status.Replicas)
Expand All @@ -625,6 +833,11 @@ func (oa *operatorActions) tidbMembersReadyFn(tc *v1alpha1.TidbCluster) (bool, e
glog.Errorf("failed to get service: %s/%s", ns, tidbSetName)
return false, nil
}
_, err = oa.kubeCli.CoreV1().Services(ns).Get(controller.TiDBPeerMemberName(tcName), metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get peer service: %s/%s", ns, controller.TiDBPeerMemberName(tcName))
return false, nil
}

return true, nil
}
Expand Down Expand Up @@ -1635,3 +1848,43 @@ func (oa *operatorActions) drainerHealth(info *TidbClusterInfo, hostName string)
}
return len(healths.PumpPos) > 0 && healths.Synced
}

func (oa *operatorActions) GetPodUIDMap(info *TidbClusterInfo) (map[string]types.UID, error) {
result := map[string]types.UID{}

selector, err := label.New().Instance(info.ClusterName).Selector()
if err != nil {
return nil, err
}
pods, err := oa.kubeCli.CoreV1().Pods(info.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return nil, err
}
for _, pod := range pods.Items {
result[pod.GetName()] = pod.GetUID()
}

return result, nil
}

func (oa *operatorActions) GetNodeMap(info *TidbClusterInfo, component string) (map[string][]string, error) {
nodeMap := make(map[string][]string)
selector := label.New().Instance(info.ClusterName).Component(component).Labels()
podList, err := oa.kubeCli.CoreV1().Pods(info.Namespace).List(metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(selector).String(),
})
if err != nil {
return nil, err
}

for _, pod := range podList.Items {
nodeName := pod.Spec.NodeName
if len(nodeMap[nodeName]) == 0 {
nodeMap[nodeName] = make([]string, 0)
}
nodeMap[nodeName] = append(nodeMap[nodeName], pod.GetName())
sort.Strings(nodeMap[nodeName])
}

return nodeMap, nil
}
2 changes: 1 addition & 1 deletion tests/pkg/workload/ddl/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (w *DDLWorkload) Enter() error {
return errors.New("already in ddl workload context")
}
w.ctx, w.cancel = context.WithCancel(context.Background())
go internal.Run(w.ctx, w.DSN, w.Concurrency, w.Tables, false, internal.SerialDDLTest)
go internal.Run(w.ctx, w.DSN, w.Concurrency, w.Tables, false, internal.ParallelDDLTest)
return nil
}

Expand Down

0 comments on commit f89131a

Please sign in to comment.