Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: ensure pods unaffected when upgrading #955

Merged
merged 14 commits into from
Oct 16, 2019
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/fatih/camelcase v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-openapi/spec v0.19.2
github.com/go-sql-driver/mysql v1.4.0
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
Expand Down
128 changes: 109 additions & 19 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"os/exec"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/ghodss/yaml"
// To register MySQL driver
_ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
Expand Down Expand Up @@ -490,6 +492,15 @@ func (oa *operatorActions) CleanOperatorOrDie(info *OperatorConfig) {

func (oa *operatorActions) UpgradeOperator(info *OperatorConfig) error {
glog.Infof("upgrading tidb-operator %s", info.ReleaseName)

listOptions := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(
label.New().Labels()).String(),
}
pods1, err := oa.kubeCli.CoreV1().Pods(metav1.NamespaceAll).List(listOptions)
if err != nil {
return err
}
if err := oa.checkoutTag(info.Tag); err != nil {
return err
}
Expand All @@ -502,7 +513,66 @@ func (oa *operatorActions) UpgradeOperator(info *OperatorConfig) error {
if err != nil {
return fmt.Errorf("failed to upgrade operator to: %s, %v, %s", info.Image, err, string(res))
}
return nil

// ensure pods unchanged when upgrading operator
waitFn := func() (done bool, err error) {
pods2, err := oa.kubeCli.CoreV1().Pods(metav1.NamespaceAll).List(listOptions)
if err != nil {
glog.Error(err)
return false, nil
}

err = ensurePodsUnchanged(pods1, pods2)
if err != nil {
return true, err
}

return false, nil
}

err = wait.Poll(oa.pollInterval, 5*time.Minute, waitFn)
if err == wait.ErrWaitTimeout {
return nil
}
return err
}

// ensurePodsUnchanged returns nil when pods1 and pods2 contain exactly the
// same set of pod UIDs, and an error when any pod was added, removed or
// recreated between the two snapshots. It is used to verify that upgrading
// tidb-operator does not disturb the pods it manages.
func ensurePodsUnchanged(pods1, pods2 *corev1.PodList) error {
	// getUIDs returns sorted slices, so DeepEqual compares the two UID sets
	// irrespective of list ordering.
	pods1UIDs := getUIDs(pods1)
	pods2UIDs := getUIDs(pods2)

	if reflect.DeepEqual(pods1UIDs, pods2UIDs) {
		// Success path: only pay the cost of YAML-marshaling both full pod
		// lists when V(4) output would actually be emitted, and never let a
		// marshal failure turn a successful comparison into an error.
		if glog.V(4) {
			if pods1Yaml, err := yaml.Marshal(pods1); err == nil {
				glog.Infof("%s", string(pods1Yaml))
			}
			if pods2Yaml, err := yaml.Marshal(pods2); err == nil {
				glog.Infof("%s", string(pods2Yaml))
			}
		}
		glog.V(4).Infof("%v", pods1UIDs)
		glog.V(4).Infof("%v", pods2UIDs)
		glog.V(4).Infof("pods unchanged after operator upgraded")
		return nil
	}

	// Some pod changed: dump both snapshots at the default log level so the
	// difference is debuggable, then report the failure.
	pods1Yaml, err := yaml.Marshal(pods1)
	if err != nil {
		return err
	}
	pods2Yaml, err := yaml.Marshal(pods2)
	if err != nil {
		return err
	}
	glog.Infof("%s", string(pods1Yaml))
	glog.Infof("%s", string(pods2Yaml))
	glog.Infof("%v", pods1UIDs)
	glog.Infof("%v", pods2UIDs)
	return fmt.Errorf("some pods changed after operator upgraded")
}

// getUIDs collects the UID of every pod in the list and returns them as a
// sorted string slice, giving a canonical, order-independent identity for
// the set of pods.
func getUIDs(pods *corev1.PodList) []string {
	uids := make([]string, len(pods.Items))
	for i := range pods.Items {
		uids[i] = string(pods.Items[i].UID)
	}
	sort.Strings(uids)
	return uids
}

func (oa *operatorActions) UpgradeOperatorOrDie(info *OperatorConfig) {
Expand Down Expand Up @@ -603,6 +673,8 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error {
var beforePVNames []string
for _, pv := range pvList.Items {
beforePVNames = append(beforePVNames, pv.GetName())
glog.V(4).Infof("%s, %s, %v", pv.Name, pv.Spec.PersistentVolumeReclaimPolicy, pv.Labels)
glog.V(4).Info(pv.Spec.ClaimRef)
}
glog.V(4).Info(beforePVNames)

Expand Down Expand Up @@ -634,24 +706,34 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error {
afterPVCNames = append(afterPVCNames, pvc.GetName())
}
glog.V(4).Info(afterPVCNames)

pvList, err = oa.kubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
var afterPVNames []string
for _, pv := range pvList.Items {
afterPVNames = append(afterPVNames, pv.GetName())
}
glog.V(4).Info(afterPVNames)

if !reflect.DeepEqual(beforePVCNames, afterPVCNames) {
return fmt.Errorf("pvc changed when we delete cluster: %s/%s, before: %v, after: %v",
ns, tcName, beforePVCNames, afterPVCNames)
}
if !reflect.DeepEqual(beforePVNames, afterPVNames) {
return fmt.Errorf("pv changed when we delete cluster: %s/%s, before: %v, after: %v",
ns, tcName, beforePVNames, afterPVNames)

waitPVFn := func() (done bool, err error) {
pvList, err = oa.kubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, nil
}
var afterPVNames []string
for _, pv := range pvList.Items {
afterPVNames = append(afterPVNames, pv.GetName())
}
glog.V(4).Info(afterPVNames)

if !reflect.DeepEqual(beforePVNames, afterPVNames) {
glog.Errorf("pv changed when we delete cluster: %s/%s, before: %v, after: %v",
ns, tcName, beforePVNames, afterPVNames)
return false, nil
}

return true, nil
}

err = wait.Poll(oa.pollInterval, DefaultPollTimeout, waitPVFn)
if err != nil {
return err
}

err = oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(getBackupDirPodName, &metav1.DeleteOptions{})
Expand Down Expand Up @@ -693,9 +775,11 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error {
return fmt.Errorf("failed to delete configmaps: %v, %s", err, string(res))
}

patchPVCmd := fmt.Sprintf("kubectl get pv | grep %s | grep %s | awk '{print $1}' | "+
patchPVCmd := fmt.Sprintf("kubectl get pv -l %s=%s,%s=%s,%s=%s | awk '{print $1}' | "+
"xargs -I {} kubectl patch pv {} -p '{\"spec\":{\"persistentVolumeReclaimPolicy\":\"Delete\"}}'",
info.Namespace, info.ClusterName)
label.ManagedByLabelKey, "tidb-operator",
label.NamespaceLabelKey, info.Namespace,
label.InstanceLabelKey, info.ClusterName)
glog.V(4).Info(patchPVCmd)
if res, err := exec.Command("/bin/sh", "-c", patchPVCmd).CombinedOutput(); err != nil {
return fmt.Errorf("failed to patch pv: %v, %s", err, string(res))
Expand Down Expand Up @@ -1061,6 +1145,8 @@ func (oa *operatorActions) CheckUpgrade(ctx context.Context, info *TidbClusterCo
replicas := tc.TiKVRealReplicas()
for i := replicas - 1; i >= 0; i-- {
if err := wait.PollImmediate(1*time.Second, 10*time.Minute, func() (done bool, err error) {
podName := fmt.Sprintf("%s-tikv-%d", tcName, i)
scheduler := fmt.Sprintf("evict-leader-scheduler-%s", findStoreFn(tc, podName))
schedulers, err := pdClient.GetEvictLeaderSchedulers()
if err != nil {
glog.Errorf("failed to get evict leader schedulers, %v", err)
Expand All @@ -1069,13 +1155,17 @@ func (oa *operatorActions) CheckUpgrade(ctx context.Context, info *TidbClusterCo
glog.V(4).Infof("index:%d,schedulers:%v,error:%v", i, schedulers, err)
if len(schedulers) > 1 {
glog.Errorf("there are too many evict leader schedulers: %v", schedulers)
for _, s := range schedulers {
if s == scheduler {
glog.Infof("found scheudler: %s", scheduler)
return true, nil
}
}
return false, nil
}
if len(schedulers) == 0 {
return false, nil
}
podName := fmt.Sprintf("%s-tikv-%d", tcName, i)
scheduler := fmt.Sprintf("evict-leader-scheduler-%s", findStoreFn(tc, podName))
if schedulers[0] == scheduler {
glog.Infof("index: %d,the schedulers: %s = %s", i, schedulers[0], scheduler)
return true, nil
Expand Down
2 changes: 1 addition & 1 deletion tests/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,6 @@ func (tc *TidbClusterConfig) BuildSubValues(path string) (string, error) {
if err != nil {
return "", err
}
glog.Infof("subValues:\n %s", subValues)
glog.V(4).Infof("subValues:\n %s", subValues)
return subVaulesPath, nil
}
7 changes: 3 additions & 4 deletions tests/cmd/stability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ func run() {
time.Sleep(30 * time.Second)
oa.CheckTidbClustersAvailableOrDie([]*tests.TidbClusterConfig{cluster})
// rollback conf
cluster.PDPreStartScript = strconv.Quote("")
cluster.TiKVPreStartScript = strconv.Quote("")
cluster.TiDBPreStartScript = strconv.Quote("")
cluster.PDPreStartScript = strconv.Quote("# noop")
cluster.TiKVPreStartScript = strconv.Quote("# noop")
cluster.TiDBPreStartScript = strconv.Quote("# noop")
oa.UpgradeTidbClusterOrDie(cluster)
// wait upgrade complete
oa.CheckUpgradeOrDie(ctx, cluster)
Expand Down Expand Up @@ -343,7 +343,6 @@ func run() {
ocfg.Image = cfg.UpgradeOperatorImage
ocfg.Tag = cfg.UpgradeOperatorTag
oa.UpgradeOperatorOrDie(ocfg)
time.Sleep(5 * time.Minute)
postUpgrade := []*tests.TidbClusterConfig{
cluster3,
cluster1,
Expand Down