diff --git a/tests/actions.go b/tests/actions.go
index 857c742c2b..25f452570c 100644
--- a/tests/actions.go
+++ b/tests/actions.go
@@ -21,6 +21,7 @@ import (
 	"net/http"
 	"net/url"
 	"os/exec"
+	"sort"
 	"strconv"
 	"strings"
 	"time"
@@ -81,7 +82,10 @@ type OperatorActions interface {
 	BeginInsertDataTo(info *TidbClusterInfo) error
 	StopInsertDataTo(info *TidbClusterInfo) error
 	ScaleTidbCluster(info *TidbClusterInfo) error
+	CheckScaleInSafely(info *TidbClusterInfo) error
+	CheckScaledCorrectly(info *TidbClusterInfo, podUIDsBeforeScale map[string]types.UID) error
 	UpgradeTidbCluster(info *TidbClusterInfo) error
+	CheckUpgradeProgress(info *TidbClusterInfo) error
 	DeployAdHocBackup(info *TidbClusterInfo) error
 	CheckAdHocBackup(info *TidbClusterInfo) error
 	DeployScheduledBackup(info *TidbClusterInfo) error
@@ -92,6 +96,8 @@ type OperatorActions interface {
 	CheckRestore(from *TidbClusterInfo, to *TidbClusterInfo) error
 	ForceDeploy(info *TidbClusterInfo) error
 	CreateSecret(info *TidbClusterInfo) error
+	GetPodUIDMap(info *TidbClusterInfo) (map[string]types.UID, error)
+	GetNodeMap(info *TidbClusterInfo, component string) (map[string][]string, error)
 	getBackupDir(info *TidbClusterInfo) ([]string, error)
 }
 
@@ -434,6 +440,63 @@ func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterInfo) error {
 	return nil
 }
 
+func (oa *operatorActions) CheckScaleInSafely(info *TidbClusterInfo) error {
+	return wait.Poll(DefaultPollInterval, DefaultPollTimeout, func() (done bool, err error) {
+		tc, err := oa.cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
+		if err != nil {
+			glog.Infof("failed to get tidbcluster when scaling in, error: %v", err)
+			return false, nil
+		}
+
+		tikvSetName := controller.TiKVMemberName(info.ClusterName)
+		tikvSet, err := oa.kubeCli.AppsV1beta1().StatefulSets(info.Namespace).Get(tikvSetName, metav1.GetOptions{})
+		if err != nil {
+			glog.Infof("failed to get tikv statefulset: [%s], error: %v", tikvSetName, err)
+			return false, nil
+		}
+
+		pdClient := controller.NewDefaultPDControl().GetPDClient(tc)
+		stores, err := pdClient.GetStores()
+		if err != nil {
+			glog.Infof("pdClient.GetStores failed, error: %v", err)
+			return false, nil
+		}
+		if len(stores.Stores) > int(*tikvSet.Spec.Replicas) {
+			glog.Infof("stores.Stores: %v", stores.Stores)
+			glog.Infof("tikvSet.Spec.Replicas: %d", *tikvSet.Spec.Replicas)
+			return false, fmt.Errorf("tikvSet.Spec.Replicas may have been reduced before the TiKV store finished going offline")
+		}
+
+		if *tikvSet.Spec.Replicas == tc.Spec.TiKV.Replicas {
+			return true, nil
+		}
+
+		return false, nil
+	})
+}
+
+func (oa *operatorActions) CheckScaledCorrectly(info *TidbClusterInfo, podUIDsBeforeScale map[string]types.UID) error {
+	return wait.Poll(DefaultPollInterval, DefaultPollTimeout, func() (done bool, err error) {
+		podUIDs, err := oa.GetPodUIDMap(info)
+		if err != nil {
+			glog.Infof("failed to get pod UIDs, error: %v", err)
+			return false, nil
+		}
+
+		if len(podUIDsBeforeScale) == len(podUIDs) {
+			return false, fmt.Errorf("the pod count after scaling equals the pod count before scaling")
+		}
+
+		for podName, uidAfter := range podUIDs {
+			if uidBefore, ok := podUIDsBeforeScale[podName]; ok && uidBefore != uidAfter {
+				return false, fmt.Errorf("pod: [%s] has been recreated", podName)
+			}
+		}
+
+		return true, nil
+	})
+}
+
 func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterInfo) error {
 	cmd := fmt.Sprintf("helm upgrade %s %s --set-string %s", info.ClusterName, chartPath("tidb-cluster", info.OperatorTag),
 		info.TidbClusterHelmSetString(nil))
@@ -445,6 +508,146 @@ func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterInfo) error {
 	return nil
 }
 
+func (oa *operatorActions) CheckUpgradeProgress(info *TidbClusterInfo) error {
+	return wait.Poll(DefaultPollInterval, DefaultPollTimeout, func() (done bool, err error) {
+		tc, err := oa.cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
+		if err != nil {
+			glog.Infof("failed to get tidbcluster: [%s], error: %v", info.ClusterName, err)
+			return false, nil
+		}
+
+		pdSetName := controller.PDMemberName(info.ClusterName)
+		pdSet, err := oa.kubeCli.AppsV1beta1().StatefulSets(info.Namespace).Get(pdSetName, metav1.GetOptions{})
+		if err != nil {
+			glog.Infof("failed to get pd statefulset: [%s], error: %v", pdSetName, err)
+			return false, nil
+		}
+
+		tikvSetName := controller.TiKVMemberName(info.ClusterName)
+		tikvSet, err := oa.kubeCli.AppsV1beta1().StatefulSets(info.Namespace).Get(tikvSetName, metav1.GetOptions{})
+		if err != nil {
+			glog.Infof("failed to get tikv statefulset: [%s], error: %v", tikvSetName, err)
+			return false, nil
+		}
+
+		tidbSetName := controller.TiDBMemberName(info.ClusterName)
+		tidbSet, err := oa.kubeCli.AppsV1beta1().StatefulSets(info.Namespace).Get(tidbSetName, metav1.GetOptions{})
+		if err != nil {
+			glog.Infof("failed to get tidb statefulset: [%s], error: %v", tidbSetName, err)
+			return false, nil
+		}
+
+		imageUpgraded := func(memberType v1alpha1.MemberType, set *v1beta1.StatefulSet) bool {
+			image := ""
+			switch memberType {
+			case v1alpha1.PDMemberType:
+				image = tc.Spec.PD.Image
+			case v1alpha1.TiKVMemberType:
+				image = tc.Spec.TiKV.Image
+			case v1alpha1.TiDBMemberType:
+				image = tc.Spec.TiDB.Image
+			}
+			memberName := string(memberType)
+			c, ok := getComponentContainer(set)
+			if !ok || c.Image != image {
+				glog.Infof("check %s image: getComponentContainer(set).Image(%s) != tc.Spec.%s.Image(%s)",
+					memberName, c.Image, strings.ToUpper(memberName), image)
+			}
+			return ok && c.Image == image
+		}
+		setUpgraded := func(set *v1beta1.StatefulSet) bool {
+			return set.Generation <= *set.Status.ObservedGeneration && set.Status.CurrentRevision == set.Status.UpdateRevision
+		}
+
+		// check upgrade order: pd first, then tikv, then tidb
+		if tc.Status.PD.Phase == v1alpha1.UpgradePhase {
+			glog.Infof("pd is upgrading")
+			if tc.Status.TiKV.Phase == v1alpha1.UpgradePhase {
+				return false, pingcapErrors.New("tikv is upgrading while pd is upgrading")
+			}
+			if tc.Status.TiDB.Phase == v1alpha1.UpgradePhase {
+				return false, pingcapErrors.New("tidb is upgrading while pd is upgrading")
+			}
+			if !imageUpgraded(v1alpha1.PDMemberType, pdSet) {
+				return false, pingcapErrors.New("pd image is not updated while pd is upgrading")
+			}
+			if !setUpgraded(pdSet) {
+				if imageUpgraded(v1alpha1.TiKVMemberType, tikvSet) {
+					return false, pingcapErrors.New("tikv image is updated while pd is upgrading")
+				}
+				if imageUpgraded(v1alpha1.TiDBMemberType, tidbSet) {
+					return false, pingcapErrors.New("tidb image is updated while pd is upgrading")
+				}
+			}
+			return false, nil
+		} else if tc.Status.TiKV.Phase == v1alpha1.UpgradePhase {
+			glog.Infof("tikv is upgrading")
+			if tc.Status.TiDB.Phase == v1alpha1.UpgradePhase {
+				return false, pingcapErrors.New("tidb is upgrading while tikv is upgrading")
+			}
+			if !imageUpgraded(v1alpha1.PDMemberType, pdSet) {
+				return false, pingcapErrors.New("pd image is not updated while tikv is upgrading")
+			}
+			if !setUpgraded(pdSet) {
+				return false, pingcapErrors.New("pd stateful set is not upgraded while tikv is upgrading")
+			}
+			if !imageUpgraded(v1alpha1.TiKVMemberType, tikvSet) {
+				return false, pingcapErrors.New("tikv image is not updated while tikv is upgrading")
+			}
+			if !setUpgraded(tikvSet) {
+				if imageUpgraded(v1alpha1.TiDBMemberType, tidbSet) {
+					return false, pingcapErrors.New("tidb image is updated while tikv is upgrading")
+				}
+			}
+			return false, nil
+		} else if tc.Status.TiDB.Phase == v1alpha1.UpgradePhase {
+			glog.Infof("tidb is upgrading")
+			if !imageUpgraded(v1alpha1.PDMemberType, pdSet) {
+				return false, pingcapErrors.New("pd image is not updated while tidb is upgrading")
+			}
+			if !setUpgraded(pdSet) {
+				return false, pingcapErrors.New("pd stateful set is not upgraded while tidb is upgrading")
+			}
+			if !imageUpgraded(v1alpha1.TiKVMemberType, tikvSet) {
+				return false, pingcapErrors.New("tikv image is not updated while tidb is upgrading")
+			}
+			if !setUpgraded(tikvSet) {
+				return false, pingcapErrors.New("tikv stateful set is not upgraded while tidb is upgrading")
+			}
+			if !imageUpgraded(v1alpha1.TiDBMemberType, tidbSet) {
+				return false, pingcapErrors.New("tidb image is not updated while tidb is upgrading")
+			}
+			return false, nil
+		}
+
+		// check pd final state
+		if !imageUpgraded(v1alpha1.PDMemberType, pdSet) {
+			return false, nil
+		}
+		if !setUpgraded(pdSet) {
+			glog.Infof("pd statefulset has not finished upgrading")
+			return false, nil
+		}
+		// check tikv final state
+		if !imageUpgraded(v1alpha1.TiKVMemberType, tikvSet) {
+			return false, nil
+		}
+		if !setUpgraded(tikvSet) {
+			glog.Infof("tikv statefulset has not finished upgrading")
+			return false, nil
+		}
+		// check tidb final state
+		if !imageUpgraded(v1alpha1.TiDBMemberType, tidbSet) {
+			return false, nil
+		}
+		if !setUpgraded(tidbSet) {
+			glog.Infof("tidb statefulset has not finished upgrading")
+			return false, nil
+		}
+		return true, nil
+	})
+}
+
 func (oa *operatorActions) DeployMonitor(info *TidbClusterInfo) error { return nil }
 
 func (oa *operatorActions) CleanMonitor(info *TidbClusterInfo) error { return nil }
 
@@ -609,6 +812,11 @@ func (oa *operatorActions) tidbMembersReadyFn(tc *v1alpha1.TidbCluster) (bool, e
 			ns, tidbSetName, tidbSet.Status.ReadyReplicas, replicas)
 		return false, nil
 	}
+	if len(tc.Status.TiDB.Members) != int(tc.Spec.TiDB.Replicas) {
+		glog.Infof("tidbcluster: %s/%s .status.TiDB.Members count(%d) != %d",
+			ns, tcName, len(tc.Status.TiDB.Members), tc.Spec.TiDB.Replicas)
+		return false, nil
+	}
 	if tidbSet.Status.ReadyReplicas != tidbSet.Status.Replicas {
 		glog.Infof("statefulset: %s/%s .status.ReadyReplicas(%d) != .status.Replicas(%d)",
 			ns, tidbSetName, tidbSet.Status.ReadyReplicas, tidbSet.Status.Replicas)
@@ -625,6 +833,11 @@ func (oa *operatorActions) tidbMembersReadyFn(tc *v1alpha1.TidbCluster) (bool, e
 		glog.Errorf("failed to get service: %s/%s", ns, tidbSetName)
 		return false, nil
 	}
+	_, err = oa.kubeCli.CoreV1().Services(ns).Get(controller.TiDBPeerMemberName(tcName), metav1.GetOptions{})
+	if err != nil {
+		glog.Errorf("failed to get peer service: %s/%s", ns, controller.TiDBPeerMemberName(tcName))
+		return false, nil
+	}
 	return true, nil
 }
 
@@ -1635,3 +1848,43 @@ func (oa *operatorActions) drainerHealth(info *TidbClusterInfo, hostName string)
 	}
 	return len(healths.PumpPos) > 0 && healths.Synced
 }
+
+func (oa *operatorActions) GetPodUIDMap(info *TidbClusterInfo) (map[string]types.UID, error) {
+	result := map[string]types.UID{}
+
+	selector, err := label.New().Instance(info.ClusterName).Selector()
+	if err != nil {
+		return nil, err
+	}
+	pods, err := oa.kubeCli.CoreV1().Pods(info.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
+	if err != nil {
+		return nil, err
+	}
+	for _, pod := range pods.Items {
+		result[pod.GetName()] = pod.GetUID()
+	}
+
+	return result, nil
+}
+
+func (oa *operatorActions) GetNodeMap(info *TidbClusterInfo, component string) (map[string][]string, error) {
+	nodeMap := make(map[string][]string)
+	selector := label.New().Instance(info.ClusterName).Component(component).Labels()
+	podList, err := oa.kubeCli.CoreV1().Pods(info.Namespace).List(metav1.ListOptions{
+		LabelSelector: labels.SelectorFromSet(selector).String(),
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	for _, pod := range podList.Items {
+		nodeName := pod.Spec.NodeName
+		if len(nodeMap[nodeName]) == 0 {
+			nodeMap[nodeName] = make([]string, 0)
+		}
+		nodeMap[nodeName] = append(nodeMap[nodeName], pod.GetName())
+		sort.Strings(nodeMap[nodeName])
+	}
+
+	return nodeMap, nil
+}
diff --git a/tests/pkg/workload/ddl/workload.go b/tests/pkg/workload/ddl/workload.go
index 23e4ca2229..066b02348a 100644
--- a/tests/pkg/workload/ddl/workload.go
+++ b/tests/pkg/workload/ddl/workload.go
@@ -41,7 +41,7 @@ func (w *DDLWorkload) Enter() error {
 		return errors.New("already in ddl workload context")
 	}
 	w.ctx, w.cancel = context.WithCancel(context.Background())
-	go internal.Run(w.ctx, w.DSN, w.Concurrency, w.Tables, false, internal.SerialDDLTest)
+	go internal.Run(w.ctx, w.DSN, w.Concurrency, w.Tables, false, internal.ParallelDDLTest)
 	return nil
 }
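The helpers added in this patch are meant to be composed by an e2e case: snapshot pod UIDs, scale, then assert both that the scale-in was safe and that surviving pods were untouched, before moving on to the upgrade checks. The following is a minimal usage sketch, not part of the patch itself; the function name exerciseScaleAndUpgrade is hypothetical, and oa (an OperatorActions) plus info (*TidbClusterInfo) are assumed to be constructed by the surrounding test harness.

// Sketch: one possible ordering of the new checks in an e2e scale/upgrade case.
func exerciseScaleAndUpgrade(oa OperatorActions, info *TidbClusterInfo) error {
	// Snapshot pod UIDs before scaling; CheckScaledCorrectly compares
	// against this map to assert that surviving pods were not recreated.
	podUIDsBeforeScale, err := oa.GetPodUIDMap(info)
	if err != nil {
		return err
	}

	if err := oa.ScaleTidbCluster(info); err != nil {
		return err
	}
	// While a scale-in is in flight, the statefulset's replicas must not
	// drop below the number of TiKV stores still known to PD.
	if err := oa.CheckScaleInSafely(info); err != nil {
		return err
	}
	if err := oa.CheckScaledCorrectly(info, podUIDsBeforeScale); err != nil {
		return err
	}

	// CheckUpgradeProgress enforces the pd -> tikv -> tidb rolling order
	// and waits until all three statefulsets reach their target revision.
	if err := oa.UpgradeTidbCluster(info); err != nil {
		return err
	}
	return oa.CheckUpgradeProgress(info)
}

Note that CheckScaleInSafely is polled while the scale-in is still in flight, whereas CheckScaledCorrectly is only meaningful after the desired replica count has changed, since it fails fast when the pod counts before and after scaling are equal.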