add scheduled-backup test case (#322)
* add scheduled-backup test case
shuijing198799 authored and weekface committed Mar 19, 2019
1 parent 9d33e26 commit 61652fc
Showing 4 changed files with 240 additions and 26 deletions.
227 changes: 213 additions & 14 deletions tests/actions.go
@@ -28,7 +28,7 @@ import (

_ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
"github.com/pingcap/errors"
pingcapErrors "github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
@@ -39,8 +39,10 @@ import (
"k8s.io/api/apps/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)
@@ -64,9 +66,7 @@ func NewOperatorActions(cli versioned.Interface, kubeCli kubernetes.Interface, l
const (
DefaultPollTimeout time.Duration = 10 * time.Minute
DefaultPollInterval time.Duration = 10 * time.Second
)

const (
getBackupDirPodName = "get-backup-dir"
grafanaUsername = "admin"
grafanaPassword = "admin"
)
@@ -93,6 +93,7 @@ type OperatorActions interface {
CheckRestore(from *TidbClusterInfo, to *TidbClusterInfo) error
ForceDeploy(info *TidbClusterInfo) error
CreateSecret(info *TidbClusterInfo) error
getBackupDir(info *TidbClusterInfo) ([]string, error)
}

type FaultTriggerActions interface {
@@ -135,6 +136,7 @@ type OperatorInfo struct {
}

type TidbClusterInfo struct {
BackupPVC string
Namespace string
ClusterName string
OperatorTag string
@@ -143,6 +145,7 @@ type TidbClusterInfo struct {
TiDBImage string
StorageClassName string
Password string
InitSql string
RecordCount string
InsertBatchSize string
Resources map[string]string
@@ -153,9 +156,6 @@ type TidbClusterInfo struct {

func (tc *TidbClusterInfo) HelmSetString() string {

// add a database and table for test
initSql := `"create database record;use record;create table test(t char(32));"`

set := map[string]string{
"clusterName": tc.ClusterName,
"pd.storageClassName": tc.StorageClassName,
@@ -167,7 +167,7 @@ func (tc *TidbClusterInfo) HelmSetString() string {
"tikv.image": tc.TiKVImage,
"tidb.image": tc.TiDBImage,
"tidb.passwordSecretName": "set-secret",
"tidb.initSql": initSql,
"tidb.initSql": tc.InitSql,
"monitor.create": strconv.FormatBool(tc.Monitor),
}

@@ -272,6 +272,7 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterInfo) error {
info.ClusterName,
fmt.Sprintf("%s-backup", info.ClusterName),
fmt.Sprintf("%s-restore", info.ClusterName),
fmt.Sprintf("%s-scheduler-backup", info.ClusterName),
}
for _, chartName := range charts {
res, err := exec.Command("helm", "del", "--purge", chartName).CombinedOutput()
@@ -281,6 +282,12 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterInfo) error {
}
}

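// Remove the get-backup-dir helper pod (created by getBackupDir) if it is still around; a missing pod is not an error.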
err := oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(getBackupDirPodName, &metav1.DeleteOptions{})

if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete dir pod %v", err)
}

setStr := label.New().Instance(info.ClusterName).String()

resources := []string{"pvc"}
@@ -411,7 +418,7 @@ func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterInfo) error {
glog.Info("[SCALE] " + cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
return errors.Wrapf(err, "failed to scale tidb cluster: %s", string(res))
return pingcapErrors.Wrapf(err, "failed to scale tidb cluster: %s", string(res))
}
return nil
}
@@ -422,7 +429,7 @@ func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterInfo) error {
glog.Info("[UPGRADE] " + cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
return errors.Wrapf(err, "failed to upgrade tidb cluster: %s", string(res))
return pingcapErrors.Wrapf(err, "failed to upgrade tidb cluster: %s", string(res))
}
return nil
}
@@ -1052,7 +1059,7 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error {
}()
sets := map[string]string{
"clusterName": info.ClusterName,
"name": "test-backup",
"name": info.BackupPVC,
"mode": "backup",
"user": "root",
"password": info.Password,
@@ -1076,6 +1083,7 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error {
if err != nil {
return fmt.Errorf("failed to launch adhoc backup job: %v, %s", err, string(res))
}

return nil
}

@@ -1085,7 +1093,7 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterInfo) error {
glog.Infof("deploy clean backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
}()

jobName := fmt.Sprintf("%s-%s", info.ClusterName, "test-backup")
jobName := fmt.Sprintf("%s-%s", info.ClusterName, info.BackupPVC)
fn := func() (bool, error) {
job, err := oa.kubeCli.BatchV1().Jobs(info.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
@@ -1104,6 +1112,7 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterInfo) error {
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v", err)
}

return nil
}

@@ -1114,7 +1123,7 @@ func (oa *operatorActions) Restore(from *TidbClusterInfo, to *TidbClusterInfo) e
}()
sets := map[string]string{
"clusterName": to.ClusterName,
"name": "test-backup",
"name": to.BackupPVC,
"mode": "restore",
"user": "root",
"password": to.Password,
@@ -1148,7 +1157,7 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterInfo, to *TidbClusterIn
glog.Infof("check restore end cluster[%s] namespace[%s]", to.ClusterName, to.Namespace)
}()

jobName := fmt.Sprintf("%s-restore-test-backup", to.ClusterName)
jobName := fmt.Sprintf("%s-restore-%s", to.ClusterName, from.BackupPVC)
fn := func() (bool, error) {
job, err := oa.kubeCli.BatchV1().Jobs(to.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
@@ -1268,13 +1277,203 @@ func releaseIsExist(err error) bool {
}

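// DeployScheduledBackup enables the chart's scheduledBackup values (a per-minute cron schedule,
// credentials and a 10Gi volume) and re-runs helm upgrade so the tidb-cluster chart creates a
// scheduled-backup CronJob for the cluster.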
func (oa *operatorActions) DeployScheduledBackup(info *TidbClusterInfo) error {
glog.Infof("begin to deploy scheduled backup")
defer func() {
glog.Infof("deploy shceduled backup end")
}()

cron := "'*/1 * * * *'"
sets := map[string]string{
"clusterName": info.ClusterName,
"scheduledBackup.create": "true",
"scheduledBackup.user": "root",
"scheduledBackup.password": info.Password,
"scheduledBackup.schedule": cron,
"scheduledBackup.storage": "10Gi",
}
var buffer bytes.Buffer
for k, v := range sets {
set := fmt.Sprintf(" --set %s=%s", k, v)
_, err := buffer.WriteString(set)
if err != nil {
return err
}
}

setStr := buffer.String()

cmd := fmt.Sprintf("helm upgrade %s /charts/%s/tidb-cluster %s",
info.ClusterName, info.OperatorTag, setStr)

glog.Infof("scheduled-backup delploy [%s]", cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v, %s", err, string(res))
}
return nil
}

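// CheckScheduledBackup polls until the <cluster>-scheduled-backup CronJob exists, at least one Job
// owned by it has been created and every such Job has succeeded, then lists the backup PVC to verify
// that backup directories were actually written.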
func (oa *operatorActions) CheckScheduledBackup(info *TidbClusterInfo) error {
glog.Infof("begin to check scheduler backup cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
defer func() {
glog.Infof("deploy check scheduler end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
}()

jobName := fmt.Sprintf("%s-scheduled-backup", info.ClusterName)
fn := func() (bool, error) {
job, err := oa.kubeCli.BatchV1beta1().CronJobs(info.Namespace).Get(jobName, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get cronjobs %s ,%v", jobName, err)
return false, nil
}

jobs, err := oa.kubeCli.BatchV1().Jobs(info.Namespace).List(metav1.ListOptions{})
if err != nil {
glog.Errorf("failed to list jobs %s ,%v", info.Namespace, err)
return false, nil
}

backupJobs := []batchv1.Job{}
for _, j := range jobs.Items {
if pid, found := getParentUIDFromJob(j); found && pid == job.UID {
backupJobs = append(backupJobs, j)
}
}

if len(backupJobs) == 0 {
glog.Errorf("cluster [%s] scheduler jobs is creating, please wait!", info.ClusterName)
return false, nil
}

for _, j := range backupJobs {
if j.Status.Succeeded == 0 {
glog.Errorf("cluster [%s] back up job is not completed, please wait! ", info.ClusterName)
return false, nil
}
}

return true, nil
}

err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v", err)
}

// sleep 1 minute to give the cronjob time to produce additional backups
time.Sleep(60 * time.Second)

dirs, err := oa.getBackupDir(info)
if err != nil {
return fmt.Errorf("failed to get backup dir: %v", err)
}

if len(dirs) != 3 {
return fmt.Errorf("scheduler job failed!")
}

return nil
}

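// getParentUIDFromJob returns the UID of the CronJob that owns the given Job, or false when the Job
// has no controller or its controller is not a CronJob.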
func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) {
controllerRef := metav1.GetControllerOf(&j)

if controllerRef == nil {
return types.UID(""), false
}

if controllerRef.Kind != "CronJob" {
glog.Infof("Job with non-CronJob parent, name %s namespace %s", j.Name, j.Namespace)
return types.UID(""), false
}

return controllerRef.UID, true
}

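// getBackupDir waits for any previous helper pod to be cleaned up, then starts a pod based on
// pingcap/tidb-cloud-backup that mounts the backup PVC at /data and returns the directory names
// listed there via kubectl exec.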
func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) ([]string, error) {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: getBackupDirPodName,
Namespace: info.Namespace,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: getBackupDirPodName,
Image: "pingcap/tidb-cloud-backup:latest",
Command: []string{"sleep", "3000"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "data",
MountPath: "/data",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "data",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: info.BackupPVC,
},
},
},
},
},
}

fn := func() (bool, error) {
_, err := oa.kubeCli.CoreV1().Pods(info.Namespace).Get(getBackupDirPodName, metav1.GetOptions{})
if !errors.IsNotFound(err) {
return false, nil
}
return true, nil
}

err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)

if err != nil {
return nil, fmt.Errorf("failed to delete pod %s", getBackupDirPodName)
}

_, err = oa.kubeCli.CoreV1().Pods(info.Namespace).Create(pod)
if err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("cluster: [%s/%s] create get backup dir pod failed, error :%v", info.Namespace, info.ClusterName, err)
return nil, err
}

fn = func() (bool, error) {
_, err := oa.kubeCli.CoreV1().Pods(info.Namespace).Get(getBackupDirPodName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false, nil
}
return true, nil
}

err = wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)

if err != nil {
return nil, fmt.Errorf("failed to create pod %s", getBackupDirPodName)
}

cmd := fmt.Sprintf("kubectl exec %s -n %s ls /data", getBackupDirPodName, info.Namespace)
glog.Info(cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
glog.Errorf("cluster:[%s/%s] exec :%s failed,error:%v,result:%s", info.Namespace, info.ClusterName, cmd, err, res)
return nil, err
}

dirs := strings.Split(string(res), "\n")
glog.Infof("dirs in pod info name [%s] dir name [%s]", info.BackupPVC, strings.Join(dirs, ","))
return dirs, nil
}

func (info *TidbClusterInfo) FullName() string {
return fmt.Sprintf("%s/%s", info.Namespace, info.ClusterName)
}

func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterInfo, to *TidbClusterInfo) error {
return nil
}