Skip to content

Commit

Permalink
Automated cherry pick of #955: ensure pods unchanged when upgrading (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
weekface authored Dec 19, 2019
1 parent c7491ef commit 67c5190
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/fatih/camelcase v1.0.0 // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-sql-driver/mysql v1.4.0
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/groupcache v0.0.0-20180513044358-24b0969c4cb7 // indirect
Expand Down
128 changes: 109 additions & 19 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"os/exec"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/ghodss/yaml"
// To register MySQL driver
_ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
Expand Down Expand Up @@ -493,6 +495,15 @@ func (oa *operatorActions) CleanOperatorOrDie(info *OperatorConfig) {

func (oa *operatorActions) UpgradeOperator(info *OperatorConfig) error {
glog.Infof("upgrading tidb-operator %s", info.ReleaseName)

listOptions := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(
label.New().Labels()).String(),
}
pods1, err := oa.kubeCli.CoreV1().Pods(metav1.NamespaceAll).List(listOptions)
if err != nil {
return err
}
if err := oa.checkoutTag(info.Tag); err != nil {
return err
}
Expand All @@ -505,7 +516,66 @@ func (oa *operatorActions) UpgradeOperator(info *OperatorConfig) error {
if err != nil {
return fmt.Errorf("failed to upgrade operator to: %s, %v, %s", info.Image, err, string(res))
}
return nil

// ensure pods unchanged when upgrading operator
waitFn := func() (done bool, err error) {
pods2, err := oa.kubeCli.CoreV1().Pods(metav1.NamespaceAll).List(listOptions)
if err != nil {
glog.Error(err)
return false, nil
}

err = ensurePodsUnchanged(pods1, pods2)
if err != nil {
return true, err
}

return false, nil
}

err = wait.Poll(oa.pollInterval, 5*time.Minute, waitFn)
if err == wait.ErrWaitTimeout {
return nil
}
return err
}

func ensurePodsUnchanged(pods1, pods2 *corev1.PodList) error {
pods1UIDs := getUIDs(pods1)
pods2UIDs := getUIDs(pods2)
pods1Yaml, err := yaml.Marshal(pods1)
if err != nil {
return err
}
pods2Yaml, err := yaml.Marshal(pods2)
if err != nil {
return err
}
if reflect.DeepEqual(pods1UIDs, pods2UIDs) {
glog.V(4).Infof("%s", string(pods1Yaml))
glog.V(4).Infof("%s", string(pods2Yaml))
glog.V(4).Infof("%v", pods1UIDs)
glog.V(4).Infof("%v", pods2UIDs)
glog.V(4).Infof("pods unchanged after operator upgraded")
return nil
}

glog.Infof("%s", string(pods1Yaml))
glog.Infof("%s", string(pods2Yaml))
glog.Infof("%v", pods1UIDs)
glog.Infof("%v", pods2UIDs)
return fmt.Errorf("some pods changed after operator upgraded")
}

func getUIDs(pods *corev1.PodList) []string {
arr := make([]string, 0, len(pods.Items))

for _, pod := range pods.Items {
arr = append(arr, string(pod.UID))
}

sort.Strings(arr)
return arr
}

func (oa *operatorActions) UpgradeOperatorOrDie(info *OperatorConfig) {
Expand Down Expand Up @@ -606,6 +676,8 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error {
var beforePVNames []string
for _, pv := range pvList.Items {
beforePVNames = append(beforePVNames, pv.GetName())
glog.V(4).Infof("%s, %s, %v", pv.Name, pv.Spec.PersistentVolumeReclaimPolicy, pv.Labels)
glog.V(4).Info(pv.Spec.ClaimRef)
}
glog.V(4).Info(beforePVNames)

Expand Down Expand Up @@ -634,24 +706,34 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error {
afterPVCNames = append(afterPVCNames, pvc.GetName())
}
glog.V(4).Info(afterPVCNames)

pvList, err = oa.kubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
var afterPVNames []string
for _, pv := range pvList.Items {
afterPVNames = append(afterPVNames, pv.GetName())
}
glog.V(4).Info(afterPVNames)

if !reflect.DeepEqual(beforePVCNames, afterPVCNames) {
return fmt.Errorf("pvc changed when we delete cluster: %s/%s, before: %v, after: %v",
ns, tcName, beforePVCNames, afterPVCNames)
}
if !reflect.DeepEqual(beforePVNames, afterPVNames) {
return fmt.Errorf("pv changed when we delete cluster: %s/%s, before: %v, after: %v",
ns, tcName, beforePVNames, afterPVNames)

waitPVFn := func() (done bool, err error) {
pvList, err = oa.kubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, nil
}
var afterPVNames []string
for _, pv := range pvList.Items {
afterPVNames = append(afterPVNames, pv.GetName())
}
glog.V(4).Info(afterPVNames)

if !reflect.DeepEqual(beforePVNames, afterPVNames) {
glog.Errorf("pv changed when we delete cluster: %s/%s, before: %v, after: %v",
ns, tcName, beforePVNames, afterPVNames)
return false, nil
}

return true, nil
}

err = wait.Poll(oa.pollInterval, DefaultPollTimeout, waitPVFn)
if err != nil {
return err
}

err = oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(info.GenerateBackupDirPodName(), &metav1.DeleteOptions{})
Expand Down Expand Up @@ -686,9 +768,11 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error {
return fmt.Errorf("failed to delete configmaps: %v, %s", err, string(res))
}

patchPVCmd := fmt.Sprintf("kubectl get pv | grep %s | grep %s | awk '{print $1}' | "+
patchPVCmd := fmt.Sprintf("kubectl get pv -l %s=%s,%s=%s,%s=%s | awk '{print $1}' | "+
"xargs -I {} kubectl patch pv {} -p '{\"spec\":{\"persistentVolumeReclaimPolicy\":\"Delete\"}}'",
info.Namespace, info.ClusterName)
label.ManagedByLabelKey, "tidb-operator",
label.NamespaceLabelKey, info.Namespace,
label.InstanceLabelKey, info.ClusterName)
glog.V(4).Info(patchPVCmd)
if res, err := exec.Command("/bin/sh", "-c", patchPVCmd).CombinedOutput(); err != nil {
return fmt.Errorf("failed to patch pv: %v, %s", err, string(res))
Expand Down Expand Up @@ -1050,6 +1134,8 @@ func (oa *operatorActions) CheckUpgrade(ctx context.Context, info *TidbClusterCo
replicas := tc.TiKVRealReplicas()
for i := replicas - 1; i >= 0; i-- {
if err := wait.PollImmediate(1*time.Second, 10*time.Minute, func() (done bool, err error) {
podName := fmt.Sprintf("%s-tikv-%d", tcName, i)
scheduler := fmt.Sprintf("evict-leader-scheduler-%s", findStoreFn(tc, podName))
schedulers, err := pdClient.GetEvictLeaderSchedulers()
if err != nil {
glog.Errorf("failed to get evict leader schedulers, %v", err)
Expand All @@ -1058,13 +1144,17 @@ func (oa *operatorActions) CheckUpgrade(ctx context.Context, info *TidbClusterCo
glog.V(4).Infof("index:%d,schedulers:%v,error:%v", i, schedulers, err)
if len(schedulers) > 1 {
glog.Errorf("there are too many evict leader schedulers: %v", schedulers)
for _, s := range schedulers {
if s == scheduler {
glog.Infof("found scheudler: %s", scheduler)
return true, nil
}
}
return false, nil
}
if len(schedulers) == 0 {
return false, nil
}
podName := fmt.Sprintf("%s-tikv-%d", tcName, i)
scheduler := fmt.Sprintf("evict-leader-scheduler-%s", findStoreFn(tc, podName))
if schedulers[0] == scheduler {
glog.Infof("index: %d,the schedulers: %s = %s", i, schedulers[0], scheduler)
return true, nil
Expand Down
2 changes: 1 addition & 1 deletion tests/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,6 @@ func (tc *TidbClusterConfig) BuildSubValues(path string) (string, error) {
if err != nil {
return "", err
}
glog.Infof("subValues:\n %s", subValues)
glog.V(4).Infof("subValues:\n %s", subValues)
return subVaulesPath, nil
}
7 changes: 3 additions & 4 deletions tests/cmd/stability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ func run() {
time.Sleep(30 * time.Second)
oa.CheckTidbClustersAvailableOrDie([]*tests.TidbClusterConfig{cluster})
// rollback conf
cluster.PDPreStartScript = strconv.Quote("")
cluster.TiKVPreStartScript = strconv.Quote("")
cluster.TiDBPreStartScript = strconv.Quote("")
cluster.PDPreStartScript = strconv.Quote("# noop")
cluster.TiKVPreStartScript = strconv.Quote("# noop")
cluster.TiDBPreStartScript = strconv.Quote("# noop")
oa.UpgradeTidbClusterOrDie(cluster)
// wait upgrade complete
oa.CheckUpgradeCompleteOrDie(cluster)
Expand Down Expand Up @@ -319,7 +319,6 @@ func run() {
ocfg.Image = cfg.UpgradeOperatorImage
ocfg.Tag = cfg.UpgradeOperatorTag
oa.UpgradeOperatorOrDie(ocfg)
time.Sleep(5 * time.Minute)
postUpgrade := []*tests.TidbClusterConfig{
cluster3,
cluster1,
Expand Down

0 comments on commit 67c5190

Please sign in to comment.